package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.class */
public class StoreChangelogReaderTest {

    // State manager mock; setupStateManagerMock() makes it report the parameterized task type.
    @Mock
    private ProcessorStateManager stateManager;

    // State manager mock; setupActiveStateManager() makes it report an ACTIVE task.
    @Mock
    private ProcessorStateManager activeStateManager;

    // State manager mock; setupStandbyStateManager() makes it report a STANDBY task.
    @Mock
    private ProcessorStateManager standbyStateManager;

    // Store metadata mock that the setup helpers hand out for the changelog partition tp.
    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadata;

    // Additional store metadata mocks.
    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadataOne;

    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadataTwo;

    // State store mock; setupStore() stubs its name.
    @Mock
    private StateStore store;

    // Task type injected by the Parameterized runner; see data() for the values.
    @Parameterized.Parameter
    public Task.TaskType type;

    // Initializes the @Mock fields and enforces strict stubbing.
    @Rule
    public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
    private final String storeName = "store";
    private final String topicName = AssignmentTestUtils.TOPIC_PREFIX;
    private final LogContext logContext = new LogContext("test-reader ");
    // Changelog partitions used by the tests; tp lives on the same topic as topicName.
    private final TopicPartition tp = new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0);
    private final TopicPartition tp1 = new TopicPartition("one", 0);
    private final TopicPartition tp2 = new TopicPartition("two", 0);
    private final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader"));
    private final MockTime time = new MockTime();
    // Restore listener passed to the readers under test; records the callbacks it receives.
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    // Exception instance thrown by every callback of exceptionCallback below.
    private final KafkaException kaboom = new KafkaException("KABOOM!");
    // Standby update listener passed to the readers under test.
    private final MockStandbyUpdateListener standbyListener = new MockStandbyUpdateListener();
    // Restore listener whose start, batch and end callbacks all throw kaboom.
    private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener() {
        @Override
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            throw StoreChangelogReaderTest.this.kaboom;
        }
    };
    // Mock restore consumer and admin client shared by the tests.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final MockAdminClient adminClient = new MockAdminClient();
    // Default reader wired to the fields above; must be declared after all of them.
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.time, this.config, this.logContext, this.adminClient, this.consumer, this.callback, this.standbyListener);

    /**
     * Supplies the task types that the parameterized runner injects into {@link #type}.
     *
     * @return the standby task type followed by the active task type
     */
    @Parameterized.Parameters
    public static Object[] data() {
        final Object[] taskTypes = new Object[2];
        taskTypes[0] = Task.TaskType.STANDBY;
        taskTypes[1] = Task.TaskType.ACTIVE;
        return taskTypes;
    }

    /**
     * Stubs {@code stateManager} so that it reports the parameterized task type and
     * resolves the changelog partition {@code tp} to {@code storeMetadata}.
     */
    private void setupStateManagerMock() {
        Mockito.when(stateManager.taskType()).thenReturn(type);
        Mockito.when(stateManager.storeMetadata(tp)).thenReturn(storeMetadata);
    }

    /**
     * Stubs {@code activeStateManager} so that it reports an active task and
     * resolves the changelog partition {@code tp} to {@code storeMetadata}.
     */
    private void setupActiveStateManager() {
        Mockito.when(activeStateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(activeStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
    }

    /**
     * Stubs {@code standbyStateManager} so that it reports a standby task and
     * resolves the changelog partition {@code tp} to {@code storeMetadata}.
     */
    private void setupStandbyStateManager() {
        Mockito.when(standbyStateManager.taskType()).thenReturn(Task.TaskType.STANDBY);
        Mockito.when(standbyStateManager.storeMetadata(tp)).thenReturn(storeMetadata);
    }

    /**
     * Stubs {@code storeMetadata} so that it exposes the mocked store and
     * the changelog partition {@code tp}.
     */
    private void setupStoreMetadata() {
        Mockito.when(storeMetadata.store()).thenReturn(store);
        Mockito.when(storeMetadata.changelogPartition()).thenReturn(tp);
    }

    /** Stubs the mocked store so that it answers to the name {@code "store"}. */
    private void setupStore() {
        final String stubbedName = "store";
        Mockito.when(store.name()).thenReturn(stubbedName);
    }

    /**
     * Registers the changelog partition once, checks the bookkeeping of the fresh
     * registration, and expects a second registration of the same partition to be rejected.
     */
    @Test
    public void shouldNotRegisterSameStoreMultipleTimes() {
        setupStateManagerMock();

        changelogReader.register(tp, stateManager);

        // Right after registration: REGISTERED state, no end offset, nothing restored.
        Assert.assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, changelogReader.changelogMetadata(tp).state());
        Assert.assertNull(changelogReader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());

        Assert.assertThrows(IllegalStateException.class, () -> changelogReader.register(tp, stateManager));
    }

    /**
     * Expects registration to fail with an {@link IllegalStateException} for a changelog
     * partition that the (unstubbed) state manager mock has no store metadata for.
     */
    @Test
    public void shouldNotRegisterStoreWithoutMetadata() {
        final TopicPartition unknownChangelog = new TopicPartition("ChangelogWithoutStoreMetadata", 0);

        Assert.assertThrows(IllegalStateException.class, () -> changelogReader.register(unknownChangelog, stateManager));
    }

    /**
     * Registers a changelog and unregisters it again without ever calling {@code restore}.
     * Afterwards the restore consumer must have no assignment, and neither the restore
     * listener nor the standby update listener may have recorded any callback.
     */
    @Test
    public void shouldSupportUnregisterChangelogBeforeInitialization() {
        setupStateManagerMock();
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (type == Task.TaskType.STANDBY) {
            reader.transitToUpdateStandby();
        }
        reader.unregister(Collections.singleton(tp));

        Assert.assertEquals(Collections.emptySet(), consumer.assignment());

        // The restore listener saw nothing.
        Assert.assertNull(callback.restoreTopicPartition);
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_SUSPENDED));
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));

        // The standby update listener saw nothing either; each callback kind is checked once.
        Assert.assertNull(standbyListener.capturedStore(MockStandbyUpdateListener.UPDATE_SUSPENDED));
        Assert.assertNull(standbyListener.capturedStore(MockStandbyUpdateListener.UPDATE_START));
        Assert.assertNull(standbyListener.capturedStore(MockStandbyUpdateListener.UPDATE_BATCH));
    }

    /**
     * Starts restoring a changelog whose end offset (100) lies well beyond the stored
     * offset (9) and unregisters it before restoration can complete. The partition must
     * be dropped from the restore consumer's assignment; for active tasks the restore
     * listener must have seen start and suspend, for standby tasks only the standby
     * update listener must have seen the start.
     */
    @Test
    public void shouldSupportUnregisterChangelogBeforeCompletion() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();
        final boolean standby = type == Task.TaskType.STANDBY;

        // Typed task map; both stubbed lookups use a null key.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(9L);
        if (standby) {
            Mockito.when(storeMetadata.endOffset()).thenReturn(10L);
            Mockito.when(stateManager.changelogAsSource(tp)).thenReturn(true);
        }
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (standby) {
            reader.transitToUpdateStandby();
        }
        reader.restore(tasks);

        // Restoration has started (consumer positioned at 10) but has not completed.
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(Collections.emptySet(), reader.completedChangelogs());
        Assert.assertEquals(10L, consumer.position(tp));
        Assert.assertEquals(Collections.emptySet(), consumer.paused());
        Assert.assertEquals(Collections.singleton(tp), consumer.assignment());

        reader.unregister(Collections.singleton(tp));

        Assert.assertEquals(Collections.emptySet(), consumer.assignment());
        if (type == Task.TaskType.ACTIVE) {
            Assert.assertEquals(tp, callback.restoreTopicPartition);
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_SUSPENDED));
        } else {
            Assert.assertNull(callback.restoreTopicPartition);
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_SUSPENDED));
            Assert.assertEquals("store", standbyListener.capturedStore(MockStandbyUpdateListener.UPDATE_START));
            Assert.assertEquals(tp, standbyListener.updatePartition);
        }
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
    }

    /**
     * Restores a changelog whose end offset (10) is reached immediately from the stored
     * offset (9) and unregisters it afterwards. Active tasks are expected to report the
     * changelog as completed and paused, with restore start and end callbacks but no
     * suspend; standby tasks are expected to stay uncompleted and unpaused.
     */
    @Test
    public void shouldSupportUnregisterChangelogAfterCompletion() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();
        final boolean active = type == Task.TaskType.ACTIVE;
        final boolean standby = type == Task.TaskType.STANDBY;

        // Typed task map; both stubbed lookups use a null key.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(9L);
        if (standby) {
            Mockito.when(storeMetadata.endOffset()).thenReturn(10L);
            Mockito.when(stateManager.changelogAsSource(tp)).thenReturn(true);
        }
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (standby) {
            reader.transitToUpdateStandby();
        }
        reader.restore(tasks);

        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(10L, consumer.position(tp));
        Assert.assertEquals(Collections.singleton(tp), consumer.assignment());
        if (active) {
            Assert.assertEquals(Collections.singleton(tp), reader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(tp), consumer.paused());
        } else {
            Assert.assertEquals(Collections.emptySet(), reader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), consumer.paused());
        }

        reader.unregister(Collections.singleton(tp));

        Assert.assertEquals(Collections.emptySet(), consumer.assignment());
        if (active) {
            Assert.assertEquals(tp, callback.restoreTopicPartition);
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_SUSPENDED));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
        } else {
            // NOTE(review): the next two lookups query the restore listener (callback) with
            // standby-listener keys; confirm whether standbyListener.capturedStore(...) was intended.
            Assert.assertNull(callback.storeNameCalledStates.get(MockStandbyUpdateListener.UPDATE_SUSPENDED));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStandbyUpdateListener.UPDATE_BATCH));
            Assert.assertEquals("store", standbyListener.capturedStore(MockStandbyUpdateListener.UPDATE_START));
            Assert.assertEquals(tp, standbyListener.updatePartition);
        }
    }

    /**
     * Runs a single {@code restore} pass on a changelog whose stored offset (9) is directly
     * below the end offset (10). Active tasks are expected to end up COMPLETED with the end
     * offset recorded and the start/end callbacks fired; for the standby parameter the
     * changelog is expected to stay in RESTORING with no end offset.
     */
    @Test
    public void shouldInitializeChangelogAndCheckForCompletion() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();
        final boolean active = type == Task.TaskType.ACTIVE;

        // Typed task map; both stubbed lookups use a null key.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(9L);
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        reader.restore(tasks);

        final StoreChangelogReader.ChangelogState expectedState = active
            ? StoreChangelogReader.ChangelogState.COMPLETED
            : StoreChangelogReader.ChangelogState.RESTORING;
        // Explicitly boxed so the "no end offset" case is a plain null Long.
        final Long expectedEndOffset = active ? Long.valueOf(10L) : null;
        final Set<TopicPartition> expectedCompleted = active
            ? Collections.singleton(tp)
            : Collections.emptySet();

        Assert.assertEquals(expectedState, reader.changelogMetadata(tp).state());
        Assert.assertEquals(expectedEndOffset, reader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(expectedCompleted, reader.completedChangelogs());
        Assert.assertEquals(10L, consumer.position(tp));
        Assert.assertEquals(Collections.singleton(tp), consumer.paused());
        if (active) {
            Assert.assertEquals(tp, callback.restoreTopicPartition);
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
        }
    }

    /**
     * Uses a restore consumer whose {@code position} call always throws a
     * {@link TimeoutException} and checks that the restore listener is then started
     * with offset 0. The scenario is exercised for the ACTIVE parameter only.
     */
    @Test
    public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() {
        if (type != Task.TaskType.ACTIVE) {
            // Nothing is set up or asserted for other task types.
            return;
        }
        setupStateManagerMock();
        setupStoreMetadata();

        // Typed task map; both stubbed lookups use a null key.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);
        Mockito.when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(tp, 5L));
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

        final MockConsumer<byte[], byte[]> timingOutConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public long position(TopicPartition topicPartition) {
                throw new TimeoutException("KABOOM!");
            }
        };
        timingOutConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, timingOutConsumer, callback, standbyListener);

        reader.register(tp, stateManager);
        reader.restore(tasks);

        MatcherAssert.assertThat(callback.restoreStartOffset, Matchers.equalTo(0L));
    }

    /** Runs the poll-timeout scenario with the state updater flag explicitly set to {@code true}. */
    @Test
    public void shouldPollWithRightTimeoutWithStateUpdater() {
        setupStore();
        setupStoreMetadata();
        setupStateManagerMock();

        final boolean stateUpdaterEnabled = true;
        shouldPollWithRightTimeout(stateUpdaterEnabled);
    }

    /** Runs the poll-timeout scenario with the state updater flag explicitly set to {@code false}. */
    @Test
    public void shouldPollWithRightTimeoutWithoutStateUpdater() {
        setupStore();
        setupStoreMetadata();
        setupStateManagerMock();

        final boolean stateUpdaterEnabled = false;
        shouldPollWithRightTimeout(stateUpdaterEnabled);
    }

    /**
     * Runs the poll-timeout scenario with the {@code "__state.updater.enabled__"} property
     * set to the given value.
     *
     * @param z value to store under the state updater property
     */
    private void shouldPollWithRightTimeout(boolean z) {
        final Properties overrides = new Properties();
        overrides.put("__state.updater.enabled__", z);
        shouldPollWithRightTimeout(overrides);
    }

    /** Runs the poll-timeout scenario without setting the state updater property at all. */
    @Test
    public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
        setupStore();
        setupStoreMetadata();
        setupStateManagerMock();

        final Properties noOverrides = new Properties();
        shouldPollWithRightTimeout(noOverrides);
    }

    /**
     * Performs one {@code restore} pass with a reader configured from {@code properties}
     * and checks the timeout of the restore consumer's last poll: {@code poll.ms} for
     * active tasks and whenever the state updater property is absent or {@code true},
     * otherwise {@link Duration#ZERO}.
     *
     * @param properties config overrides; may contain a Boolean under
     *                   {@code "__state.updater.enabled__"}
     */
    private void shouldPollWithRightTimeout(Properties properties) {
        final String stateUpdaterKey = "__state.updater.enabled__";
        final TaskId taskId = new TaskId(0, 0);
        // First offset lookup yields null, later lookups yield 9. The null must be typed
        // as Long to match the stubbed return type.
        Mockito.when(storeMetadata.offset()).thenReturn((Long) null).thenReturn(9L);
        Mockito.when(stateManager.taskId()).thenReturn(taskId);
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
        final StreamsConfig streamsConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, streamsConfig, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (type == Task.TaskType.STANDBY) {
            reader.transitToUpdateStandby();
        }
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        // Short-circuit order matters: the property is only read for non-active tasks.
        final boolean expectConfiguredTimeout = type == Task.TaskType.ACTIVE
            || !properties.containsKey(stateUpdaterKey)
            || ((Boolean) properties.get(stateUpdaterKey)).booleanValue();
        final Duration expectedTimeout = expectConfiguredTimeout
            ? Duration.ofMillis(streamsConfig.getLong("poll.ms").longValue())
            : Duration.ZERO;
        Assert.assertEquals(expectedTimeout, consumer.lastPollTimeout());
    }

    /**
     * Restores from a stored offset of 5 against an end offset of 10. The first pass must
     * only position the consumer at 6; the second pass, after records at offsets
     * 6, 7, 8 (null key), 9 and 11 were added, must move the consumer to 12. Active tasks
     * are then expected to be COMPLETED with 3 records restored and 1 still buffered;
     * standby tasks are expected to stay RESTORING with 4 records restored and none buffered.
     */
    @Test
    public void shouldRestoreFromPositionAndCheckForCompletion() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();
        final boolean active = type == Task.TaskType.ACTIVE;
        final boolean standby = type == Task.TaskType.STANDBY;
        final TaskId taskId = new TaskId(0, 0);
        Mockito.when(storeMetadata.offset()).thenReturn(5L);
        if (standby) {
            Mockito.when(storeMetadata.endOffset()).thenReturn(10L);
        }
        Mockito.when(stateManager.taskId()).thenReturn(taskId);
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (standby) {
            reader.transitToUpdateStandby();
        }

        // First pass: initialization only, no records available yet.
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertTrue(reader.completedChangelogs().isEmpty());
        Assert.assertEquals(6L, consumer.position(tp));
        Assert.assertEquals(Collections.emptySet(), consumer.paused());
        if (active) {
            Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
            Assert.assertEquals(tp, callback.restoreTopicPartition);
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
        } else {
            Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        }

        // Typed records; the one at offset 8 has a null key and offset 10 is skipped.
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, (byte[]) null, "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 11L, "key".getBytes(), "value".getBytes()));

        // Second pass: the added records are polled.
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        Assert.assertEquals(12L, consumer.position(tp));
        if (active) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, reader.changelogMetadata(tp).state());
            Assert.assertEquals(3L, reader.changelogMetadata(tp).totalRestored());
            Assert.assertEquals(1L, reader.changelogMetadata(tp).bufferedRecords().size());
            Assert.assertEquals(Collections.singleton(tp), reader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(tp), consumer.paused());
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
        } else {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
            Assert.assertEquals(4L, reader.changelogMetadata(tp).totalRestored());
            Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedRecords().size());
            Assert.assertEquals(Collections.emptySet(), reader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), consumer.paused());
        }
    }

    /**
     * Restores a changelog that has no stored offset on the first lookup, so the consumer
     * starts at the beginning offset (5) against an end offset of 11. After records at
     * offsets 6 to 9 (one with a null key) are restored, the consumer is moved to 11 and a
     * further pass is expected to complete the changelog for active tasks, while standby
     * tasks are expected to stay RESTORING.
     */
    @Test
    public void shouldRestoreFromBeginningAndCheckCompletion() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();
        final boolean active = type == Task.TaskType.ACTIVE;
        final boolean standby = type == Task.TaskType.STANDBY;
        final TaskId taskId = new TaskId(0, 0);
        // The null offsets must be typed as Long to match the stubbed return type.
        if (standby && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) {
            // With debug logging enabled for standby, one extra null offset lookup is stubbed.
            Mockito.when(storeMetadata.offset()).thenReturn((Long) null).thenReturn((Long) null).thenReturn(9L);
            Mockito.when(storeMetadata.endOffset()).thenReturn(10L);
        } else {
            Mockito.when(storeMetadata.offset()).thenReturn((Long) null).thenReturn(9L);
        }
        Mockito.when(stateManager.taskId()).thenReturn(taskId);
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, stateManager);
        if (standby) {
            reader.transitToUpdateStandby();
        }

        // First pass: initialization only, consumer positioned at the beginning offset.
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(5L, consumer.position(tp));
        Assert.assertEquals(Collections.emptySet(), consumer.paused());
        if (active) {
            Assert.assertEquals(11L, reader.changelogMetadata(tp).endOffset().longValue());
            Assert.assertEquals(tp, callback.restoreTopicPartition);
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
            Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
        } else {
            Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        }

        // Typed records; the one at offset 8 has a null key.
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, (byte[]) null, "value".getBytes()));
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));

        // Second pass: the four records are polled; the end offset is not yet reached.
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(3L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Third pass: move the consumer to the end offset without adding records.
        consumer.seek(tp, 11L);
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));

        Assert.assertEquals(11L, consumer.position(tp));
        Assert.assertEquals(3L, reader.changelogMetadata(tp).totalRestored());
        if (active) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, reader.changelogMetadata(tp).state());
            Assert.assertEquals(3L, reader.changelogMetadata(tp).totalRestored());
            Assert.assertEquals(Collections.singleton(tp), reader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(tp), consumer.paused());
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
            Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
        } else {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
            Assert.assertEquals(Collections.emptySet(), reader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), consumer.paused());
        }
    }

    /**
     * Restores an active-task changelog whose stored offset (5) is already beyond the
     * end offset (0). A single {@code restore} pass is expected to mark the changelog
     * COMPLETED, pause the partition and fire the restore start and end callbacks
     * without any batch callback.
     */
    @Test
    public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        // Typed task map; both stubbed lookups use a null key.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(5L);
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

        reader.register(tp, activeStateManager);
        reader.restore(tasks);

        Assert.assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, reader.changelogMetadata(tp).state());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(Collections.singleton(tp), reader.completedChangelogs());
        Assert.assertEquals(6L, consumer.position(tp));
        Assert.assertEquals(Collections.singleton(tp), consumer.paused());
        Assert.assertEquals(tp, callback.restoreTopicPartition);
        Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
        Assert.assertEquals("store", callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
    }

    /**
     * Uses a consumer whose {@code position} call throws a {@link TimeoutException} until a flag
     * is flipped. Asserts that the first {@code restore} leaves the changelog {@code RESTORING}
     * and reports the timeout to the task, and that a second {@code restore} (after the flag is
     * set) moves the changelog to {@code COMPLETED}.
     */
    @Test
    public void shouldRequestPositionAndHandleTimeoutException() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        final TaskId id = new TaskId(0, 0);
        final Task mockTask = Mockito.mock(Task.class);
        final Map<TaskId, Task> tasks = Collections.singletonMap(id, mockTask);

        Mockito.when(storeMetadata.offset()).thenReturn(10L);
        Mockito.when(activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(tp, 10L));
        Mockito.when(activeStateManager.taskId()).thenReturn(id);

        // While false, position() times out; once true it answers 10.
        final AtomicBoolean positionAvailable = new AtomicBoolean(false);
        final MockConsumer<byte[], byte[]> timingOutConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public long position(TopicPartition partition) {
                if (!positionAvailable.get()) {
                    throw new TimeoutException("KABOOM!");
                }
                return 10L;
            }
        };

        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, timingOutConsumer, callback, standbyListener);
        reader.register(tp, activeStateManager);

        // First attempt: position() throws.
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertTrue(reader.completedChangelogs().isEmpty());
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Mockito.verify(mockTask).clearTaskTimeout();
        Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), ArgumentMatchers.any());
        Mockito.verify(mockTask).recordRestoration(ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());

        // Second attempt: position() now succeeds; start from a clean mock.
        positionAvailable.set(true);
        Mockito.reset(mockTask);
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, reader.changelogMetadata(tp).state());
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(Collections.singleton(tp), reader.completedChangelogs());
        Assert.assertEquals(10L, timingOutConsumer.position(tp));
        Mockito.verify(mockTask).clearTaskTimeout();
    }

    /**
     * Uses a consumer whose {@code position} call always throws {@code kaboom} and asserts that
     * {@code restore} surfaces it as the cause of a {@link StreamsException}.
     */
    @Test
    public void shouldThrowIfPositionFail() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        final TaskId id = new TaskId(0, 0);
        Mockito.when(activeStateManager.taskId()).thenReturn(id);
        Mockito.when(storeMetadata.offset()).thenReturn(10L);

        final MockConsumer<byte[], byte[]> failingConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public long position(TopicPartition partition) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };

        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, failingConsumer, callback, standbyListener);
        reader.register(tp, activeStateManager);

        final Map<TaskId, Task> tasks = Collections.singletonMap(id, Mockito.mock(Task.class));
        final StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> reader.restore(tasks));
        Assert.assertEquals(kaboom, thrown.getCause());
    }

    /**
     * Uses an admin client whose first {@code listOffsets} call throws a {@link TimeoutException}
     * and later calls delegate to {@code MockAdminClient}. Asserts that the changelog stays
     * {@code REGISTERED} with no end offset after the first {@code restore}, and is
     * {@code RESTORING} with end offset 10 after the second. The consumer fails the test if
     * {@code committed} is ever invoked.
     */
    @Test
    public void shouldRequestEndOffsetsAndHandleTimeoutException() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        final TaskId id = new TaskId(0, 0);
        final Task mockTask = Mockito.mock(Task.class);
        final Map<TaskId, Task> tasks = Collections.singletonMap(id, mockTask);

        Mockito.when(storeMetadata.offset()).thenReturn(5L);
        Mockito.when(activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(tp, 5L));
        Mockito.when(activeStateManager.taskId()).thenReturn(id);

        // Set by the first listOffsets call, which is the one that times out.
        final AtomicBoolean listOffsetsCalled = new AtomicBoolean(false);
        final MockAdminClient flakyAdmin = new MockAdminClient() {
            @Override
            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> partitionOffsets, ListOffsetsOptions options) {
                if (!listOffsetsCalled.get()) {
                    listOffsetsCalled.set(true);
                    throw new TimeoutException("KABOOM!");
                }
                return super.listOffsets(partitionOffsets, options);
            }
        };
        flakyAdmin.updateEndOffsets(Collections.singletonMap(tp, 10L));

        final MockConsumer<byte[], byte[]> guardedConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new AssertionError("Should not trigger this function");
            }
        };

        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, flakyAdmin, guardedConsumer, callback, standbyListener);
        reader.register(tp, activeStateManager);

        // First attempt: the end-offset request times out.
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, reader.changelogMetadata(tp).state());
        Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        Assert.assertTrue(listOffsetsCalled.get());
        Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), ArgumentMatchers.any());

        // Second attempt: the request goes through.
        Mockito.reset(mockTask);
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(6L, guardedConsumer.position(tp));
        Mockito.verify(mockTask).clearTaskTimeout();
        Mockito.verify(mockTask).recordRestoration(ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    /**
     * Uses an admin client whose {@code listOffsets} always throws {@code kaboom} and asserts that
     * {@code restore} surfaces it as the cause of a {@link StreamsException}.
     */
    @Test
    public void shouldThrowIfEndOffsetsFail() {
        setupActiveStateManager();
        Mockito.when(storeMetadata.changelogPartition()).thenReturn(tp);

        final TaskId id = new TaskId(0, 0);
        Mockito.when(activeStateManager.taskId()).thenReturn(id);

        final MockAdminClient failingAdmin = new MockAdminClient() {
            @Override
            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> partitionOffsets, ListOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        failingAdmin.updateEndOffsets(Collections.singletonMap(tp, 0L));

        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, failingAdmin, consumer, callback, standbyListener);
        reader.register(tp, activeStateManager);

        final Map<TaskId, Task> tasks = Collections.singletonMap(id, Mockito.mock(Task.class));
        final StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> reader.restore(tasks));
        Assert.assertEquals(kaboom, thrown.getCause());
    }

    /**
     * Uses an admin client whose first {@code listConsumerGroupOffsets} call returns a result
     * failed with a {@link TimeoutException}; later calls delegate to {@code MockAdminClient}.
     * The changelog is marked as a source changelog. Expectations depend on the task type under
     * test ({@code type}):
     * <ul>
     *   <li>ACTIVE: {@code REGISTERED} with no end offset after the first {@code restore};
     *       {@code RESTORING} with end offset 10 after the second.</li>
     *   <li>otherwise: {@code RESTORING} with end offset 0 after both calls.</li>
     * </ul>
     */
    @Test
    public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
        setupStateManagerMock();
        setupStoreMetadata();
        setupStore();

        final TaskId id = new TaskId(0, 0);
        // No up-front call to mockTask.clearTaskTimeout(): on a Mockito mock that would only be a
        // stray invocation, and it would be discarded by Mockito.reset(...) below before the
        // times(2) verification anyway.
        final Task mockTask = Mockito.mock(Task.class);
        final Map<TaskId, Task> tasks = Collections.singletonMap(id, mockTask);
        final boolean active = type == Task.TaskType.ACTIVE;

        Mockito.when(stateManager.changelogAsSource(tp)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(5L);
        Mockito.when(stateManager.taskId()).thenReturn(id);

        // Set by the first listConsumerGroupOffsets call, which is the one that times out.
        final AtomicBoolean committedOffsetsRequested = new AtomicBoolean(false);
        final MockAdminClient flakyAdmin = new MockAdminClient() {
            @Override
            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
                    Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
                    ListConsumerGroupOffsetsOptions options) {
                if (!committedOffsetsRequested.get()) {
                    committedOffsetsRequested.set(true);
                    return AdminClientTestUtils.listConsumerGroupOffsetsResult(
                        groupSpecs.keySet().iterator().next(), new TimeoutException("KABOOM!"));
                }
                return super.listConsumerGroupOffsets(groupSpecs, options);
            }
        };
        flakyAdmin.updateEndOffsets(Collections.singletonMap(tp, 20L));
        flakyAdmin.updateConsumerGroupOffsets(Collections.singletonMap(tp, 10L));

        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, flakyAdmin, consumer, callback, standbyListener);
        reader.register(tp, stateManager);

        // First attempt: the committed-offset request times out.
        reader.restore(tasks);
        Assert.assertEquals(
            active ? StoreChangelogReader.ChangelogState.REGISTERED : StoreChangelogReader.ChangelogState.RESTORING,
            reader.changelogMetadata(tp).state());
        if (active) {
            Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        } else {
            Assert.assertEquals(0L, reader.changelogMetadata(tp).endOffset().longValue());
        }
        Assert.assertTrue(committedOffsetsRequested.get());
        Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), ArgumentMatchers.any());

        // Second attempt: the request goes through; verifications below count from a clean mock.
        Mockito.reset(mockTask);
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(active ? 10L : 0L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(6L, consumer.position(tp));
        if (active) {
            Mockito.verify(mockTask, Mockito.times(2)).clearTaskTimeout();
            Mockito.verify(mockTask).recordRestoration(ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        }
    }

    /**
     * Marks the changelog as a source changelog and uses an admin client whose
     * {@code listConsumerGroupOffsets} always throws {@code kaboom}; asserts that {@code restore}
     * surfaces it as the cause of a {@link StreamsException}.
     */
    @Test
    public void shouldThrowIfCommittedOffsetsFail() {
        setupStateManagerMock();
        Mockito.when(storeMetadata.changelogPartition()).thenReturn(tp);

        final TaskId id = new TaskId(0, 0);
        Mockito.when(stateManager.taskId()).thenReturn(id);
        Mockito.when(stateManager.changelogAsSource(tp)).thenReturn(true);

        final MockAdminClient failingAdmin = new MockAdminClient() {
            @Override
            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
                    Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
                    ListConsumerGroupOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        failingAdmin.updateEndOffsets(Collections.singletonMap(tp, 10L));

        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, failingAdmin, consumer, callback, standbyListener);
        reader.register(tp, stateManager);

        final Map<TaskId, Task> tasks = Collections.singletonMap(id, Mockito.mock(Task.class));
        final StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> reader.restore(tasks));
        Assert.assertEquals(kaboom, thrown.getCause());
    }

    /**
     * Uses a consumer whose {@code unsubscribe} always throws {@code kaboom} and asserts that
     * {@code clear} surfaces it as the cause of a {@link StreamsException}.
     */
    @Test
    public void shouldThrowIfUnsubscribeFail() {
        final MockConsumer<byte[], byte[]> failingConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public void unsubscribe() {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, config, logContext, adminClient, failingConsumer, callback, standbyListener);

        final StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> reader.clear());
        Assert.assertEquals(kaboom, thrown.getCause());
    }

    /**
     * Registers a standby changelog with the shared {@code changelogReader} and asserts that
     * {@code restore} applies no records (and pauses the partition) until
     * {@code transitToUpdateStandby()} is called, after which five records are counted as
     * restored.
     */
    @Test
    public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
        setupStandbyStateManager();
        setupStoreMetadata();
        setupStore();

        // Stub the task map so that lookups with a null key resolve to a mock task.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);

        Mockito.when(storeMetadata.offset()).thenReturn(3L);
        Mockito.when(storeMetadata.endOffset()).thenReturn(20L);
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        changelogReader.register(tp, standbyStateManager);
        changelogReader.restore(tasks);

        // No restore callbacks are expected for the standby changelog.
        Assert.assertNull(callback.restoreTopicPartition);
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
        Assert.assertNull(changelogReader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());

        // Offsets 6..11; the record at offset 8 has a null key.
        for (long offset = 6L; offset <= 11L; offset++) {
            final byte[] key = offset == 8L ? null : "key".getBytes();
            consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, offset, key, "value".getBytes()));
        }

        // Before the transition: nothing restored, nothing buffered, partition paused.
        changelogReader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
        Assert.assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
        Assert.assertTrue(changelogReader.changelogMetadata(tp).bufferedRecords().isEmpty());
        Assert.assertEquals(Collections.singleton(tp), consumer.paused());

        // After the transition: five records restored, buffer drained.
        changelogReader.transitToUpdateStandby();
        changelogReader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
        Assert.assertEquals(5L, changelogReader.changelogMetadata(tp).totalRestored());
        Assert.assertTrue(changelogReader.changelogMetadata(tp).bufferedRecords().isEmpty());
    }

    /**
     * Registers a standby changelog that is not a source changelog and asserts that the reader
     * never asks the admin client for committed offsets (the admin client fails the test if it
     * does), keeps a {@code null} end offset, and restores all six keyed records that were added.
     */
    @Test
    public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
        setupStandbyStateManager();
        setupStoreMetadata();
        setupStore();

        // Stub the task map so that lookups with a null key resolve to a mock task.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);

        Mockito.when(storeMetadata.offset()).thenReturn(3L);
        Mockito.when(storeMetadata.endOffset()).thenReturn(20L);
        Mockito.when(standbyStateManager.changelogAsSource(tp)).thenReturn(false);

        final MockAdminClient guardedAdmin = new MockAdminClient() {
            @Override
            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
                    Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
                    ListConsumerGroupOffsetsOptions options) {
                throw new AssertionError("Should not try to fetch committed offsets");
            }
        };

        final Properties overrides = new Properties();
        overrides.put("commit.interval.ms", 100L);
        final StreamsConfig readerConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", overrides));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, readerConfig, logContext, guardedAdmin, consumer, callback, standbyListener);
        reader.transitToUpdateStandby();

        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        reader.register(tp, standbyStateManager);
        Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());

        // No records have been added yet, so nothing is restored.
        reader.restore(tasks);
        Assert.assertNull(callback.restoreTopicPartition);
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());

        // Offsets 5..11; the record at offset 8 has a null key.
        for (long offset = 5L; offset <= 11L; offset++) {
            final byte[] key = offset == 8L ? null : "key".getBytes();
            consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, offset, key, "value".getBytes()));
        }

        // Everything added is restored in one pass; the end offset stays unset.
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertNull(reader.changelogMetadata(tp).endOffset());
        Assert.assertEquals(6L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));
    }

    /**
     * Walks a source-changelog standby through several {@code restore} rounds while the committed
     * offset published through the admin client moves from 7 to 10 to 15 and the mock clock is
     * advanced. After each round the test pins the end offset, the number of restored records,
     * the number of buffered records and the buffered limit index.
     */
    @Test
    public void shouldRestoreToLimitInStandbyState() {
        setupStandbyStateManager();
        setupStoreMetadata();
        setupStore();

        // Stub the task map so that lookups with a null key resolve to a mock task.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);

        Mockito.when(standbyStateManager.changelogAsSource(tp)).thenReturn(true);
        Mockito.when(storeMetadata.offset()).thenReturn(3L);
        Mockito.when(storeMetadata.endOffset()).thenReturn(20L);

        final long startMs = time.milliseconds();
        final Properties overrides = new Properties();
        overrides.put("commit.interval.ms", 100L);
        final StreamsConfig readerConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", overrides));
        final StoreChangelogReader reader =
            new StoreChangelogReader(time, readerConfig, logContext, adminClient, consumer, callback, standbyListener);
        reader.transitToUpdateStandby();

        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        adminClient.updateConsumerGroupOffsets(Collections.singletonMap(tp, 7L));
        reader.register(tp, standbyStateManager);
        Assert.assertEquals(0L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());

        // Round 1: no records yet; the end offset becomes the committed offset 7.
        reader.restore(tasks);
        Assert.assertNull(callback.restoreTopicPartition);
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_START));
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(7L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).totalRestored());

        // Offsets 5..11; the record at offset 8 has a null key.
        for (long offset = 5L; offset <= 11L; offset++) {
            final byte[] key = offset == 8L ? null : "key".getBytes();
            consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, offset, key, "value".getBytes()));
        }

        // Round 2: two records restored, four remain buffered.
        reader.restore(tasks);
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(tp).state());
        Assert.assertEquals(7L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_END));
        Assert.assertNull(callback.storeNameCalledStates.get(MockStateRestoreListener.RESTORE_BATCH));

        // Round 3: committed offset is now 10, but at start + 100 ms the end offset is still 7.
        adminClient.updateConsumerGroupOffsets(Collections.singletonMap(tp, 10L));
        time.setCurrentTimeMs(startMs + 100);
        reader.restore(tasks);
        Assert.assertEquals(7L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 4: at start + 101 ms the end offset becomes 10 and the limit index moves to 2.
        time.setCurrentTimeMs(startMs + 101);
        reader.restore(tasks);
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 5: two more records restored.
        reader.restore(tasks);
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 6: committed offset is now 15, but at start + 201 ms the end offset is still 10.
        adminClient.updateConsumerGroupOffsets(Collections.singletonMap(tp, 15L));
        time.setCurrentTimeMs(startMs + 201);
        reader.restore(tasks);
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 7: after enforceRestoreActive() the end offset stays at 10, even at start + 202 ms.
        time.setCurrentTimeMs(startMs + 202);
        reader.enforceRestoreActive();
        reader.restore(tasks);
        Assert.assertEquals(10L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 8: back in update-standby mode the end offset becomes 15, limit index 2.
        reader.transitToUpdateStandby();
        reader.restore(tasks);
        Assert.assertEquals(15L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(4L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(2L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 9: the remaining buffered records are restored.
        reader.restore(tasks);
        Assert.assertEquals(15L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(6L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());

        // Round 10: offsets 12..15 are added; three are restored and one stays buffered.
        for (long offset = 12L; offset <= 15L; offset++) {
            consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, offset, "key".getBytes(), "value".getBytes()));
        }
        reader.restore(tasks);
        Assert.assertEquals(15L, reader.changelogMetadata(tp).endOffset().longValue());
        Assert.assertEquals(9L, reader.changelogMetadata(tp).totalRestored());
        Assert.assertEquals(1L, reader.changelogMetadata(tp).bufferedRecords().size());
        Assert.assertEquals(0L, reader.changelogMetadata(tp).bufferedLimitIndex());
    }

    /**
     * Registers three active changelogs ({@code tp}, {@code tp1}, {@code tp2}) with the shared
     * {@code changelogReader}, asserts all are {@code RESTORING} after one {@code restore} call,
     * then checks that {@code unregister} removes only the given partition and that
     * {@code clear} empties the reader while its state is {@code ACTIVE_RESTORING}.
     */
    @Test
    public void shouldRestoreMultipleChangelogs() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        // Stub the task map so that lookups with a null key resolve to a mock task.
        @SuppressWarnings("unchecked")
        final Map<TaskId, Task> tasks = Mockito.mock(Map.class);
        Mockito.when(tasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when(tasks.containsKey(null)).thenReturn(true);

        // Metadata for the two additional changelog partitions.
        Mockito.when(storeMetadataOne.changelogPartition()).thenReturn(tp1);
        Mockito.when(storeMetadataOne.store()).thenReturn(store);
        Mockito.when(storeMetadataTwo.changelogPartition()).thenReturn(tp2);
        Mockito.when(storeMetadataTwo.store()).thenReturn(store);
        Mockito.when(storeMetadata.offset()).thenReturn(0L);
        Mockito.when(storeMetadataOne.offset()).thenReturn(0L);
        Mockito.when(storeMetadataTwo.offset()).thenReturn(0L);
        Mockito.when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
        Mockito.when(activeStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
        Mockito.when(activeStateManager.changelogOffsets()).thenReturn(Utils.mkMap(
            Utils.mkEntry(tp, 5L),
            Utils.mkEntry(tp1, 5L),
            Utils.mkEntry(tp2, 5L)));

        setupConsumer(10L, tp);
        setupConsumer(5L, tp1);
        setupConsumer(3L, tp2);

        changelogReader.register(tp, activeStateManager);
        changelogReader.register(tp1, activeStateManager);
        changelogReader.register(tp2, activeStateManager);
        changelogReader.restore(tasks);

        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp1).state());
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp2).state());

        // Unregistering one partition leaves the other two untouched.
        changelogReader.unregister(Collections.singletonList(tp));
        Assert.assertNull(changelogReader.changelogMetadata(tp));
        Assert.assertFalse(changelogReader.isEmpty());
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp1).state());
        Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp2).state());

        // Clearing drops everything that is left.
        changelogReader.clear();
        Assert.assertTrue(changelogReader.isEmpty());
        Assert.assertNull(changelogReader.changelogMetadata(tp1));
        Assert.assertNull(changelogReader.changelogMetadata(tp2));
        // Expected value first, so a failure message labels the two sides correctly.
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, changelogReader.state());
    }

    /**
     * Walks a reader holding one active changelog ({@code tp}) and two standby changelogs
     * ({@code tp1}, {@code tp2}) through ACTIVE_RESTORING -> STANDBY_UPDATING -> ACTIVE_RESTORING,
     * checking changelog states, consumer assignment and paused partitions after every step.
     */
    @Test
    public void shouldTransitState() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        final TaskId taskId = new TaskId(0, 0);
        final TopicPartition[] changelogs = {this.tp, this.tp1, this.tp2};
        final Set<TopicPartition> allChangelogs = Utils.mkSet(this.tp, this.tp1, this.tp2);
        final Set<TopicPartition> standbyChangelogs = Utils.mkSet(this.tp1, this.tp2);

        // Standby task: two stores, each backed by its own changelog partition.
        Mockito.when(this.standbyStateManager.taskType()).thenReturn(Task.TaskType.STANDBY);
        Mockito.when(this.storeMetadataOne.changelogPartition()).thenReturn(this.tp1);
        Mockito.when(this.storeMetadataOne.store()).thenReturn(this.store);
        Mockito.when(this.storeMetadataTwo.changelogPartition()).thenReturn(this.tp2);
        Mockito.when(this.storeMetadataTwo.store()).thenReturn(this.store);

        // Every store reports offset 5 while every changelog ends at offset 10.
        Mockito.when(this.storeMetadata.offset()).thenReturn(5L);
        Mockito.when(this.storeMetadataOne.offset()).thenReturn(5L);
        Mockito.when(this.storeMetadataTwo.offset()).thenReturn(5L);
        Mockito.when(this.standbyStateManager.storeMetadata(this.tp1)).thenReturn(this.storeMetadataOne);
        Mockito.when(this.standbyStateManager.storeMetadata(this.tp2)).thenReturn(this.storeMetadataTwo);
        Mockito.when(this.activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.tp, 5L));
        Mockito.when(this.activeStateManager.taskId()).thenReturn(taskId);
        Mockito.when(this.standbyStateManager.taskId()).thenReturn(taskId);
        for (final TopicPartition changelog : changelogs) {
            this.adminClient.updateEndOffsets(Collections.singletonMap(changelog, 10L));
        }

        final StoreChangelogReader reader = new StoreChangelogReader(
            this.time, this.config, this.logContext, this.adminClient, this.consumer, this.callback, this.standbyListener);
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, reader.state());

        // Registration alone must not touch the consumer assignment.
        reader.register(this.tp, this.activeStateManager);
        reader.register(this.tp1, this.standbyStateManager);
        reader.register(this.tp2, this.standbyStateManager);
        for (final TopicPartition changelog : changelogs) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, reader.changelogMetadata(changelog).state());
        }
        Assert.assertEquals(Collections.emptySet(), this.consumer.assignment());

        // First restore call: everything is assigned, the standby changelogs are paused.
        reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        for (final TopicPartition changelog : changelogs) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(changelog).state());
        }
        Assert.assertEquals(allChangelogs, this.consumer.assignment());
        Assert.assertEquals(standbyChangelogs, this.consumer.paused());
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, reader.state());

        // Enforcing active restoration while already in ACTIVE_RESTORING leaves the state unchanged.
        reader.enforceRestoreActive();
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, reader.state());

        // Switching to standby updating resumes all partitions.
        reader.transitToUpdateStandby();
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, reader.state());
        for (final TopicPartition changelog : changelogs) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(changelog).state());
        }
        Assert.assertEquals(allChangelogs, this.consumer.assignment());
        Assert.assertEquals(Collections.emptySet(), this.consumer.paused());

        // A second transition to standby updating is rejected.
        Assert.assertThrows(IllegalStateException.class, reader::transitToUpdateStandby);

        // Re-register the active changelog while in STANDBY_UPDATING: restore() is expected to throw,
        // yet the changelog states, assignment and reader state asserted below still hold afterwards.
        reader.unregister(Collections.singletonList(this.tp));
        reader.register(this.tp, this.activeStateManager);
        Assert.assertThrows(
            IllegalStateException.class,
            () -> reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        for (final TopicPartition changelog : changelogs) {
            Assert.assertEquals(StoreChangelogReader.ChangelogState.RESTORING, reader.changelogMetadata(changelog).state());
        }
        Assert.assertEquals(allChangelogs, this.consumer.assignment());
        Assert.assertEquals(Collections.emptySet(), this.consumer.paused());
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, reader.state());

        // Going back to active restoration pauses the standby changelogs again.
        reader.enforceRestoreActive();
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, reader.state());
        Assert.assertEquals(allChangelogs, this.consumer.assignment());
        Assert.assertEquals(standbyChangelogs, this.consumer.paused());
    }

    /**
     * After the only registered (standby) changelog is unregistered while the reader is in
     * STANDBY_UPDATING, the reader must be empty and report ACTIVE_RESTORING again.
     */
    @Test
    public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
        Mockito.when(this.standbyStateManager.taskType()).thenReturn(Task.TaskType.STANDBY);
        Mockito.when(this.standbyStateManager.storeMetadata(this.tp1)).thenReturn(this.storeMetadataOne);

        final StoreChangelogReader reader = new StoreChangelogReader(
            this.time, this.config, this.logContext, this.adminClient, this.consumer, this.callback, this.standbyListener);

        reader.register(this.tp1, this.standbyStateManager);
        reader.transitToUpdateStandby();

        // Remove the one and only changelog partition.
        reader.unregister(Utils.mkSet(this.tp1));

        Assert.assertTrue(reader.isEmpty());
        Assert.assertEquals(StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, reader.state());
    }

    /**
     * Builds a reader with {@code this.exceptionCallback} in place of the regular callback and checks
     * that {@code restore()} fails with a {@link StreamsException} caused by {@code this.kaboom} at three
     * points: on the first call, after records were added, and after the consumer was moved to offset 10.
     */
    @Test
    public void shouldThrowIfRestoreCallbackThrows() {
        setupActiveStateManager();
        setupStoreMetadata();
        setupStore();

        final TaskId taskId = new TaskId(0, 0);
        Mockito.when(this.storeMetadata.offset()).thenReturn(5L);
        Mockito.when(this.activeStateManager.taskId()).thenReturn(taskId);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));

        final StoreChangelogReader reader = new StoreChangelogReader(
            this.time, this.config, this.logContext, this.adminClient, this.consumer, this.exceptionCallback, this.standbyListener);
        reader.register(this.tp, this.activeStateManager);

        // First restore call, before any record has been added to the consumer.
        final StreamsException firstFailure = Assert.assertThrows(
            StreamsException.class,
            () -> reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assert.assertEquals(this.kaboom, firstFailure.getCause());

        // Restore call with two records (offsets 6 and 7) available.
        this.consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, 6L, "key".getBytes(), "value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, 7L, "key".getBytes(), "value".getBytes()));
        final StreamsException secondFailure = Assert.assertThrows(
            StreamsException.class,
            () -> reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assert.assertEquals(this.kaboom, secondFailure.getCause());

        // Restore call after positioning the consumer at the changelog end offset (10).
        this.consumer.seek(this.tp, 10L);
        final StreamsException thirdFailure = Assert.assertThrows(
            StreamsException.class,
            () -> reader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assert.assertEquals(this.kaboom, thirdFailure.getCause());
    }

    /**
     * Unregistering a changelog partition that was never registered must not throw; the reader is
     * expected to log a DEBUG-level message naming the unknown partition instead.
     */
    @Test
    public void shouldNotThrowOnUnknownRevokedPartition() {
        // try-with-resources closes the appender on both the success and the failure path.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
            // Debug level is enabled on the reader's logger so the message asserted below is captured.
            appender.setClassLoggerToDebug(StoreChangelogReader.class);

            this.changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));

            MatcherAssert.assertThat(
                appender.getMessages(),
                Matchers.hasItem("test-reader Changelog partition unknown-0 could not be found, it could be already cleaned up during the handling of task corruption and never restore again"));
        }
    }

    /**
     * Prepares the mock consumer for one changelog partition: registers the partition and its offsets,
     * queues {@code j} records on it, and finally clears the consumer's assignment again.
     *
     * @param j              number of records to queue for the partition
     * @param topicPartition changelog partition to set up
     */
    private void setupConsumer(final long j, final TopicPartition topicPartition) {
        // assignPartition() leaves the partition assigned, which addRecords() relies on.
        assignPartition(j, topicPartition);
        addRecords(j, topicPartition);

        // Reset to an empty assignment once the records are queued.
        this.consumer.assign(Collections.<TopicPartition>emptyList());
    }

    /**
     * Queues {@code j} records with zero-length key and value on the mock consumer for the given
     * partition, using offsets {@code 0} through {@code j - 1}.
     *
     * @param j              number of records to add; nothing is added when it is zero or negative
     * @param topicPartition partition the records belong to
     */
    private void addRecords(long j, TopicPartition topicPartition) {
        // The counter is a long like the bound: an int counter would overflow (and never terminate)
        // for counts above Integer.MAX_VALUE.
        for (long offset = 0L; offset < j; offset++) {
            this.consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, new byte[0], new byte[0]));
        }
    }

    /**
     * Makes the given partition known to the mock consumer and the mock admin client and assigns it to
     * the consumer. The beginning offset is 0 and the end offset is {@code max(0, j) + 1} on both clients.
     *
     * @param j              record count the end offset is derived from
     * @param topicPartition partition to register and assign
     */
    private void assignPartition(final long j, final TopicPartition topicPartition) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        final long endOffset = Math.max(0L, j) + 1;

        // Leader and replica information is not set for the partition metadata.
        final PartitionInfo partitionInfo = new PartitionInfo(topic, partition, null, null, null);
        this.consumer.updatePartitions(topic, Collections.singletonList(partitionInfo));

        this.consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));
        this.adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));

        this.consumer.assign(Collections.singletonList(topicPartition));
    }
}
