package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.class */
public class ProcessorStateManagerTest {
    // Per-test scratch directory and the state directory rooted in it; both are (re)assigned in setup().
    private File baseDir;
    private StateDirectory stateDirectory;

    // Empty source-partition set handed to state managers that have no source partitions.
    private final Set<TopicPartition> noPartitions = Collections.emptySet();

    // Identifiers shared by the tests.
    private final String applicationId = "test-application";
    private final String stateDir = "test";
    private final String persistentStoreName = "persistentStore";
    private final String nonPersistentStoreName = "nonPersistentStore";

    // Changelog topic names derived from the application id and the store names above.
    private final String persistentStoreTopicName =
            ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
    private final String nonPersistentStoreTopicName =
            ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest$MockRestoreConsumer.class */
    public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
        private final Serializer<Integer> serializer;
        public TopicPartition assignedPartition;
        public TopicPartition seekPartition;
        public long seekOffset;
        public boolean seekToBeginingCalled;
        public boolean seekToEndCalled;
        private long endOffset;
        private long currentOffset;
        private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MockRestoreConsumer() {
            super(OffsetResetStrategy.EARLIEST);
            this.serializer = new IntegerSerializer();
            this.assignedPartition = null;
            this.seekPartition = null;
            this.seekOffset = -1L;
            this.seekToBeginingCalled = false;
            this.seekToEndCalled = false;
            this.endOffset = 0L;
            this.currentOffset = 0L;
            this.recordBuffer = new ArrayList<>();
            reset();
        }

        public void reset() {
            this.assignedPartition = null;
            this.seekOffset = -1L;
            this.seekToBeginingCalled = false;
            this.seekToEndCalled = false;
            this.endOffset = 0L;
            this.recordBuffer.clear();
        }

