package org.apache.kafka.streams;

import java.io.File;
import java.lang.Thread;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    private static final int NUM_THREADS = 2;
    private final StreamsBuilder builder = new StreamsBuilder();
    private KafkaStreams streams;
    private Properties props;
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges += KafkaStreamsTest.NUM_BROKERS;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    @Before
    public void before() {
        this.props = new Properties();
        this.props.setProperty("application.id", "appId");
        this.props.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        this.props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        this.props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        this.props.put("num.stream.threads", Integer.valueOf(NUM_THREADS));
        this.streams = new KafkaStreams(this.builder.build(), this.props);
    }

    @Test
    public void testStateChanges() throws InterruptedException {
        final KafkaStreams kafkaStreams = new KafkaStreams(new StreamsBuilder().build(), this.props);
        kafkaStreams.setStateListener(new StateListenerStub());
        Assert.assertEquals(kafkaStreams.state(), KafkaStreams.State.CREATED);
        Assert.assertEquals(r0.numChanges, 0L);
        kafkaStreams.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.1
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.RUNNING;
            }
        }, 10000L, "Streams never started.");
        kafkaStreams.close();
        Assert.assertEquals(kafkaStreams.state(), KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void testStateCloseAfterCreate() {
        KafkaStreams kafkaStreams = new KafkaStreams(new StreamsBuilder().build(), this.props);
        kafkaStreams.setStateListener(new StateListenerStub());
        kafkaStreams.close();
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, kafkaStreams.state());
    }

    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("anyTopic");
        List asList = Arrays.asList(new Node(0, "localhost", 8121));
        Cluster cluster = new Cluster("mockClusterId", asList, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) asList.get(0));
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        mockClientSupplier.setClusterForAdminClient(cluster);
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), new StreamsConfig(this.props), mockClientSupplier);
        kafkaStreams.close();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.2
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING;
            }
        }, 10000L, "Streams never stopped.");
        Assert.assertTrue(mockClientSupplier.consumer.closed());
        Assert.assertTrue(mockClientSupplier.restoreConsumer.closed());
        Iterator<MockProducer> it = mockClientSupplier.producers.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(it.next().closed());
        }
    }

    @Test
    public void testStateThreadClose() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("anyTopic");
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props);
        Field declaredField = kafkaStreams.getClass().getDeclaredField("threads");
        declaredField.setAccessible(true);
        StreamThread[] streamThreadArr = (StreamThread[]) declaredField.get(kafkaStreams);
        Assert.assertEquals(2L, streamThreadArr.length);
        Assert.assertEquals(kafkaStreams.state(), KafkaStreams.State.CREATED);
        kafkaStreams.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.3
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.RUNNING;
            }
        }, 10000L, "Streams never started.");
        for (int i = 0; i < NUM_THREADS; i += NUM_BROKERS) {
            final StreamThread streamThread = streamThreadArr[i];
            streamThread.shutdown();
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.4
                public boolean conditionMet() {
                    return streamThread.state() == StreamThread.State.DEAD;
                }
            }, 10000L, "Thread never stopped.");
            streamThreadArr[i].join();
        }
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.5
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.ERROR;
            }
        }, 10000L, "Streams never stopped.");
        kafkaStreams.close();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.6
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING;
            }
        }, 10000L, "Streams never stopped.");
        Field declaredField2 = kafkaStreams.getClass().getDeclaredField("globalStreamThread");
        declaredField2.setAccessible(true);
        Assert.assertEquals((GlobalStreamThread) declaredField2.get(kafkaStreams), (Object) null);
    }

    @Test
    public void testStateGlobalThreadClose() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("anyTopic");
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props);
        kafkaStreams.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.7
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.RUNNING;
            }
        }, 10000L, "Streams never started.");
        Field declaredField = kafkaStreams.getClass().getDeclaredField("globalStreamThread");
        declaredField.setAccessible(true);
        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) declaredField.get(kafkaStreams);
        globalStreamThread.shutdown();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.8
            public boolean conditionMet() {
                return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
            }
        }, 10000L, "Thread never stopped.");
        globalStreamThread.join();
        Assert.assertEquals(kafkaStreams.state(), KafkaStreams.State.ERROR);
        kafkaStreams.close();
        Assert.assertEquals(kafkaStreams.state(), KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        int i = MockMetricsReporter.INIT_COUNT.get();
        KafkaStreams kafkaStreams = new KafkaStreams(new StreamsBuilder().build(), this.props);
        Assert.assertTrue("some reporters should be initialized by calling on construction", MockMetricsReporter.INIT_COUNT.get() - i > 0);
        kafkaStreams.start();
        int i2 = MockMetricsReporter.CLOSE_COUNT.get();
        kafkaStreams.close();
        Assert.assertEquals(i2 + r0, MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCloseIsIdempotent() {
        this.streams.close();
        int i = MockMetricsReporter.CLOSE_COUNT.get();
        this.streams.close();
        Assert.assertEquals("subsequent close() calls should do nothing", i, MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCannotStartOnceClosed() {
        this.streams.start();
        this.streams.close();
        try {
            this.streams.start();
            Assert.fail("Should have throw IllegalStateException");
            this.streams.close();
        } catch (IllegalStateException e) {
            this.streams.close();
        } catch (Throwable th) {
            this.streams.close();
            throw th;
        }
    }

    @Test
    public void testCannotStartTwice() {
        this.streams.start();
        try {
            this.streams.start();
            this.streams.close();
        } catch (IllegalStateException e) {
            this.streams.close();
        } catch (Throwable th) {
            this.streams.close();
            throw th;
        }
    }

    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        this.streams.start();
        try {
            this.streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
            Assert.fail("Should throw an IllegalStateException");
            this.streams.close();
        } catch (IllegalStateException e) {
            this.streams.close();
        } catch (Throwable th) {
            this.streams.close();
            throw th;
        }
    }

    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        this.streams.start();
        try {
            this.streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
            Assert.fail("Should throw IllegalStateException");
            this.streams.close();
        } catch (IllegalStateException e) {
            this.streams.close();
        } catch (Throwable th) {
            this.streams.close();
            throw th;
        }
    }

    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        this.streams.start();
        try {
            this.streams.setStateListener((KafkaStreams.StateListener) null);
            Assert.fail("Should throw IllegalStateException");
            this.streams.close();
        } catch (IllegalStateException e) {
            this.streams.close();
        } catch (Throwable th) {
            this.streams.close();
            throw th;
        }
    }

    @Test
    public void testNumberDefaultMetrics() {
        this.props.put("num.stream.threads", "1");
        Assert.assertEquals(23L, new KafkaStreams(new StreamsBuilder().build(), this.props).metrics().size());
    }

    @Test
    public void testIllegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", "illegalConfig");
        try {
            new KafkaStreams(new StreamsBuilder().build(), this.props);
            Assert.fail("Should have throw ConfigException");
        } catch (ConfigException e) {
        }
    }

    @Test
    public void testLegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        new KafkaStreams(new StreamsBuilder().build(), this.props).close();
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        new KafkaStreams(new StreamsBuilder().build(), this.props);
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        this.streams.allMetadata();
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        this.streams.allMetadataForStore("store");
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        this.streams.metadataForKey("store", "key", Serdes.String().serializer());
    }

    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        this.streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() { // from class: org.apache.kafka.streams.KafkaStreamsTest.9
            public Integer partition(String str, Object obj, int i) {
                return 0;
            }
        });
    }

    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        try {
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            final CountDownLatch countDownLatch = new CountDownLatch(NUM_BROKERS);
            CLUSTER.createTopic("input");
            streamsBuilder.stream("input", Consumed.with(Serdes.String(), Serdes.String())).foreach(new ForeachAction<String, String>() { // from class: org.apache.kafka.streams.KafkaStreamsTest.10
                public void apply(String str, String str2) {
                    try {
                        countDownLatch.countDown();
                        while (atomicBoolean.get()) {
                            Thread.sleep(10L);
                        }
                    } catch (InterruptedException e) {
                    }
                }
            });
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props);
            kafkaStreams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp("input", Collections.singletonList(new KeyValue("A", "A")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(System.currentTimeMillis()));
            Assert.assertTrue("Timed out waiting to receive single message", countDownLatch.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse(kafkaStreams.close(10L, TimeUnit.MILLISECONDS));
            atomicBoolean.set(false);
        } catch (Throwable th) {
            atomicBoolean.set(false);
            throw th;
        }
    }

    @Test
    public void shouldReturnThreadMetadata() {
        this.streams.start();
        Set<ThreadMetadata> localThreadsMetadata = this.streams.localThreadsMetadata();
        Assert.assertNotNull(localThreadsMetadata);
        Assert.assertEquals(2L, localThreadsMetadata.size());
        for (ThreadMetadata threadMetadata : localThreadsMetadata) {
            Assert.assertTrue("#threadState() was: " + threadMetadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", Utils.mkList(new String[]{"RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED"}).contains(threadMetadata.threadState()));
            Assert.assertEquals(0L, threadMetadata.standbyTasks().size());
            Assert.assertEquals(0L, threadMetadata.activeTasks().size());
        }
        this.streams.close();
    }

    @Test
    public void testCleanup() {
        KafkaStreams kafkaStreams = new KafkaStreams(new StreamsBuilder().build(), this.props);
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        kafkaStreams.close();
        kafkaStreams.cleanUp();
    }

    @Test
    public void testCannotCleanupWhileRunning() throws InterruptedException {
        final KafkaStreams kafkaStreams = new KafkaStreams(new StreamsBuilder().build(), this.props);
        kafkaStreams.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.11
            public boolean conditionMet() {
                return kafkaStreams.state() == KafkaStreams.State.RUNNING;
            }
        }, 10000L, "Streams never started.");
        try {
            try {
                kafkaStreams.cleanUp();
                Assert.fail("Should have thrown IllegalStateException");
                kafkaStreams.close();
            } catch (IllegalStateException e) {
                Assert.assertEquals("Cannot clean up while running.", e.getMessage());
                kafkaStreams.close();
            }
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    @Test
    public void testToString() {
        this.streams.start();
        String kafkaStreams = this.streams.toString();
        this.streams.close();
        String trim = kafkaStreams.split("\\n")[NUM_BROKERS].split(":")[NUM_BROKERS].trim();
        Assert.assertNotEquals("streamString should not be empty", "", kafkaStreams);
        Assert.assertNotNull("streamString should not be null", kafkaStreams);
        Assert.assertNotEquals("streamString contains non-empty appId", "", trim);
        Assert.assertNotNull("streamString contains non-null appId", trim);
    }

    @Test
    public void shouldCleanupOldStateDirs() throws InterruptedException {
        this.props.setProperty("state.cleanup.delay.ms", "1");
        CLUSTER.createTopic("topic");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic", Consumed.with(Serdes.String(), Serdes.String()));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props);
        final CountDownLatch countDownLatch = new CountDownLatch(NUM_BROKERS);
        kafkaStreams.setStateListener(new KafkaStreams.StateListener() { // from class: org.apache.kafka.streams.KafkaStreamsTest.12
            public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
                if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                    countDownLatch.countDown();
                }
            }
        });
        String str = this.props.getProperty("state.dir") + File.separator + this.props.getProperty("application.id");
        File file = new File(str, "10_1");
        Assert.assertTrue(file.mkdirs());
        try {
            kafkaStreams.start();
            countDownLatch.await(30L, TimeUnit.SECONDS);
            verifyCleanupStateDir(str, file);
            Assert.assertTrue(file.mkdirs());
            verifyCleanupStateDir(str, file);
            kafkaStreams.close();
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    private void verifyCleanupStateDir(String str, final File file) throws InterruptedException {
        final File file2 = new File(str, "0_0");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.KafkaStreamsTest.13
            public boolean conditionMet() {
                return !file.exists() && file2.exists();
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "cleanup has not successfully run");
        Assert.assertTrue(file2.exists());
    }
}
