package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.class */
public class ProcessorStateManagerTest {
    // ---- Names used to build the fixtures below ----
    private final String applicationId = "test-application";
    private final String persistentStoreName = "persistentStore";
    private final String persistentStoreTwoName = "persistentStore2";
    private final String nonPersistentStoreName = "nonPersistentStore";

    // Changelog topic names derived from the application id and each store name.
    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic("test-application", "persistentStore");
    private final String persistentStoreTwoTopicName = ProcessorStateManager.storeChangelogTopic("test-application", "persistentStore2");
    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic("test-application", "nonPersistentStore");

    // Mock stores; the boolean is presumably the "persistent" flag (it matches the field names) -- confirm in MockKeyValueStore.
    private final MockKeyValueStore persistentStore = new MockKeyValueStore("persistentStore", true);
    private final MockKeyValueStore persistentStoreTwo = new MockKeyValueStore("persistentStore2", true);
    private final MockKeyValueStore nonPersistentStore = new MockKeyValueStore("nonPersistentStore", false);

    // Partition 1 of every changelog topic, plus a partition of a topic that no store is mapped to.
    private final TopicPartition persistentStorePartition = new TopicPartition(this.persistentStoreTopicName, 1);
    private final TopicPartition persistentStoreTwoPartition = new TopicPartition(this.persistentStoreTwoTopicName, 1);
    private final TopicPartition nonPersistentStorePartition = new TopicPartition(this.nonPersistentStoreTopicName, 1);
    private final TopicPartition irrelevantPartition = new TopicPartition("other-topic", 1);
    private final TaskId taskId = new TaskId(0, 1);

    // Sample key/value and their byte forms; keyBytes is presumably the serialized form of key -- confirm against MockKeyValueStore.
    private final Integer key = 1;
    private final String value = "the-value";
    private final byte[] keyBytes = {0, 0, 0, 1};
    private final byte[] valueBytes = "the-value".getBytes(StandardCharsets.UTF_8);
    // A single changelog record for the persistent store: partition 1, offset 100.
    private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(this.persistentStoreTopicName, 1, 100, this.keyBytes, this.valueBytes);

    // Collaborators handed to every ProcessorStateManager built by the tests.
    private final MockChangelogReader changelogReader = new MockChangelogReader();
    private final LogContext logContext = new LogContext("process-state-manager-test ");
    // Restore callback that ignores every record.
    private final StateRestoreCallback noopStateRestoreCallback = (bArr, bArr2) -> {
    };

    // Assigned in setup().
    private File baseDir;
    private File checkpointFile;
    private OffsetCheckpoint checkpoint;
    private StateDirectory stateDirectory;

    // EasyMock "nice" mocks; store and storeMetadata are stubbed and replayed in setup().
    @Mock(type = MockType.NICE)
    private StateStore store;

    @Mock(type = MockType.NICE)
    private ProcessorStateManager.StateStoreMetadata storeMetadata;

    @Mock(type = MockType.NICE)
    private InternalProcessorContext context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest$ConverterStore.class */
    /**
     * A {@link MockKeyValueStore} that additionally carries the {@link TimestampedBytesStore}
     * marker interface; it adds no behavior of its own.
     */
    public static class ConverterStore extends MockKeyValueStore implements TimestampedBytesStore {

        /** Forwards both arguments unchanged to the {@link MockKeyValueStore} constructor. */
        ConverterStore(final String name, final boolean flag) {
            super(name, flag);
        }
    }

    /**
     * Prepares the per-test fixture: a fresh temporary base directory, a {@link StateDirectory}
     * rooted in it, the checkpoint file handle for the test task, and the stubbed
     * {@code storeMetadata}/{@code store} mocks (switched to replay mode).
     */
    @Before
    public void setup() {
        baseDir = TestUtils.tempDirectory();

        // A plain Properties object replaces the former double-brace initialization, which
        // created an anonymous Properties subclass holding a reference to this test instance.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
        stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);

        checkpointFile = new File(stateDirectory.directoryForTask(taskId), ".checkpoint");
        checkpoint = new OffsetCheckpoint(checkpointFile);