        public void bufferRecord(ConsumerRecord<Integer, Integer> consumerRecord) {
            this.recordBuffer.add(new ConsumerRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.serializer.serialize(consumerRecord.topic(), consumerRecord.key()), this.serializer.serialize(consumerRecord.topic(), consumerRecord.value())));
            this.endOffset = consumerRecord.offset();
            super.updateEndOffsets(Collections.singletonMap(this.assignedPartition, Long.valueOf(this.endOffset)));
        }

        public synchronized void assign(Collection<TopicPartition> collection) {
            int size = collection.size();
            if (size > 1) {
                throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
            }
            if (size == 1) {
                if (this.assignedPartition != null) {
                    throw new IllegalStateException("RestoreConsumer: partition already assigned");
                }
                this.assignedPartition = collection.iterator().next();
                super.updateBeginningOffsets(Collections.singletonMap(this.assignedPartition, 0L));
            }
            super.assign(collection);
        }

        public ConsumerRecords<byte[], byte[]> poll(long j) {
            Iterator<ConsumerRecord<byte[], byte[]>> it = this.recordBuffer.iterator();
            while (it.hasNext()) {
                super.addRecord(it.next());
            }
            this.recordBuffer.clear();
            ConsumerRecords<byte[], byte[]> poll = super.poll(j);
            Iterator it2 = poll.records(this.assignedPartition).iterator();
            while (it2.hasNext()) {
                this.currentOffset = ((ConsumerRecord) it2.next()).offset();
            }
            return poll;
        }

        public synchronized long position(TopicPartition topicPartition) {
            if (topicPartition.equals(this.assignedPartition)) {
                return this.currentOffset;
            }
            throw new IllegalStateException("RestoreConsumer: unassigned partition");
        }

        public synchronized void seek(TopicPartition topicPartition, long j) {
            if (j < 0) {
                throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
            }
            if (this.seekOffset >= 0) {
                throw new IllegalStateException("RestoreConsumer: offset already seeked");
            }
            this.seekPartition = topicPartition;
            this.seekOffset = j;
            this.currentOffset = j;
            super.seek(topicPartition, j);
        }

        public synchronized void seekToBeginning(Collection<TopicPartition> collection) {
            if (collection.size() != 1) {
                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
            }
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                if (!it.next().equals(this.assignedPartition)) {
                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
                }
            }
            this.seekToBeginingCalled = true;
            this.currentOffset = 0L;
        }

        public synchronized void seekToEnd(Collection<TopicPartition> collection) {
            if (collection.size() != 1) {
                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
            }
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                if (!it.next().equals(this.assignedPartition)) {
                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
                }
            }
            this.seekToEndCalled = true;
            this.currentOffset = this.endOffset;
        }
    }

    /** Creates a fresh temporary base directory and a state directory rooted in it before each test. */
    @Before
    public void setup() {
        final File tempDir = TestUtils.tempDirectory();
        baseDir = tempDir;
        stateDirectory = new StateDirectory(applicationId, tempDir.getPath());
    }

    /** Deletes the temporary base directory created by {@code setup()} after each test. */
    @After
    public void cleanup() {
        final File directoryToRemove = baseDir;
        Utils.delete(directoryToRemove);
    }

    /**
     * Registering a store with logging enabled against a restore consumer that was given no
     * partition metadata is expected to fail with a {@link StreamsException}.
     * The state manager is closed exactly once, whether or not registration throws.
     */
    @Test(expected = StreamsException.class)
    public void testNoTopic() throws IOException {
        MockStateStoreSupplier.MockStateStore store =
                new MockStateStoreSupplier.MockStateStore(this.nonPersistentStoreName, false);
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, new TaskId(0, 1),
                this.noPartitions, new MockRestoreConsumer(), false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            stateMgr.register(store, true, store.stateRestoreCallback);
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a persistent store whose changelog partition 2 has a checkpoint at offset 10.
     *
     * <p>Verifies that the restore consumer was assigned that partition, was sought explicitly to
     * the checkpointed offset (not to the beginning), had seek-to-end requested, and that the keys
     * of the three buffered records (10, 20, 30) ended up in the store's {@code keys}.
     */
    @Test
    public void testRegisterPersistentStore() throws IOException {
        TaskId taskId = new TaskId(0, 2);
        TopicPartition changelogPartition = new TopicPartition(this.persistentStoreTopicName, 2);

        File checkpointFile = new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint");
        new OffsetCheckpoint(checkpointFile).write(Collections.singletonMap(changelogPartition, 10L));

        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.persistentStoreTopicName, Utils.mkList(
                new PartitionInfo(this.persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
                new PartitionInfo(this.persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 13L));

        MockStateStoreSupplier.MockStateStore persistentStore =
                new MockStateStoreSupplier.MockStateStore(this.persistentStoreName, true);
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, taskId,
                this.noPartitions, restoreConsumer, false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            restoreConsumer.reset();

            ArrayList<Integer> expectedKeys = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                int key = i * 10;
                expectedKeys.add(key);
                // NOTE(review): the loop index lands in the argument after the offset (fixed at 0L);
                // confirm against the ConsumerRecord constructor that this ordering is intended.
                restoreConsumer.bufferRecord(new ConsumerRecord<>(this.persistentStoreTopicName, 2, 0L, i,
                        TimestampType.CREATE_TIME, 0L, 0, 0, Integer.valueOf(key), 0));
            }

            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);

            Assert.assertEquals(new TopicPartition(this.persistentStoreTopicName, 2), restoreConsumer.assignedPartition);
            Assert.assertEquals(10L, restoreConsumer.seekOffset);
            Assert.assertFalse(restoreConsumer.seekToBeginingCalled);
            Assert.assertTrue(restoreConsumer.seekToEndCalled);
            Assert.assertEquals(expectedKeys, persistentStore.keys);
        } finally {
            // Close exactly once, whether the body passed or failed.
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a non-persistent store backed by changelog partition 2.
     *
     * <p>Verifies that the restore consumer was assigned that partition, that both
     * seek-to-beginning and seek-to-end were requested, that the recorded seek offset is 0, and
     * that the keys of the three buffered records (1, 2, 3) ended up in the store's {@code keys}.
     */
    @Test
    public void testRegisterNonPersistentStore() throws IOException {
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();

        // This checkpoint is written directly under baseDir and names the *persistent* store's topic.
        TopicPartition persistentPartition = new TopicPartition(this.persistentStoreTopicName, 2);
        new OffsetCheckpoint(new File(this.baseDir, ".checkpoint")).write(Collections.singletonMap(persistentPartition, 10L));

        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList(
                new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
                new PartitionInfo(this.nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])));
        restoreConsumer.updateEndOffsets(Collections.singletonMap(persistentPartition, 13L));

        MockStateStoreSupplier.MockStateStore nonPersistentStore =
                new MockStateStoreSupplier.MockStateStore(this.nonPersistentStoreName, false);
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, new TaskId(0, 2),
                this.noPartitions, restoreConsumer, false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            restoreConsumer.reset();

            ArrayList<Integer> expectedKeys = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                expectedKeys.add(i);
                // NOTE(review): i + 100 lands in the argument after the offset (fixed at 0L);
                // confirm against the ConsumerRecord constructor that this ordering is intended.
                restoreConsumer.bufferRecord(new ConsumerRecord<>(this.nonPersistentStoreTopicName, 2, 0L, i + 100,
                        TimestampType.CREATE_TIME, 0L, 0, 0, Integer.valueOf(i), 0));
            }

            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);

            Assert.assertEquals(new TopicPartition(this.nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition);
            Assert.assertEquals(0L, restoreConsumer.seekOffset);
            Assert.assertTrue(restoreConsumer.seekToBeginingCalled);
            Assert.assertTrue(restoreConsumer.seekToEndCalled);
            Assert.assertEquals(expectedKeys, nonPersistentStore.keys);
        } finally {
            // Close exactly once, whether the body passed or failed.
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers three persistent stores and checks the offsets reported by
     * {@code checkpointedOffsets()}.
     *
     * <p>Only store1's changelog partition has a checkpoint (offset 10); store3's partition 1 is
     * passed to the state manager as a source partition. The expected result has three entries:
     * 10 for store1 and -1 for both store2 and store3.
     */
    @Test
    public void testChangeLogOffsets() throws IOException {
        TaskId taskId = new TaskId(0, 0);
        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(this.applicationId, "store1");
        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(this.applicationId, "store2");
        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(this.applicationId, "store3");

        TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
        TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
        TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);

        File checkpointFile = new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint");
        new OffsetCheckpoint(checkpointFile).write(Collections.singletonMap(partition1, 10L));

        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
                new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])));
        restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
                new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])));
        restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList(
                new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]),
                new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])));

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(partition1, 13L);
        endOffsets.put(partition2, 17L);
        restoreConsumer.updateEndOffsets(endOffsets);

        MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true);
        MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true);
        MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore("store3", true);

        // NOTE(review): the fifth constructor argument is true only in this test — presumably the
        // standby flag; confirm against the ProcessorStateManager constructor.
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, taskId,
                Utils.mkSet(new TopicPartition(storeTopicName3, 1)), restoreConsumer, true, this.stateDirectory,
                (Map) null, Collections.emptyMap());
        try {
            restoreConsumer.reset();

            stateMgr.register(store1, true, store1.stateRestoreCallback);
            stateMgr.register(store2, true, store2.stateRestoreCallback);
            stateMgr.register(store3, true, store3.stateRestoreCallback);

            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();

            Assert.assertEquals(3L, changeLogOffsets.size());
            Assert.assertTrue(changeLogOffsets.containsKey(partition1));
            Assert.assertTrue(changeLogOffsets.containsKey(partition2));
            Assert.assertTrue(changeLogOffsets.containsKey(partition3));
            Assert.assertEquals(10L, changeLogOffsets.get(partition1).longValue());
            Assert.assertEquals(-1L, changeLogOffsets.get(partition2).longValue());
            Assert.assertEquals(-1L, changeLogOffsets.get(partition3).longValue());
        } finally {
            // Close exactly once, whether the body passed or failed.
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * After registering one store, {@code getStore} returns {@code null} for an unknown name and
     * the registered instance for its own name. The state manager is closed exactly once.
     */
    @Test
    public void testGetStore() throws IOException {
        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList(
                new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])));

        MockStateStoreSupplier.MockStateStore registeredStore =
                new MockStateStoreSupplier.MockStateStore(this.nonPersistentStoreName, false);
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, new TaskId(0, 1),
                this.noPartitions, restoreConsumer, false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            stateMgr.register(registeredStore, true, registeredStore.stateRestoreCallback);

            Assert.assertNull(stateMgr.getStore("noSuchStore"));
            Assert.assertEquals(registeredStore, stateMgr.getStore(this.nonPersistentStoreName));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a persistent and a non-persistent store, then flushes and closes the state manager
     * with acked offsets for both changelog partitions plus an unrelated one.
     *
     * <p>Verifies that both stores were flushed and closed and that the checkpoint file written on
     * close contains exactly one entry: the persistent store's partition at 124, one past its acked
     * offset of 123.
     *
     * <p>Flush and close run exactly once in a {@code finally}; the post-close assertions sit
     * after it so that a failing assertion cannot trigger a second flush/close.
     */
    @Test
    public void testFlushAndClose() throws IOException {
        TaskId taskId = new TaskId(0, 1);
        File checkpointFile = new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint");
        // Write an empty checkpoint up front; the test later asserts it is gone once the manager exists.
        new OffsetCheckpoint(checkpointFile).write(Collections.emptyMap());

        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
        restoreConsumer.updatePartitions(this.persistentStoreTopicName, Utils.mkList(
                new PartitionInfo(this.persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])));
        restoreConsumer.updatePartitions(this.nonPersistentStoreTopicName, Utils.mkList(
                new PartitionInfo(this.nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])));

        Map<TopicPartition, Long> ackedOffsets = new HashMap<>();
        ackedOffsets.put(new TopicPartition(this.persistentStoreTopicName, 1), 123L);
        ackedOffsets.put(new TopicPartition(this.nonPersistentStoreTopicName, 1), 456L);
        ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(this.applicationId, "otherTopic"), 1), 789L);

        MockStateStoreSupplier.MockStateStore persistentStore =
                new MockStateStoreSupplier.MockStateStore(this.persistentStoreName, true);
        MockStateStoreSupplier.MockStateStore nonPersistentStore =
                new MockStateStoreSupplier.MockStateStore(this.nonPersistentStoreName, false);

        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, taskId,
                this.noPartitions, restoreConsumer, false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            // The checkpoint file written above must no longer exist at this point.
            Assert.assertFalse(checkpointFile.exists());

            restoreConsumer.reset();
            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);

            restoreConsumer.reset();
            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
        } finally {
            // Flush, then close with the acked offsets.
            stateMgr.flush(new MockProcessorContext(
                    StateSerdes.withBuiltinTypes("foo", String.class, String.class), new NoOpRecordCollector()));
            stateMgr.close(ackedOffsets);
        }

        Assert.assertTrue(persistentStore.flushed);
        Assert.assertTrue(persistentStore.closed);
        Assert.assertTrue(nonPersistentStore.flushed);
        Assert.assertTrue(nonPersistentStore.closed);
        Assert.assertTrue(checkpointFile.exists());

        Map<TopicPartition, Long> checkpointedOffsets = new OffsetCheckpoint(checkpointFile).read();
        Assert.assertEquals(1L, checkpointedOffsets.size());
        Assert.assertEquals(Long.valueOf(124L), checkpointedOffsets.get(new TopicPartition(this.persistentStoreTopicName, 1)));
    }

    /**
     * A store registered with logging disabled needs no changelog topic: registration succeeds
     * even though the restore consumer was given no partition metadata, and the store can be
     * looked up afterwards.
     *
     * <p>The state manager is closed in a {@code finally}, matching the other tests in this class,
     * so that whatever it holds is released before {@code cleanup()} deletes the base directory.
     */
    @Test
    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
        MockStateStoreSupplier.MockStateStore store =
                new MockStateStoreSupplier.MockStateStore(this.nonPersistentStoreName, false);
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.applicationId, new TaskId(0, 1),
                this.noPartitions, new MockRestoreConsumer(), false, this.stateDirectory, (Map) null, Collections.emptyMap());
        try {
            stateMgr.register(store, false, store.stateRestoreCallback);
            Assert.assertNotNull(stateMgr.getStore(this.nonPersistentStoreName));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }
}
