package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest.class */
public class StreamThreadTest {
    private InternalTopologyBuilder internalTopologyBuilder;
    private StreamsMetadataState streamsMetadataState;
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private final MockTime mockTime = new MockTime();
    private final Metrics metrics = new Metrics();
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    private UUID processId = UUID.randomUUID();
    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    private final StreamsConfig config = new StreamsConfig(configProps(false));
    private final String stateDir = TestUtils.tempDirectory().getPath();
    private final StateDirectory stateDirectory = new StateDirectory(this.config, this.mockTime);
    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
    private final String topic1 = "topic1";
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$StateListenerStub.class */
    private static class StateListenerStub implements StreamThread.StateListener {
        int numChanges;
        ThreadStateTransitionValidator oldState;
        ThreadStateTransitionValidator newState;

        private StateListenerStub() {
            this.numChanges = 0;
            this.oldState = null;
            this.newState = null;
        }

        public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            this.numChanges++;
            if (this.newState != null && this.newState != threadStateTransitionValidator2) {
                throw new RuntimeException("State mismatch " + threadStateTransitionValidator2 + " different from " + this.newState);
            }
            this.oldState = threadStateTransitionValidator2;
            this.newState = threadStateTransitionValidator;
        }
    }

    @Before
    public void setUp() {
        this.processId = UUID.randomUUID();
        this.internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(this.internalStreamsBuilder);
        this.internalTopologyBuilder.setApplicationId("stream-thread-test");
        this.streamsMetadataState = new StreamsMetadataState(this.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
    }

    private Properties configProps(final boolean z) {
        return new Properties() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.1
            {
                setProperty("application.id", "stream-thread-test");
                setProperty("bootstrap.servers", "localhost:2171");
                setProperty("buffered.records.per.partition", "3");
                setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
                setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
                if (z) {
                    setProperty("processing.guarantee", "exactly_once");
                }
            }
        };
    }

    @Test
    public void testPartitionAssignmentChangeForSingleGroup() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread streamThread = getStreamThread();
        StateListenerStub stateListenerStub = new StateListenerStub();
        streamThread.setStateListener(stateListenerStub);
        Assert.assertEquals(streamThread.state(), StreamThread.State.CREATED);
        ConsumerRebalanceListener consumerRebalanceListener = streamThread.rebalanceListener;
        streamThread.setState(StreamThread.State.RUNNING);
        consumerRebalanceListener.onPartitionsRevoked(Collections.emptyList());
        Assert.assertEquals(streamThread.state(), StreamThread.State.PARTITIONS_REVOKED);
        List singletonList = Collections.singletonList(this.t1p1);
        streamThread.taskManager().setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
        MockConsumer mockConsumer = streamThread.consumer;
        mockConsumer.assign(singletonList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        consumerRebalanceListener.onPartitionsAssigned(singletonList);
        streamThread.runOnce(-1L);
        Assert.assertEquals(streamThread.state(), StreamThread.State.RUNNING);
        Assert.assertEquals(4L, stateListenerStub.numChanges);
        Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListenerStub.oldState);
        streamThread.shutdown();
        Assert.assertTrue(streamThread.state() == StreamThread.State.PENDING_SHUTDOWN);
    }

    @Test
    public void testStateChangeStartClose() throws InterruptedException {
        final StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        createStreamThread.setStateListener(new StateListenerStub());
        createStreamThread.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.2
            public boolean conditionMet() {
                return createStreamThread.state() == StreamThread.State.RUNNING;
            }
        }, 10000L, "Thread never started.");
        createStreamThread.shutdown();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.3
            public boolean conditionMet() {
                return createStreamThread.state() == StreamThread.State.DEAD;
            }
        }, 10000L, "Thread never shut down.");
        createStreamThread.shutdown();
        Assert.assertEquals(createStreamThread.state(), StreamThread.State.DEAD);
    }

    private Cluster createCluster(int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(Integer.valueOf(i2), new Node(i2, "localhost", 8121 + i2));
        }
        return new Cluster("mockClusterId", hashMap.values(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) hashMap.get(0));
    }

    private StreamThread createStreamThread(String str, StreamsConfig streamsConfig, boolean z) {
        if (z) {
            this.clientSupplier.setApplicationIdForProducer("stream-thread-test");
        }
        this.clientSupplier.setClusterForAdminClient(createCluster(1));
        return StreamThread.create(this.internalTopologyBuilder, streamsConfig, this.clientSupplier, this.clientSupplier.getAdminClient(streamsConfig.getAdminConfigs(str)), this.processId, str, this.metrics, this.mockTime, this.streamsMetadataState, 0L, this.stateDirectory, new MockStateRestoreListener());
    }

    @Test
    public void testMetrics() {
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        String str = "thread." + createStreamThread.getName();
        Map singletonMap = Collections.singletonMap("client-id", createStreamThread.getName());
        Assert.assertNotNull(this.metrics.getSensor(str + ".commit-latency"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".poll-latency"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".process-latency"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".punctuate-latency"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".task-created"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".task-closed"));
        Assert.assertNotNull(this.metrics.getSensor(str + ".skipped-records"));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-avg", "stream-metrics", "The average commit time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-latency-max", "stream-metrics", "The maximum commit time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("commit-rate", "stream-metrics", "The average per-second number of commit calls", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-avg", "stream-metrics", "The average poll time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-latency-max", "stream-metrics", "The maximum poll time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("poll-rate", "stream-metrics", "The average per-second number of record-poll calls", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-avg", "stream-metrics", "The average process time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-latency-max", "stream-metrics", "The maximum process time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("process-rate", "stream-metrics", "The average per-second number of process calls", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-avg", "stream-metrics", "The average punctuate time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-latency-max", "stream-metrics", "The maximum punctuate time in ms", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("punctuate-rate", "stream-metrics", "The average per-second number of punctuate calls", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("task-closed-rate", "stream-metrics", "The average per-second number of closed tasks", singletonMap)));
        Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records.", singletonMap)));
    }

    @Test
    public void shouldNotCommitBeforeTheCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(consumer, 1, 1);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, consumer, consumer, (String) null, mockTaskManagerCommit, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext(""));
        streamThread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(990L);
        streamThread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify(new Object[]{mockTaskManagerCommit});
    }

    @Test
    public void shouldNotCauseExceptionIfNothingCommited() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(consumer, 1, 0);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, consumer, consumer, (String) null, mockTaskManagerCommit, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext(""));
        streamThread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(990L);
        streamThread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify(new Object[]{mockTaskManagerCommit});
    }

    @Test
    public void shouldCommitAfterTheCommitInterval() {
        Properties configProps = configProps(false);
        configProps.setProperty("state.dir", this.stateDir);
        configProps.setProperty("commit.interval.ms", Long.toString(1000L));
        StreamsConfig streamsConfig = new StreamsConfig(configProps);
        Consumer<byte[], byte[]> consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager mockTaskManagerCommit = mockTaskManagerCommit(consumer, 2, 1);
        StreamThread streamThread = new StreamThread(this.mockTime, streamsConfig, consumer, consumer, (String) null, mockTaskManagerCommit, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext(""));
        streamThread.maybeCommit(this.mockTime.milliseconds());
        this.mockTime.sleep(1001L);
        streamThread.maybeCommit(this.mockTime.milliseconds());
        EasyMock.verify(new Object[]{mockTaskManagerCommit});
    }

    private TaskManager mockTaskManagerCommit(Consumer<byte[], byte[]> consumer, int i, int i2) {
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect(Integer.valueOf(taskManager.commitAll())).andReturn(Integer.valueOf(i2)).times(i);
        EasyMock.replay(new Object[]{taskManager, consumer});
        return taskManager;
    }

    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(hashMap2);
        createStreamThread.rebalanceListener.onPartitionsAssigned(new HashSet(arrayList));
        Assert.assertEquals(1L, this.clientSupplier.producers.size());
        Producer producer = this.clientSupplier.producers.get(0);
        Iterator it = createStreamThread.tasks().values().iterator();
        while (it.hasNext()) {
            Assert.assertSame(producer, ((Task) it.next()).recordCollector().producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, createStreamThread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, createStreamThread.restoreConsumer);
    }

    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread("clientId", new StreamsConfig(configProps(true)), true);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(hashMap2);
        createStreamThread.rebalanceListener.onPartitionsAssigned(new HashSet(arrayList));
        createStreamThread.runOnce(-1L);
        Assert.assertEquals(createStreamThread.tasks().size(), this.clientSupplier.producers.size());
        Assert.assertSame(this.clientSupplier.consumer, createStreamThread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, createStreamThread.restoreConsumer);
    }

    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread("clientId", new StreamsConfig(configProps(true)), true);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        arrayList.add(this.t1p2);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        hashMap.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(arrayList);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.t1p1, 0L);
        hashMap2.put(this.t1p2, 0L);
        mockConsumer.updateBeginningOffsets(hashMap2);
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        createStreamThread.shutdown();
        createStreamThread.run();
        Iterator it = createStreamThread.tasks().values().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Task) it.next()).recordCollector().producer().closed());
        }
    }

    @Test
    public void shouldShutdownTaskManagerOnClose() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.emptyMap());
        EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.emptyMap());
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        final StreamThread streamThread = new StreamThread(this.mockTime, this.config, consumer, consumer, (String) null, taskManager, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext(""));
        streamThread.setStateListener(new StreamThread.StateListener() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.4
            public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
                if (threadStateTransitionValidator2 == StreamThread.State.CREATED && threadStateTransitionValidator == StreamThread.State.RUNNING) {
                    streamThread.shutdown();
                }
            }
        });
        streamThread.run();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        new StreamThread(this.mockTime, this.config, consumer, consumer, (String) null, taskManager, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext("")).shutdown();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldOnlyShutdownOnce() {
        Consumer consumer = (Consumer) EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager) EasyMock.createNiceMock(TaskManager.class);
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{taskManager, consumer});
        StreamThread streamThread = new StreamThread(this.mockTime, this.config, consumer, consumer, (String) null, taskManager, new StreamThread.StreamsMetricsThreadImpl(this.metrics, "", "", Collections.emptyMap()), this.internalTopologyBuilder, "clientId", new LogContext(""));
        streamThread.shutdown();
        streamThread.run();
        EasyMock.verify(new Object[]{taskManager});
    }

    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic"});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        HashMap hashMap = new HashMap();
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().setAssignmentMetadata(Collections.emptyMap(), hashMap);
        createStreamThread.taskManager().createTasks(Collections.emptyList());
        createStreamThread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("sink", "dummyTopic", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[]{"source"});
        final StreamThread createStreamThread = createStreamThread("clientId", new StreamsConfig(configProps(true)), true);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.consumer;
        mockConsumer.updatePartitions("topic1", Collections.singletonList(new PartitionInfo("topic1", 1, (Node) null, (Node[]) null, (Node[]) null)));
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked((Collection) null);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer2 = createStreamThread.consumer;
        mockConsumer2.assign(arrayList);
        mockConsumer2.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        createStreamThread.runOnce(-1L);
        Assert.assertThat(Integer.valueOf(createStreamThread.tasks().size()), CoreMatchers.equalTo(1));
        final MockProducer mockProducer = this.clientSupplier.producers.get(0);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(arrayList.iterator().next(), 0L));
        mockConsumer.unsubscribe();
        mockConsumer.assign(new HashSet(arrayList));
        mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 0L, new byte[0], new byte[0]));
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        createStreamThread.runOnce(-1L);
        Assert.assertThat(Integer.valueOf(mockProducer.history().size()), CoreMatchers.equalTo(1));
        Assert.assertFalse(mockProducer.transactionCommitted());
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.5
            public boolean conditionMet() {
                return mockProducer.commitCount() == 1;
            }
        }, "StreamsThread did not commit transaction.");
        mockProducer.fenceProducer();
        this.mockTime.sleep(this.config.getLong("commit.interval.ms").longValue() + 1);
        mockConsumer.addRecord(new ConsumerRecord("topic1", 1, 0L, new byte[0], new byte[0]));
        try {
            createStreamThread.runOnce(-1L);
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (TaskMigratedException e) {
        }
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.6
            public boolean conditionMet() {
                return createStreamThread.tasks().isEmpty();
            }
        }, "StreamsThread did not remove fenced zombie task.");
        Assert.assertThat(Long.valueOf(mockProducer.commitCount()), CoreMatchers.equalTo(1L));
    }

    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "name", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        this.internalTopologyBuilder.addSink("out", "output", (Serializer) null, (Serializer) null, (StreamPartitioner) null, new String[0]);
        StreamThread createStreamThread = createStreamThread("clientId", new StreamsConfig(configProps(true)), true);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked((Collection) null);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        createStreamThread.runOnce(-1L);
        Assert.assertThat(Integer.valueOf(createStreamThread.tasks().size()), CoreMatchers.equalTo(1));
        createStreamThread.rebalanceListener.onPartitionsRevoked((Collection) null);
        this.clientSupplier.producers.get(0).fenceProducer();
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        try {
            createStreamThread.runOnce(-1L);
            Assert.fail("Should have thrown TaskMigratedException");
        } catch (TaskMigratedException e) {
        }
        Assert.assertTrue(createStreamThread.tasks().isEmpty());
    }

    private StreamThread getStreamThread() {
        return createStreamThread("clientId", this.config, false);
    }

    @Test
    public void shouldReturnActiveTaskMetadataWhileRunningState() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked((Collection) null);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.t1p1);
        hashMap.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(arrayList);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        createStreamThread.runOnce(-1L);
        ThreadMetadata threadMetadata = createStreamThread.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet(new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue(threadMetadata.standbyTasks().isEmpty());
    }

    @Test
    public void shouldReturnStandbyTaskMetadataWhileRunningState() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as("count-one"));
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, (Node) null, new Node[0], new Node[0])));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        mockConsumer.updateEndOffsets(hashMap);
        mockConsumer.updateBeginningOffsets(hashMap);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.rebalanceListener.onPartitionsRevoked((Collection) null);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        createStreamThread.taskManager().setAssignmentMetadata(Collections.emptyMap(), hashMap2);
        createStreamThread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        createStreamThread.runOnce(-1L);
        ThreadMetadata threadMetadata = createStreamThread.threadMetadata();
        Assert.assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(this.task1.toString(), Utils.mkSet(new TopicPartition[]{this.t1p1}))));
        Assert.assertTrue(threadMetadata.activeTasks().isEmpty());
    }

    @Test
    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        Assert.assertEquals(StreamThread.State.CREATED.name(), createStreamThread.threadMetadata().threadState());
        createStreamThread.setState(StreamThread.State.RUNNING);
        Assert.assertEquals(StreamThread.State.RUNNING.name(), createStreamThread.threadMetadata().threadState());
    }

    @Test
    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
        this.internalStreamsBuilder.stream(Collections.singleton("topic1"), this.consumed).groupByKey().count(Materialized.as("count-one"));
        StreamThread createStreamThread = createStreamThread("clientId", this.config, false);
        MockConsumer<byte[], byte[]> mockConsumer = this.clientSupplier.restoreConsumer;
        mockConsumer.updatePartitions("stream-thread-test-count-one-changelog", Utils.mkList(new PartitionInfo[]{new PartitionInfo("stream-thread-test-count-one-changelog", 0, (Node) null, new Node[0], new Node[0]), new PartitionInfo("stream-thread-test-count-one-changelog", 1, (Node) null, new Node[0], new Node[0])}));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        hashMap.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
        mockConsumer.updateEndOffsets(hashMap);
        mockConsumer.updateBeginningOffsets(hashMap);
        this.clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.setState(StreamThread.State.RUNNING);
        ArrayList arrayList = new ArrayList();
        createStreamThread.rebalanceListener.onPartitionsRevoked(arrayList);
        assertThreadMetadataHasEmptyTasksWithState(createStreamThread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        arrayList.add(this.t1p1);
        hashMap2.put(this.task1, Collections.singleton(this.t1p1));
        hashMap3.put(this.task2, Collections.singleton(this.t1p2));
        createStreamThread.taskManager().setAssignmentMetadata(hashMap2, hashMap3);
        createStreamThread.rebalanceListener.onPartitionsAssigned(arrayList);
        assertThreadMetadataHasEmptyTasksWithState(createStreamThread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
    }

    @Test
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
        this.internalStreamsBuilder.stream(Collections.singleton("topic"), this.consumed).groupByKey().count(Materialized.as("count"));
        final StreamThread createStreamThread = createStreamThread("cliendId", this.config, false);
        MockConsumer mockConsumer = createStreamThread.consumer;
        final MockConsumer mockConsumer2 = createStreamThread.restoreConsumer;
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        final Set singleton = Collections.singleton(topicPartition);
        HashMap hashMap = new HashMap();
        hashMap.put(new TaskId(0, 0), singleton);
        createStreamThread.taskManager().setAssignmentMetadata(hashMap, Collections.emptyMap());
        mockConsumer.updatePartitions("topic", new ArrayList<PartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.7
            {
                add(new PartitionInfo("topic", 0, (Node) null, new Node[0], new Node[0]));
            }
        });
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        mockConsumer2.updatePartitions("stream-thread-test-count-changelog", new ArrayList<PartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.8
            {
                add(new PartitionInfo("stream-thread-test-count-changelog", 0, (Node) null, new Node[0], new Node[0]));
            }
        });
        final TopicPartition topicPartition2 = new TopicPartition("stream-thread-test-count-changelog", 0);
        final Set singleton2 = Collections.singleton(topicPartition2);
        mockConsumer2.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
        mockConsumer2.updateEndOffsets(Collections.singletonMap(topicPartition2, 2L));
        mockConsumer.schedulePollTask(new Runnable() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.9
            @Override // java.lang.Runnable
            public void run() {
                createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
                createStreamThread.rebalanceListener.onPartitionsAssigned(singleton);
            }
        });
        try {
            createStreamThread.start();
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.10
                public boolean conditionMet() {
                    return mockConsumer2.assignment().size() == 1;
                }
            }, "Never restore first record");
            mockConsumer2.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.11
                public boolean conditionMet() {
                    return mockConsumer2.position(topicPartition2) == 1;
                }
            }, "Never restore first record");
            mockConsumer2.setException(new InvalidOffsetException("Try Again!") { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.12
                public Set<TopicPartition> partitions() {
                    return singleton2;
                }
            });
            mockConsumer2.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
            mockConsumer2.addRecord(new ConsumerRecord("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes()));
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.StreamThreadTest.13
                public boolean conditionMet() {
                    mockConsumer2.assign(singleton2);
                    return mockConsumer2.position(topicPartition2) == 2;
                }
            }, "Never finished restore");
            createStreamThread.shutdown();
            createStreamThread.join(10000L);
        } catch (Throwable th) {
            createStreamThread.shutdown();
            createStreamThread.join(10000L);
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r3v8, types: [long, java.lang.String] */
    @Test
    public void shouldReportSkippedRecordsForInvalidTimestamps() {
        this.internalTopologyBuilder.addSource((Topology.AutoOffsetReset) null, "source1", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{"topic1"});
        Properties configProps = configProps(false);
        configProps.setProperty("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class.getName());
        StreamThread createStreamThread = createStreamThread("clientId", new StreamsConfig(configProps), false);
        createStreamThread.setState(StreamThread.State.RUNNING);
        createStreamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        Set singleton = Collections.singleton(this.t1p1);
        createStreamThread.taskManager().setAssignmentMetadata(Collections.singletonMap(new TaskId(0, this.t1p1.partition()), singleton), Collections.emptyMap());
        MockConsumer mockConsumer = createStreamThread.consumer;
        mockConsumer.assign(Collections.singleton(this.t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.t1p1, 0L));
        createStreamThread.rebalanceListener.onPartitionsAssigned(singleton);
        createStreamThread.runOnce(-1L);
        MetricName metricName = this.metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", createStreamThread.getName()));
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(metricName).metricValue());
        ?? r3 = this.t1p1.topic();
        mockConsumer.addRecord(new ConsumerRecord((String) r3, this.t1p1.partition(), (-1) + 1, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        long j = r3 + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        createStreamThread.runOnce(-1L);
        Assert.assertEquals(Double.valueOf(2.0d), this.metrics.metric(metricName).metricValue());
        long j2 = j + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j2, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        long j3 = j2 + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j3, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        long j4 = j3 + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j4, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        long j5 = j4 + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j5, -1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        createStreamThread.runOnce(-1L);
        Assert.assertEquals(Double.valueOf(6.0d), this.metrics.metric(metricName).metricValue());
        long j6 = j5 + 1;
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j6, 1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        mockConsumer.addRecord(new ConsumerRecord(this.t1p1.topic(), this.t1p1.partition(), j6 + 1, 1L, TimestampType.CREATE_TIME, -1L, -1, -1, new byte[0], new byte[0]));
        createStreamThread.runOnce(-1L);
        Assert.assertEquals(Double.valueOf(6.0d), this.metrics.metric(metricName).metricValue());
    }

    private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata threadMetadata, StreamThread.State state) {
        Assert.assertEquals(state.name(), threadMetadata.threadState());
        Assert.assertTrue(threadMetadata.activeTasks().isEmpty());
        Assert.assertTrue(threadMetadata.standbyTasks().isEmpty());
    }
}