        // Default stubbing shared by all tests; individual tests may reset and re-script it.
        EasyMock.expect(storeMetadata.changelogPartition()).andReturn(persistentStorePartition).anyTimes();
        EasyMock.expect(storeMetadata.store()).andReturn(store).anyTimes();
        EasyMock.expect(store.name()).andReturn("persistentStore").anyTimes();
        EasyMock.replay(storeMetadata, store);
    }

    /**
     * Deletes the temporary base directory created in {@code setup()}.
     *
     * @throws IOException if the deletion fails
     */
    @After
    public void cleanup() throws IOException {
        final File directoryToRemove = baseDir;
        Utils.delete(directoryToRemove);
    }

    /** For application id "appId" and store "store" the changelog topic is "appId-store-changelog". */
    @Test
    public void shouldReturnDefaultChangelogTopicName() {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", "store");

        MatcherAssert.assertThat(changelogTopic, Is.is("appId-store-changelog"));
    }

    /** The manager's base directory must equal the state directory's task directory. */
    @Test
    public void shouldReturnBaseDir() {
        final File expectedDir = stateDirectory.directoryForTask(taskId);
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);

        Assert.assertEquals(expectedDir, stateManager.baseDir());
    }

    /** A manager reports the task type it was created with, for both standby and active. */
    @Test
    public void shouldReportTaskType() {
        final ProcessorStateManager standbyManager = getStateManager(Task.TaskType.STANDBY);
        Assert.assertEquals(Task.TaskType.STANDBY, standbyManager.taskType());

        final ProcessorStateManager activeManager = getStateManager(Task.TaskType.ACTIVE);
        Assert.assertEquals(Task.TaskType.ACTIVE, activeManager.taskType());
    }

    /**
     * Only the partitions passed as source partitions are reported by
     * {@code changelogAsSource}; the second persistent store's partition is not among them.
     */
    @Test
    public void shouldReportChangelogAsSource() {
        final Map<String, String> storeToChangelogTopic = Utils.mkMap(
            Utils.mkEntry("persistentStore", persistentStoreTopicName),
            Utils.mkEntry("persistentStore2", persistentStoreTwoTopicName),
            Utils.mkEntry("nonPersistentStore", nonPersistentStoreTopicName));

        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Task.TaskType.STANDBY,
            false,
            logContext,
            stateDirectory,
            changelogReader,
            storeToChangelogTopic,
            Utils.mkSet(persistentStorePartition, nonPersistentStorePartition));

        Assert.assertTrue(stateManager.changelogAsSource(persistentStorePartition));
        Assert.assertTrue(stateManager.changelogAsSource(nonPersistentStorePartition));
        Assert.assertFalse(stateManager.changelogAsSource(persistentStoreTwoPartition));
    }

    /**
     * Two stores are mapped to the same changelog topic; updating the offset of that
     * changelog partition is expected to fail with an {@link IllegalStateException}.
     */
    @Test
    public void shouldFindSingleStoreForChangelog() {
        final Map<String, String> storeToChangelogTopic = Utils.mkMap(
            Utils.mkEntry("persistentStore", persistentStoreTopicName),
            // deliberately the SAME topic as the first store
            Utils.mkEntry("persistentStore2", persistentStoreTopicName));

        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Task.TaskType.STANDBY,
            false,
            logContext,
            stateDirectory,
            changelogReader,
            storeToChangelogTopic,
            Collections.emptySet());

        stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
        // Each store is registered with its own callback (previously the first store's
        // callback was reused here), matching how the other tests register this store.
        stateManager.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> stateManager.updateChangelogOffsets(Collections.singletonMap(persistentStorePartition, 0L)));
    }

    /**
     * Restoring one changelog record must hand exactly that key/value pair to the registered
     * restore callback; afterwards {@code changelogOffsets()} reports 101 for the partition
     * (the record itself sits at offset 100).
     */
    @Test
    public void shouldRestoreStoreWithRestoreCallback() {
        final MockRestoreCallback restoreCallback = new MockRestoreCallback();
        final KeyValue<byte[], byte[]> expectedPair = KeyValue.pair(keyBytes, valueBytes);
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            stateManager.registerStore(persistentStore, restoreCallback);
            final ProcessorStateManager.StateStoreMetadata metadata = stateManager.storeMetadata(persistentStorePartition);
            MatcherAssert.assertThat(metadata, CoreMatchers.notNullValue());

            stateManager.restore(metadata, Collections.singletonList(consumerRecord));

            MatcherAssert.assertThat(restoreCallback.restored.size(), Is.is(1));
            Assert.assertTrue(restoreCallback.restored.contains(expectedPair));
            Assert.assertEquals(Collections.singletonMap(persistentStorePartition, 101L), stateManager.changelogOffsets());
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * Restoring into a plain store leaves the value at 9 bytes, which is the length of
     * "the-value" in UTF-8, i.e. the restored bytes are not extended.
     */
    @Test
    public void shouldRestoreNonTimestampedStoreWithNoConverter() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        try {
            manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            final ProcessorStateManager.StateStoreMetadata metadata = manager.storeMetadata(persistentStorePartition);
            MatcherAssert.assertThat(metadata, CoreMatchers.notNullValue());

            manager.restore(metadata, Collections.singletonList(consumerRecord));

            MatcherAssert.assertThat(persistentStore.keys.size(), Is.is(1));
            Assert.assertTrue(persistentStore.keys.contains(key));
            Assert.assertEquals(9, persistentStore.values.get(0).length);
        } finally {
            manager.close();
        }
    }

    /**
     * Restoring into a store obtained from {@code getConverterStore()} must produce a
     * 17-byte value for the 9-byte input (presumably an 8-byte timestamp prefix added by
     * the record converter -- confirm against the converter implementation).
     */
    @Test
    public void shouldRestoreTimestampedStoreWithConverter() {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore converterStore = getConverterStore();
        try {
            stateManager.registerStore(converterStore, converterStore.stateRestoreCallback);
            final ProcessorStateManager.StateStoreMetadata metadata = stateManager.storeMetadata(persistentStorePartition);
            MatcherAssert.assertThat(metadata, CoreMatchers.notNullValue());

            stateManager.restore(metadata, Collections.singletonList(consumerRecord));

            MatcherAssert.assertThat(converterStore.keys.size(), Is.is(1));
            Assert.assertTrue(converterStore.keys.contains(key));
            Assert.assertEquals(17, converterStore.values.get(0).length);
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * A changelog partition registered together with its store must no longer be registered
     * with the changelog reader once the state manager has been closed, and the store itself
     * must have been closed.
     */
    @Test
    public void shouldUnregisterChangelogsDuringClose() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        // Re-script the shared metadata mock so that it hands out a locally created store mock.
        EasyMock.reset(storeMetadata);
        final StateStore mockStore = EasyMock.createMock(StateStore.class);
        EasyMock.expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
        EasyMock.expect(storeMetadata.store()).andStubReturn(mockStore);
        EasyMock.expect(mockStore.name()).andStubReturn("persistentStore");

        // Interactions expected while the state stores are being registered.
        context.uninitialize();
        mockStore.init(context, mockStore);
        EasyMock.replay(storeMetadata, context, mockStore);

        manager.registerStateStores(Collections.singletonList(mockStore), context);
        EasyMock.verify(context, mockStore);

        // init() on the mock does nothing, so the store is registered explicitly here.
        manager.registerStore(mockStore, noopStateRestoreCallback);
        Assert.assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));

        // Second phase: closing the manager is expected to close the store.
        EasyMock.reset(mockStore);
        EasyMock.expect(mockStore.name()).andStubReturn("persistentStore");
        mockStore.close();
        EasyMock.replay(mockStore);

        manager.close();

        EasyMock.verify(mockStore);
        Assert.assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
    }

    /**
     * After {@code recycle()} the changelog partition is unregistered while the store stays
     * retrievable by name; registering the state stores again re-registers the changelog
     * without any further expected call on the store besides {@code name()}.
     */
    @Test
    public void shouldRecycleStoreAndReregisterChangelog() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        // Re-script the shared metadata mock so that it hands out a locally created store mock.
        EasyMock.reset(storeMetadata);
        final StateStore mockStore = EasyMock.createMock(StateStore.class);
        EasyMock.expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
        EasyMock.expect(storeMetadata.store()).andStubReturn(mockStore);
        EasyMock.expect(mockStore.name()).andStubReturn("persistentStore");

        // Interactions expected during the first registration.
        context.uninitialize();
        mockStore.init(context, mockStore);
        EasyMock.replay(storeMetadata, context, mockStore);

        manager.registerStateStores(Collections.singletonList(mockStore), context);
        EasyMock.verify(context, mockStore);

        // init() on the mock does nothing, so the store is registered explicitly here.
        manager.registerStore(mockStore, noopStateRestoreCallback);
        Assert.assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));

        manager.recycle();
        Assert.assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
        MatcherAssert.assertThat(manager.getStore("persistentStore"), CoreMatchers.equalTo(mockStore));

        // Second registration: only uninitialize() and name() are scripted, init() is not.
        EasyMock.reset(context, mockStore);
        context.uninitialize();
        EasyMock.expect(mockStore.name()).andStubReturn("persistentStore");
        EasyMock.replay(context, mockStore);

        manager.registerStateStores(Collections.singletonList(mockStore), context);

        EasyMock.verify(context, mockStore);
        Assert.assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
    }

    /** Registering the persistent store registers its changelog partition with the reader. */
    @Test
    public void shouldRegisterPersistentStores() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        try {
            manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);

            final boolean registered = changelogReader.isPartitionRegistered(persistentStorePartition);
            Assert.assertTrue(registered);
        } finally {
            manager.close();
        }
    }

    /** Registering the non-persistent store also registers its changelog partition. */
    @Test
    public void shouldRegisterNonPersistentStore() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        try {
            manager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);

            final boolean registered = changelogReader.isPartitionRegistered(nonPersistentStorePartition);
            Assert.assertTrue(registered);
        } finally {
            manager.close();
        }
    }

    /**
     * With an empty store-to-changelog mapping, registering a store must not register any
     * changelog partition with the reader.
     */
    @Test
    public void shouldNotRegisterNonLoggedStore() {
        final ProcessorStateManager manager = new ProcessorStateManager(
            taskId,
            Task.TaskType.STANDBY,
            false,
            logContext,
            stateDirectory,
            changelogReader,
            Collections.emptyMap(),
            Collections.emptySet());
        try {
            manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);

            final boolean registered = changelogReader.isPartitionRegistered(persistentStorePartition);
            Assert.assertFalse(registered);
        } finally {
            manager.close();
        }
    }

    /**
     * Offsets are loaded from an existing checkpoint file: only the persistent store's metadata
     * offset is set (10) and the file is kept. The second persistent store (no checkpoint
     * entry), the non-persistent store (despite its entry) and the unrelated partition get none.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldInitializeOffsetsFromCheckpointFile() throws IOException {
        checkpoint.write(Utils.mkMap(
            Utils.mkEntry(persistentStorePartition, 10L),
            Utils.mkEntry(nonPersistentStorePartition, 10L),
            Utils.mkEntry(irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            stateManager.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);
            stateManager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);

            Assert.assertTrue(checkpointFile.exists());
            Assert.assertEquals(
                Utils.mkSet(persistentStorePartition, persistentStoreTwoPartition, nonPersistentStorePartition),
                stateManager.changelogPartitions());
            // changelogOffsets() reports 11 for the checkpointed store and 0 for the others.
            Assert.assertEquals(
                Utils.mkMap(
                    Utils.mkEntry(persistentStorePartition, 11L),
                    Utils.mkEntry(persistentStoreTwoPartition, 0L),
                    Utils.mkEntry(nonPersistentStorePartition, 0L)),
                stateManager.changelogOffsets());

            Assert.assertNull(stateManager.storeMetadata(irrelevantPartition));
            Assert.assertNull(stateManager.storeMetadata(persistentStoreTwoPartition).offset());
            MatcherAssert.assertThat(stateManager.storeMetadata(persistentStorePartition).offset(), CoreMatchers.equalTo(10L));
            Assert.assertNull(stateManager.storeMetadata(nonPersistentStorePartition).offset());
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * Same expectations as the non-EOS variant, except that the manager is created through
     * {@code getStateManager(ACTIVE, true)} and the checkpoint file must be gone after the
     * offsets have been initialized.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldInitializeOffsetsFromCheckpointFileAndDeleteIfEOSEnabled() throws IOException {
        checkpoint.write(Utils.mkMap(
            Utils.mkEntry(persistentStorePartition, 10L),
            Utils.mkEntry(nonPersistentStorePartition, 10L),
            Utils.mkEntry(irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, true);
        try {
            stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            stateManager.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);
            stateManager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);

            Assert.assertFalse(checkpointFile.exists());
            Assert.assertEquals(
                Utils.mkSet(persistentStorePartition, persistentStoreTwoPartition, nonPersistentStorePartition),
                stateManager.changelogPartitions());
            // changelogOffsets() reports 11 for the checkpointed store and 0 for the others.
            Assert.assertEquals(
                Utils.mkMap(
                    Utils.mkEntry(persistentStorePartition, 11L),
                    Utils.mkEntry(persistentStoreTwoPartition, 0L),
                    Utils.mkEntry(nonPersistentStorePartition, 0L)),
                stateManager.changelogOffsets());

            Assert.assertNull(stateManager.storeMetadata(irrelevantPartition));
            Assert.assertNull(stateManager.storeMetadata(persistentStoreTwoPartition).offset());
            MatcherAssert.assertThat(stateManager.storeMetadata(persistentStorePartition).offset(), CoreMatchers.equalTo(10L));
            Assert.assertNull(stateManager.storeMetadata(nonPersistentStorePartition).offset());
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /** Registered stores are returned by name; an unknown name yields {@code null}. */
    @Test
    public void shouldGetRegisteredStore() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        try {
            manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            manager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);

            final StateStore unknown = manager.getStore("noSuchStore");
            Assert.assertNull(unknown);
            Assert.assertEquals(persistentStore, manager.getStore("persistentStore"));
            Assert.assertEquals(nonPersistentStore, manager.getStore("nonPersistentStore"));
        } finally {
            manager.close();
        }
    }

    /**
     * The changelog partition registered for a store uses the store's changelog topic and the
     * partition number of the task id.
     */
    @Test
    public void shouldGetChangelogPartitionForRegisteredStore() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);

        final TopicPartition changelogPartition = manager.registeredChangelogPartitionFor("persistentStore");

        MatcherAssert.assertThat(changelogPartition.topic(), Is.is(persistentStoreTopicName));
        MatcherAssert.assertThat(changelogPartition.partition(), Is.is(taskId.partition));
    }

    /** Looking up the changelog partition of a store that was never registered must throw. */
    @Test
    public void shouldThrowIfStateStoreIsNotRegistered() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        // NOTE(review): JUnit treats this string as the failure message of the assertion;
        // it is not compared with the message of the thrown exception.
        final String failureMessage = "State store persistentStore for which the registered changelog partition should be retrieved has not been registered";

        Assert.assertThrows(
            failureMessage,
            IllegalStateException.class,
            () -> manager.registeredChangelogPartitionFor("persistentStore"));
    }

    /**
     * A store that is registered but has no entry in the changelog mapping must make the
     * changelog-partition lookup throw.
     */
    @Test
    public void shouldThrowIfStateStoreHasLoggingDisabled() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore unloggedStore = new MockKeyValueStore("store-with-logging-disabled", true);
        manager.registerStore(unloggedStore, (StateRestoreCallback) null);

        // NOTE(review): JUnit treats this string as the failure message of the assertion;
        // it is not compared with the message of the thrown exception.
        final String failureMessage = "Registered state store store-with-logging-disabled does not have a registered changelog partition. This may happen if logging is disabled for the state store.";

        Assert.assertThrows(
            failureMessage,
            IllegalStateException.class,
            () -> manager.registeredChangelogPartitionFor("store-with-logging-disabled"));
    }

    /**
     * Exercises flush, checkpoint and close in sequence: both stores get flushed (the
     * persistent one with the lower flush count), the checkpoint file ends up containing only
     * the persistent store's offset, and both stores are closed.
     *
     * <p>The verification tail lives in a single {@code finally} block. The previous shape
     * duplicated it in a catch-all handler, so a failing assertion in the tail re-ran flush,
     * checkpoint and close and could replace the original failure with a different one.
     *
     * @throws IOException if the checkpoint file cannot be written or read
     */
    @Test
    public void shouldFlushCheckpointAndClose() throws IOException {
        checkpoint.write(Collections.emptyMap());

        // Offsets to report; the third partition belongs to no registered store.
        final Map<TopicPartition, Long> ackedOffsets = new HashMap<>();
        ackedOffsets.put(persistentStorePartition, 123L);
        ackedOffsets.put(nonPersistentStorePartition, 456L);
        ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            // The checkpoint file must not exist before anything has been checkpointed.
            Assert.assertFalse(checkpointFile.exists());

            stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            stateManager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
        } finally {
            stateManager.flush();

            Assert.assertTrue(persistentStore.flushed);
            Assert.assertTrue(nonPersistentStore.flushed);
            MatcherAssert.assertThat(
                persistentStore.getLastFlushCount(),
                Matchers.lessThan(nonPersistentStore.getLastFlushCount()));

            stateManager.updateChangelogOffsets(ackedOffsets);
            stateManager.checkpoint();

            Assert.assertTrue(checkpointFile.exists());

            // Only the persistent store's offset is expected in the checkpoint file.
            final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
            MatcherAssert.assertThat(
                checkpointedOffsets,
                Is.is(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 1), 123L)));

            stateManager.close();

            Assert.assertTrue(persistentStore.closed);
            Assert.assertTrue(nonPersistentStore.closed);
        }
    }

    /**
     * The store offset starts at the checkpointed value (99), moves to the restored record's
     * offset (100) and is finally overridden by the explicitly updated changelog offset
     * (220); an update for an unrelated partition creates no metadata.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException {
        checkpoint.write(Collections.singletonMap(persistentStorePartition, 99L));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);

            final ProcessorStateManager.StateStoreMetadata metadata = stateManager.storeMetadata(persistentStorePartition);
            MatcherAssert.assertThat(metadata, CoreMatchers.notNullValue());
            MatcherAssert.assertThat(metadata.offset(), CoreMatchers.equalTo(99L));

            stateManager.restore(metadata, Collections.singletonList(consumerRecord));
            MatcherAssert.assertThat(metadata.offset(), CoreMatchers.equalTo(100L));

            stateManager.updateChangelogOffsets(Utils.mkMap(
                Utils.mkEntry(persistentStorePartition, 220L),
                Utils.mkEntry(irrelevantPartition, 9000L)));
            stateManager.checkpoint();

            MatcherAssert.assertThat(stateManager.storeMetadata(irrelevantPartition), CoreMatchers.equalTo((Object) null));
            MatcherAssert.assertThat(metadata.offset(), CoreMatchers.equalTo(220L));
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * After restoring the record at offset 100 into the persistent store, checkpointing must
     * write exactly that partition/offset pair to the checkpoint file.
     *
     * @throws IOException if the checkpoint file cannot be read
     */
    @Test
    public void shouldWriteCheckpointForPersistentStore() throws IOException {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            stateManager.registerStore(persistentStore, persistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);

            final ProcessorStateManager.StateStoreMetadata metadata = stateManager.storeMetadata(persistentStorePartition);
            MatcherAssert.assertThat(metadata, CoreMatchers.notNullValue());

            stateManager.restore(metadata, Collections.singletonList(consumerRecord));
            stateManager.checkpoint();

            final Map<TopicPartition, Long> checkpointed = checkpoint.read();
            MatcherAssert.assertThat(checkpointed, CoreMatchers.equalTo(Collections.singletonMap(persistentStorePartition, 100L)));
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * Offsets of a non-persistent store must not be written to the checkpoint file: after
     * updating its changelog offset and checkpointing, reading the checkpoint yields an
     * empty map.
     *
     * @throws IOException if the checkpoint file cannot be read
     */
    @Test
    public void shouldNotWriteCheckpointForNonPersistentStore() throws IOException {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        try {
            stateManager.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);
            MatcherAssert.assertThat(stateManager.storeMetadata(nonPersistentStorePartition), CoreMatchers.notNullValue());

            stateManager.updateChangelogOffsets(Collections.singletonMap(nonPersistentStorePartition, 876L));
            stateManager.checkpoint();

            MatcherAssert.assertThat(checkpoint.read(), CoreMatchers.equalTo(Collections.emptyMap()));
        } finally {
            // Single cleanup path: close() runs exactly once, whether or not the body failed.
            stateManager.close();
        }
    }

    /**
     * With an empty store-to-changelog mapping, updating an offset for the store's would-be
     * partition and checkpointing must leave the checkpoint empty.
     *
     * @throws IOException if the checkpoint file cannot be read
     */
    @Test
    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
        final ProcessorStateManager manager = new ProcessorStateManager(
            taskId,
            Task.TaskType.STANDBY,
            false,
            logContext,
            stateDirectory,
            changelogReader,
            Collections.emptyMap(),
            Collections.emptySet());
        try {
            manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);

            manager.updateChangelogOffsets(Collections.singletonMap(persistentStorePartition, 987L));
            manager.checkpoint();

            MatcherAssert.assertThat(checkpoint.read(), CoreMatchers.equalTo(Collections.emptyMap()));
        } finally {
            manager.close();
        }
    }

    /** A store named ".checkpoint" (the checkpoint file name used in setup) is rejected. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> manager.registerStore(new MockKeyValueStore(".checkpoint", true), (StateRestoreCallback) null));
    }

    /** Registering the same store a second time must be rejected. */
    @Test
    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);
        manager.registerStore(persistentStore, persistentStore.stateRestoreCallback);

        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> manager.registerStore(persistentStore, persistentStore.stateRestoreCallback));
    }

    /**
     * A {@link RuntimeException} thrown by a store's {@code flush()} must surface from the
     * manager's {@code flush()} as a {@link ProcessorStateException} whose cause is the
     * original exception.
     */
    @Test
    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() {
        final RuntimeException flushFailure = new RuntimeException("KABOOM!");
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore failingStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void flush() {
                throw flushFailure;
            }
        };
        stateManager.registerStore(failingStore, failingStore.stateRestoreCallback);

        // The stray getClass() call that used to precede this line had no effect and was removed.
        final ProcessorStateException thrown = Assert.assertThrows(ProcessorStateException.class, stateManager::flush);
        Assert.assertEquals(flushFailure, thrown.getCause());
    }

    /**
     * A {@link StreamsException} thrown by a store's {@code flush()} must surface from the
     * manager's {@code flush()} as that very exception (equal to the one thrown).
     */
    @Test
    public void shouldPreserveStreamsExceptionOnFlushIfStoreThrows() {
        final StreamsException flushFailure = new StreamsException("KABOOM!");
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore failingStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void flush() {
                throw flushFailure;
            }
        };
        stateManager.registerStore(failingStore, failingStore.stateRestoreCallback);

        // The stray getClass() call that used to precede this line had no effect and was removed.
        final StreamsException thrown = Assert.assertThrows(StreamsException.class, stateManager::flush);
        Assert.assertEquals(flushFailure, thrown);
    }

    /**
     * A {@link RuntimeException} thrown by a store's {@code close()} must surface from the
     * manager's {@code close()} as a {@link ProcessorStateException} whose cause is the
     * original exception.
     */
    @Test
    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() {
        final RuntimeException closeFailure = new RuntimeException("KABOOM!");
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore failingStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void close() {
                throw closeFailure;
            }
        };
        stateManager.registerStore(failingStore, failingStore.stateRestoreCallback);

        // The stray getClass() call that used to precede this line had no effect and was removed.
        final ProcessorStateException thrown = Assert.assertThrows(ProcessorStateException.class, stateManager::close);
        Assert.assertEquals(closeFailure, thrown.getCause());
    }

    /**
     * Registers a store whose {@code close()} throws a {@link StreamsException} and checks that
     * closing the state manager raises an exception equal to the one the store threw.
     */
    @Test
    public void shouldPreserveStreamsExceptionOnCloseIfStoreThrows() {
        final StreamsException exception = new StreamsException("KABOOM!");
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        final MockKeyValueStore stateStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void close() {
                throw exception;
            }
        };
        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);

        // The bound method reference already null-checks its receiver, so no explicit
        // getClass() probe is needed before it.
        final StreamsException thrown =
            Assert.assertThrows(StreamsException.class, stateManager::close);
        Assert.assertEquals(exception, thrown);
    }

    /**
     * Calls {@code restore} on a state manager that has no stores registered and expects an
     * {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfRestoringUnregisteredStore() {
        final ProcessorStateManager emptyManager = getStateManager(Task.TaskType.ACTIVE);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> emptyManager.restore(this.storeMetadata, Collections.emptyList()));
    }

    /**
     * Checks that a failing checkpoint write is reported as a WARN log event rather than an
     * exception.
     *
     * <p>The state directory is cleaned after the store is registered and before
     * {@code checkpoint()} is called; the test then looks for a WARN event with the expected
     * message prefix/suffix whose throwable info starts with
     * {@code java.io.FileNotFoundException}.
     */
    @Test
    public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
        this.stateDirectory.clean();

        // try-with-resources closes the appender exactly once on every exit path and attaches
        // any failure from close() as a suppressed exception.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(ProcessorStateManager.class)) {
            stateManager.updateChangelogOffsets(Collections.singletonMap(this.persistentStorePartition, 10L));
            stateManager.checkpoint();

            boolean foundExpectedLogMessage = false;
            for (final LogCaptureAppender.Event event : appender.getEvents()) {
                if ("WARN".equals(event.getLevel())
                        && event.getMessage().startsWith("process-state-manager-test Failed to write offset checkpoint file to [")
                        && event.getMessage().endsWith(".checkpoint]. This may occur if OS cleaned the state.dir in case when it located in /tmp directory. This may also occur due to running multiple instances on the same machine using the same state dir. Changing the location of state.dir may resolve the problem.")
                        && event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) {
                    foundExpectedLogMessage = true;
                    break;
                }
            }
            Assert.assertTrue(foundExpectedLogMessage);
        }
    }

    /**
     * Writes unparseable content into the {@code .checkpoint} file under the state manager's
     * base directory and expects {@code initializeStoreOffsetsFromCheckpoint(true)} to fail with
     * a {@link ProcessorStateException}.
     *
     * @throws Exception if the garbage checkpoint file cannot be written
     */
    @Test
    public void shouldThrowIfLoadCheckpointThrows() throws Exception {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);

        final File corruptedCheckpoint = new File(stateManager.baseDir(), ".checkpoint");
        // FileWriter creates the file itself; try-with-resources releases the handle even if
        // the write fails.
        try (FileWriter writer = new FileWriter(corruptedCheckpoint)) {
            writer.write("abcdefg");
        }

        Assert.assertThrows(
            ProcessorStateException.class,
            () -> stateManager.initializeStoreOffsetsFromCheckpoint(true));
    }

    /**
     * Registers the persistent store with a restore callback that always throws a
     * {@link RuntimeException} and expects {@code restore} to fail with a
     * {@link ProcessorStateException}.
     */
    @Test
    public void shouldThrowIfRestoreCallbackThrows() {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore(this.persistentStore, (key, value) -> {
            throw new RuntimeException("KABOOM!");
        });

        // assertThrows reports the expected exception type on failure, replacing the manual
        // try/fail/catch whose message wrongly referred to an IO exception.
        Assert.assertThrows(
            ProcessorStateException.class,
            () -> stateManager.restore(
                stateManager.storeMetadata(this.persistentStorePartition),
                Collections.singletonList(this.consumerRecord)));
    }

    /**
     * Registers one store whose {@code flush()} throws and a second one that records being
     * flushed, then checks that the second store was flushed even though the first one failed.
     */
    @Test
    public void shouldFlushGoodStoresEvenSomeThrowsException() {
        final AtomicBoolean secondStoreFlushed = new AtomicBoolean(false);
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        final MockKeyValueStore failingStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        final MockKeyValueStore healthyStore = new MockKeyValueStore("persistentStore2", true) {
            @Override
            public void flush() {
                secondStoreFlushed.set(true);
            }
        };
        manager.registerStore(failingStore, failingStore.stateRestoreCallback);
        manager.registerStore(healthyStore, healthyStore.stateRestoreCallback);

        try {
            manager.flush();
        } catch (ProcessorStateException expected) {
            // Deliberately ignored: only the healthy store's flush matters here.
        }

        Assert.assertTrue(secondStoreFlushed.get());
    }

    /**
     * Registers one store whose {@code close()} throws and a second one that records being
     * closed, then checks that the second store was closed even though the first one failed.
     */
    @Test
    public void shouldCloseAllStoresEvenIfStoreThrowsException() {
        final AtomicBoolean secondStoreClosed = new AtomicBoolean(false);
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE);

        final MockKeyValueStore failingStore = new MockKeyValueStore("persistentStore", true) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        final MockKeyValueStore healthyStore = new MockKeyValueStore("persistentStore2", true) {
            @Override
            public void close() {
                secondStoreClosed.set(true);
            }
        };
        manager.registerStore(failingStore, failingStore.stateRestoreCallback);
        manager.registerStore(healthyStore, healthyStore.stateRestoreCallback);

        try {
            manager.close();
        } catch (ProcessorStateException expected) {
            // Deliberately ignored: only the healthy store's close matters here.
        }

        Assert.assertTrue(secondStoreClosed.get());
    }

    /**
     * Writes a checkpoint that has offsets for {@code persistentStore}, the non-persistent store
     * and an unrelated partition but none for {@code persistentStoreTwo}, then expects
     * {@code initializeStoreOffsetsFromCheckpoint(false)} to throw a
     * {@link TaskCorruptedException} that maps this task's id to the manager's changelog
     * partitions.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException {
        this.checkpoint.write(Utils.mkMap(
            Utils.mkEntry(this.persistentStorePartition, 10L),
            Utils.mkEntry(this.nonPersistentStorePartition, 10L),
            Utils.mkEntry(this.irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, true);
        try {
            stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateManager.registerStore(this.persistentStoreTwo, this.persistentStoreTwo.stateRestoreCallback);
            stateManager.registerStore(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);

            // The expected map is evaluated before the failing call, as an argument that
            // precedes the assertThrows expression.
            Assert.assertEquals(
                Collections.singletonMap(this.taskId, stateManager.changelogPartitions()),
                Assert.assertThrows(
                    TaskCorruptedException.class,
                    () -> stateManager.initializeStoreOffsetsFromCheckpoint(false)
                ).corruptedTaskWithChangelogs());
        } finally {
            // finally closes the manager exactly once, whether the body passes or throws.
            stateManager.close();
        }
    }

    /**
     * Writes a checkpoint that has an offset for the persistent store but none for the
     * non-persistent store, and checks that
     * {@code initializeStoreOffsetsFromCheckpoint(false)} completes without throwing.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldNotThrowTaskCorruptedWithoutInMemoryStoreCheckpointAndNonEmptyDir() throws IOException {
        this.checkpoint.write(Utils.mkMap(
            Utils.mkEntry(this.persistentStorePartition, 10L),
            Utils.mkEntry(this.irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, true);
        try {
            stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateManager.registerStore(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(false);
        } finally {
            // finally closes the manager exactly once, whether the body passes or throws.
            stateManager.close();
        }
    }

    /**
     * Runs a full register / initialize / checkpoint / close cycle and then registers the same
     * stores again, checking that {@code initializeStoreOffsetsFromCheckpoint(false)} succeeds
     * the second time and that store metadata is present again for both partitions.
     */
    @Test
    public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, true);
        try {
            // First life cycle: register, initialize, record offsets, checkpoint, close.
            stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateManager.registerStore(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(true);
            MatcherAssert.assertThat(stateManager.storeMetadata(this.nonPersistentStorePartition), CoreMatchers.notNullValue());
            MatcherAssert.assertThat(stateManager.storeMetadata(this.persistentStorePartition), CoreMatchers.notNullValue());

            stateManager.updateChangelogOffsets(Utils.mkMap(
                Utils.mkEntry(this.nonPersistentStorePartition, 876L),
                Utils.mkEntry(this.persistentStorePartition, 666L)));
            stateManager.checkpoint();
            stateManager.close();

            // Closing drops the metadata of every registered store.
            Assert.assertNull(stateManager.storeMetadata(this.nonPersistentStorePartition));
            Assert.assertNull(stateManager.storeMetadata(this.persistentStorePartition));

            // Second life cycle: re-register and initialize from the checkpoint written above.
            stateManager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateManager.registerStore(this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateManager.initializeStoreOffsetsFromCheckpoint(false);
            MatcherAssert.assertThat(stateManager.storeMetadata(this.nonPersistentStorePartition), CoreMatchers.notNullValue());
            MatcherAssert.assertThat(stateManager.storeMetadata(this.persistentStorePartition), CoreMatchers.notNullValue());
        } finally {
            stateManager.close();
        }
    }

    /**
     * Marks the persistent store's changelog as corrupted and expects
     * {@code initializeStoreOffsetsFromCheckpoint(true)} to fail with a
     * {@link ProcessorStateException} caused by an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIllegalStateIfInitializingOffsetsForCorruptedTasks() {
        final ProcessorStateManager manager = getStateManager(Task.TaskType.ACTIVE, true);
        try {
            manager.registerStore(this.persistentStore, this.persistentStore.stateRestoreCallback);
            manager.markChangelogAsCorrupted(Utils.mkSet(this.persistentStorePartition));

            final ProcessorStateException thrown = Assert.assertThrows(
                ProcessorStateException.class,
                () -> manager.initializeStoreOffsetsFromCheckpoint(true));
            final boolean causedByIllegalState = thrown.getCause() instanceof IllegalStateException;
            Assert.assertTrue(causedByIllegalState);
        } finally {
            manager.close();
        }
    }

    /**
     * Closes a freshly created state manager that never had a store registered; the test passes
     * as long as {@code close()} does not throw.
     */
    @Test
    public void shouldBeAbleToCloseWithoutRegisteringAnyStores() {
        final ProcessorStateManager emptyManager = getStateManager(Task.TaskType.ACTIVE, true);
        emptyManager.close();
    }

    /**
     * Writes a checkpoint file, then calls {@code deleteCheckPointFileIfEOSEnabled()} on a state
     * manager created with the boolean flag set to {@code true} and checks that the checkpoint
     * file no longer exists.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
        this.checkpoint.write(Utils.mkMap(
            Utils.mkEntry(this.persistentStorePartition, 10L),
            Utils.mkEntry(this.nonPersistentStorePartition, 10L),
            Utils.mkEntry(this.irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, true);
        stateManager.deleteCheckPointFileIfEOSEnabled();
        stateManager.close();

        Assert.assertFalse(this.checkpointFile.exists());
    }

    /**
     * Writes a checkpoint file, then calls {@code deleteCheckPointFileIfEOSEnabled()} on a state
     * manager created with the boolean flag set to {@code false} and checks that the checkpoint
     * file still exists.
     *
     * @throws IOException if the checkpoint file cannot be written
     */
    @Test
    public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws IOException {
        this.checkpoint.write(Utils.mkMap(
            Utils.mkEntry(this.persistentStorePartition, 10L),
            Utils.mkEntry(this.nonPersistentStorePartition, 10L),
            Utils.mkEntry(this.irrelevantPartition, 999L)));

        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE, false);
        stateManager.deleteCheckPointFileIfEOSEnabled();
        stateManager.close();

        Assert.assertTrue(this.checkpointFile.exists());
    }

    /**
     * Builds a {@link ProcessorStateManager} for this test's task id, wired to the fixture's log
     * context, state directory and changelog reader, with changelog topic mappings for the
     * stores named {@code persistentStore}, {@code persistentStore2} and
     * {@code nonPersistentStore}, and an empty set as the last constructor argument.
     *
     * @param taskType the task type handed to the state manager
     * @param z        forwarded as the manager's third constructor argument; presumably the
     *                 "EOS enabled" flag, judging by the tests that pass {@code true}/{@code false}
     *                 (TODO confirm against the constructor)
     * @return a new state manager instance
     */
    private ProcessorStateManager getStateManager(Task.TaskType taskType, boolean z) {
        return new ProcessorStateManager(
            this.taskId,
            taskType,
            z,
            this.logContext,
            this.stateDirectory,
            this.changelogReader,
            Utils.mkMap(
                Utils.mkEntry("persistentStore", this.persistentStoreTopicName),
                Utils.mkEntry("persistentStore2", this.persistentStoreTwoTopicName),
                Utils.mkEntry("nonPersistentStore", this.nonPersistentStoreTopicName)),
            Collections.emptySet());
    }

    /**
     * Convenience overload that builds a state manager with the boolean flag of the
     * two-argument variant set to {@code false}.
     *
     * @param taskType the task type handed to the state manager
     * @return a new state manager instance
     */
    private ProcessorStateManager getStateManager(Task.TaskType taskType) {
        final boolean flagOff = false;
        return getStateManager(taskType, flagOff);
    }

    /**
     * Creates a {@code ConverterStore} named {@code persistentStore}, constructed with
     * {@code true} as its second argument.
     *
     * @return the newly created store
     */
    private MockKeyValueStore getConverterStore() {
        final MockKeyValueStore converterStore = new ConverterStore("persistentStore", true);
        return converterStore;
    }
}
