package org.apache.kafka.streams;

import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.lang.reflect.Field;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    // Number of stream threads configured via "num.stream.threads" in before().
    private static final int NUM_THREADS = 2;
    // Shared instance built in before() from this.builder and this.props; closed in cleanup().
    private KafkaStreams globalStreams;
    // Base configuration populated in before(); individual tests add or override entries.
    private Properties props;
    // Number of brokers the embedded cluster below is created with.
    private static final int NUM_BROKERS = 1;

    // Embedded Kafka cluster shared by the tests in this class (JUnit class rule).
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // Builder whose topology is used to construct the KafkaStreams instances in these tests.
    private final StreamsBuilder builder = new StreamsBuilder();

    // Exposes the current test method name; used by tests to derive topic and store names.
    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges += KafkaStreamsTest.NUM_BROKERS;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    /**
     * Creates the base configuration used by every test and builds the shared
     * {@code globalStreams} instance from it.
     */
    @Before
    public void before() {
        props = new Properties();
        props.put("application.id", "appId");
        props.put("client.id", "clientId");
        props.put("bootstrap.servers", CLUSTER.bootstrapServers());
        props.put("metric.reporters", MockMetricsReporter.class.getName());
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("num.stream.threads", NUM_THREADS);

        final Topology topology = builder.build();
        globalStreams = new KafkaStreams(topology, props);
    }

    /** Closes the shared {@code globalStreams} instance, if one was created. */
    @After
    public void cleanup() {
        if (globalStreams == null) {
            return;
        }
        globalStreams.close();
    }

    /** Constructing and closing an instance must succeed when both socket buffer sizes are -1. */
    @Test
    public void testOsDefaultSocketBufferSizes() {
        final int bufferSize = -1;
        props.put("send.buffer.bytes", bufferSize);
        props.put("receive.buffer.bytes", bufferSize);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.close();
    }

    /** A send buffer size of -2 is expected to be rejected with a {@link KafkaException}. */
    @Test(expected = KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        final int invalidSize = -2;
        props.put("send.buffer.bytes", invalidSize);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.close();
    }

    /** A receive buffer size of -2 is expected to be rejected with a {@link KafkaException}. */
    @Test(expected = KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        final int invalidSize = -2;
        props.put("receive.buffer.bytes", invalidSize);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.close();
    }

    /** Closing a never-started instance must leave it in {@code NOT_RUNNING}. */
    @Test
    public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() {
        globalStreams.close();

        final KafkaStreams.State stateAfterClose = globalStreams.state();
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, stateAfterClose);
    }

    /**
     * Drives the per-thread state listeners by hand and verifies that the instance returns to
     * {@code RUNNING} once every thread that is not dead is running again.
     *
     * @throws InterruptedException if waiting for a state condition is interrupted
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        final StateListenerStub stateListener = new StateListenerStub();
        globalStreams.setStateListener(stateListener);

        Assert.assertEquals(0L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());

        globalStreams.start();

        // Two instance-level transitions are expected before RUNNING is observed
        // (presumably CREATED -> REBALANCING -> RUNNING). This is a transition count,
        // not the thread count, so it is spelled as a literal.
        TestUtils.waitForCondition(() -> stateListener.numChanges == 2, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());

        // All threads revoke their partitions: instance moves to REBALANCING (one more change).
        for (final StreamThread thread : globalStreams.threads) {
            thread.stateListener().onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());

        // Partitions assigned: the instance state must not change.
        for (final StreamThread thread : globalStreams.threads) {
            thread.stateListener().onChange(thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
        }
        Assert.assertEquals(3L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());

        // Kill the second thread only; the instance must stay in REBALANCING.
        final StreamThread deadThread = globalStreams.threads[1];
        deadThread.stateListener().onChange(deadThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
        deadThread.stateListener().onChange(deadThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());

        // Every surviving thread goes back to RUNNING: the instance follows.
        for (final StreamThread thread : globalStreams.threads) {
            if (thread != deadThread) {
                thread.stateListener().onChange(thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
            }
        }
        Assert.assertEquals(4L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());

        globalStreams.close();

        TestUtils.waitForCondition(() -> stateListener.numChanges == 6, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
    }

    /**
     * Drives the per-thread state listeners by hand and verifies that the instance moves to
     * {@code ERROR} once every stream thread has been reported dead.
     *
     * @throws InterruptedException if waiting for a state condition is interrupted
     */
    @Test
    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
        final StateListenerStub stateListener = new StateListenerStub();
        globalStreams.setStateListener(stateListener);

        Assert.assertEquals(0L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());

        globalStreams.start();

        // Two instance-level transitions are expected before RUNNING is observed
        // (presumably CREATED -> REBALANCING -> RUNNING). This is a transition count,
        // not the thread count, so it is spelled as a literal.
        TestUtils.waitForCondition(() -> stateListener.numChanges == 2, "Streams never started.");
        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());

        // All threads revoke their partitions: instance moves to REBALANCING (one more change).
        for (final StreamThread thread : globalStreams.threads) {
            thread.stateListener().onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
        }
        Assert.assertEquals(3L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());

        // Kill the second thread first; with another thread still alive nothing changes.
        final StreamThread firstDeadThread = globalStreams.threads[1];
        firstDeadThread.stateListener().onChange(firstDeadThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
        firstDeadThread.stateListener().onChange(firstDeadThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals(3L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());

        // Kill every remaining thread: the instance must end up in ERROR.
        for (final StreamThread thread : globalStreams.threads) {
            if (thread != firstDeadThread) {
                thread.stateListener().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_REVOKED);
                thread.stateListener().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            }
        }
        Assert.assertEquals(4L, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state());

        globalStreams.close();

        TestUtils.waitForCondition(() -> stateListener.numChanges == 6, "Streams never closed.");
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
    }

    /**
     * Closing an instance that was never started must still close the consumer, the restore
     * consumer and every producer obtained from the client supplier.
     *
     * @throws Exception if waiting for the NOT_RUNNING state fails
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        builder.globalTable("anyTopic");

        // Single-node mock cluster; the same node doubles as the controller.
        final Node node = new Node(0, "localhost", 8121);
        final List<Node> nodes = Collections.singletonList(node);
        final Cluster cluster = new Cluster(
            "mockClusterId",
            nodes,
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet(),
            node);

        final MockClientSupplier clientSupplier = new MockClientSupplier();
        clientSupplier.setClusterForAdminClient(cluster);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
        streams.close();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped.");

        Assert.assertTrue(clientSupplier.consumer.closed());
        Assert.assertTrue(clientSupplier.restoreConsumer.closed());
        for (final MockProducer<?, ?> producer : clientSupplier.producers) {
            Assert.assertTrue(producer.closed());
        }
    }

    /**
     * Shuts down every stream thread of a running instance and verifies that the instance
     * reaches {@code ERROR}, can then be closed into {@code NOT_RUNNING}, and has its
     * {@code globalStreamThread} field cleared afterwards.
     *
     * @throws Exception on reflection failures or if a wait is interrupted / times out
     */
    @Test
    public void testStateThreadClose() throws Exception {
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        try {
            final Field threadsField = streams.getClass().getDeclaredField("threads");
            threadsField.setAccessible(true);
            final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);

            Assert.assertEquals(NUM_THREADS, threads.length);
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            for (final StreamThread thread : threads) {
                thread.shutdown();
                TestUtils.waitForCondition(() -> thread.state() == StreamThread.State.DEAD, "Thread never stopped.");
                thread.join();
            }
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.ERROR, "Streams never stopped.");

            streams.close();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped.");

            final Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
            globalThreadField.setAccessible(true);
            final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
            Assert.assertNull(globalStreamThread);
        } finally {
            // Always release the instance, including when an assertion above fails.
            streams.close();
        }
    }

    /**
     * Shuts down the global stream thread of a running instance and verifies that the instance
     * is in {@code ERROR} afterwards and in {@code NOT_RUNNING} after being closed.
     *
     * @throws Exception on reflection failures or if a wait is interrupted / times out
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        builder.globalTable("anyTopic");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        try {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

            final Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
            globalThreadField.setAccessible(true);
            final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);

            globalStreamThread.shutdown();
            TestUtils.waitForCondition(() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped.");
            globalStreamThread.join();
            Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());

            streams.close();
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        } finally {
            // Always release the instance, including when an assertion above fails.
            streams.close();
        }
    }

    /**
     * With a global table in the topology and an unreachable bootstrap server,
     * {@code start()} is expected to fail with a {@link StreamsException} instead of hanging.
     */
    @Test
    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
        final Properties unreachableBrokerProps = new Properties();
        unreachableBrokerProps.put("application.id", "appId");
        unreachableBrokerProps.put("bootstrap.servers", "localhost:1");
        unreachableBrokerProps.put("metric.reporters", MockMetricsReporter.class.getName());
        unreachableBrokerProps.put("state.dir", TestUtils.tempDirectory().getPath());
        unreachableBrokerProps.put("num.stream.threads", NUM_THREADS);
        unreachableBrokerProps.put("default.api.timeout.ms", 200);

        builder.globalTable("anyTopic");

        // try-with-resources guarantees the instance is closed on the expected-exception path too.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), unreachableBrokerProps)) {
            streams.start();
            Assert.fail("expected start() to time out and throw an exception.");
        } catch (StreamsException expected) {
            // Expected: start() could not complete against the unreachable broker.
        }
    }

    /**
     * Starting and then closing an instance must complete even though the configured
     * bootstrap server is unreachable.
     */
    @Test
    public void testLocalThreadCloseWithoutConnectingToBroker() {
        final Properties unreachableBrokerProps = new Properties();
        unreachableBrokerProps.setProperty("application.id", "appId");
        unreachableBrokerProps.setProperty("bootstrap.servers", "localhost:1");
        unreachableBrokerProps.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        unreachableBrokerProps.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        unreachableBrokerProps.put("num.stream.threads", NUM_THREADS);

        builder.table("anyTopic");

        // The implicit close() at the end of the block is the behavior under test.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), unreachableBrokerProps)) {
            streams.start();
        }
    }

    /**
     * Constructing an instance must initialize at least one metrics reporter, and closing it
     * must close exactly as many reporters as were initialized.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            // Number of reporters initialized by the constructor; reused for the close check below.
            final int initialized = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
            Assert.assertTrue("some reporters should be initialized by calling on construction", initialized > 0);

            streams.start();
            final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            Assert.assertEquals(closeCountBefore + initialized, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /** A second {@code close()} must not close any further metrics reporters. */
    @Test
    public void testCloseIsIdempotent() {
        globalStreams.close();
        final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

        globalStreams.close();
        final int closeCountAfterSecondClose = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals("subsequent close() calls should do nothing", closeCountAfterFirstClose, closeCountAfterSecondClose);
    }

    /**
     * Once an instance has been started and closed, a further {@code start()} must be rejected
     * with an {@link IllegalStateException}.
     */
    @Test
    public void testCannotStartOnceClosed() {
        globalStreams.start();
        // Close first: without this the test only repeats the "start twice" scenario.
        globalStreams.close();
        try {
            globalStreams.start();
            Assert.fail("Should have throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: a closed instance cannot be restarted.
        } finally {
            globalStreams.close();
        }
    }

    /** Calling {@code start()} on an already started instance must throw. */
    @Test
    public void testCannotStartTwice() {
        globalStreams.start();

        try {
            globalStreams.start();
            Assert.fail("Should throw an IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: the second start() is rejected.
        } finally {
            globalStreams.close();
        }
    }

    /** Registering a global state restore listener after {@code start()} must throw. */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        globalStreams.start();

        final MockStateRestoreListener restoreListener = new MockStateRestoreListener();
        try {
            globalStreams.setGlobalStateRestoreListener(restoreListener);
            Assert.fail("Should throw an IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: the listener may not be set once the instance has been started.
        } finally {
            globalStreams.close();
        }
    }

    /** Setting the uncaught exception handler after {@code start()} must throw. */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        globalStreams.start();

        final Thread.UncaughtExceptionHandler noHandler = null;
        try {
            globalStreams.setUncaughtExceptionHandler(noHandler);
            Assert.fail("Should throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: the handler may not be changed once the instance has been started.
        }
    }

    /** Setting the state listener after {@code start()} must throw. */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        globalStreams.start();

        final KafkaStreams.StateListener noListener = null;
        try {
            globalStreams.setStateListener(noListener);
            Assert.fail("Should throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: the listener may not be changed once the instance has been started.
        }
    }

    /** An unknown metrics recording level must be rejected at construction time. */
    @Test
    public void testIllegalMetricsConfig() {
        props.setProperty("metrics.recording.level", "illegalConfig");

        final Topology topology = builder.build();
        try {
            new KafkaStreams(topology, props);
            Assert.fail("Should have throw ConfigException");
        } catch (ConfigException expected) {
            // Expected: the recording level is not a legal value.
        }
    }

    /** Both the INFO and the DEBUG recording level must be accepted at construction time. */
    @Test
    public void testLegalMetricsConfig() {
        final List<Sensor.RecordingLevel> legalLevels =
            Arrays.asList(Sensor.RecordingLevel.INFO, Sensor.RecordingLevel.DEBUG);

        for (final Sensor.RecordingLevel level : legalLevels) {
            props.setProperty("metrics.recording.level", level.toString());
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.close();
        }
    }

    /** {@code allMetadata()} on a not-yet-started instance must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        final KafkaStreams notStarted = globalStreams;
        notStarted.allMetadata();
    }

    /** {@code allMetadataForStore()} on a not-yet-started instance must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        final String storeName = "store";
        globalStreams.allMetadataForStore(storeName);
    }

    /** {@code metadataForKey()} with a key serializer on a not-yet-started instance must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        final String storeName = "store";
        final String key = "key";
        globalStreams.metadataForKey(storeName, key, Serdes.String().serializer());
    }

    /** {@code metadataForKey()} with a partitioner on a not-yet-started instance must throw. */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        // The partitioner always answers 0; only the state check is under test.
        globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
    }

    /**
     * {@code close(Duration)} must return {@code false} when a stream thread is still busy
     * inside a processor and therefore cannot terminate within the timeout.
     *
     * @throws Exception if topic creation, record production or waiting on the latch fails
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        KafkaStreams streams = null;
        try {
            final StreamsBuilder streamsBuilder = new StreamsBuilder();
            final CountDownLatch recordReceived = new CountDownLatch(1);
            final String topic = "input";
            CLUSTER.createTopics(topic);

            // The processor signals that a record arrived and then blocks until released.
            streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())).foreach((key, value) -> {
                try {
                    recordReceived.countDown();
                    while (keepRunning.get()) {
                        Thread.sleep(10L);
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt simply ends the artificial blocking.
                }
            });

            streams = new KafkaStreams(streamsBuilder.build(), props);
            streams.start();

            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                topic,
                Collections.singletonList(new KeyValue<>("A", "A")),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
                System.currentTimeMillis());

            Assert.assertTrue("Timed out waiting to receive single message", recordReceived.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse(streams.close(Duration.ofMillis(10L)));
        } finally {
            // Release the blocked processor and shut down on success and failure alike.
            keepRunning.set(false);
            if (streams != null) {
                streams.close();
            }
        }
    }

    /**
     * After {@code start()}, the local thread metadata must describe two stream threads with
     * no tasks and with client ids derived from the configured client id and thread name.
     */
    @Test
    public void shouldReturnThreadMetadata() {
        globalStreams.start();

        final Set<ThreadMetadata> allThreads = globalStreams.localThreadsMetadata();
        Assert.assertNotNull(allThreads);
        Assert.assertEquals(2L, allThreads.size());

        for (final ThreadMetadata metadata : allThreads) {
            final boolean stateIsAcceptable =
                Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED")
                    .contains(metadata.threadState());
            Assert.assertTrue(
                "#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
                stateIsAcceptable);

            Assert.assertEquals(0L, metadata.standbyTasks().size());
            Assert.assertEquals(0L, metadata.activeTasks().size());

            final String name = metadata.threadName();
            Assert.assertTrue(name.startsWith("clientId-StreamThread-"));
            Assert.assertEquals(name + "-consumer", metadata.consumerClientId());
            Assert.assertEquals(name + "-restore-consumer", metadata.restoreConsumerClientId());
            Assert.assertEquals(Collections.singleton(name + "-producer"), metadata.producerClientIds());
            Assert.assertEquals("clientId-admin", metadata.adminClientId());
        }
    }

    /**
     * {@code cleanUp()} must be permitted both before the instance is started and after it
     * has been closed.
     */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        try {
            globalStreams.cleanUp();
            globalStreams.start();
        } finally {
            globalStreams.close();
        }
        // Second clean-up must happen after close(): cleaning up a started instance is rejected.
        globalStreams.cleanUp();
    }

    /**
     * {@code cleanUp()} on a running instance must throw with the documented message.
     *
     * @throws InterruptedException if waiting for the RUNNING state is interrupted
     */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        globalStreams.start();
        TestUtils.waitForCondition(() -> globalStreams.state() == KafkaStreams.State.RUNNING, "Streams never started.");

        try {
            globalStreams.cleanUp();
            Assert.fail("Should have thrown IllegalStateException");
        } catch (IllegalStateException expected) {
            final String message = expected.getMessage();
            Assert.assertEquals("Cannot clean up while running.", message);
        }
    }

    /**
     * With a 1 ms cleanup delay, a task directory created under the application's state
     * directory is checked via {@code verifyCleanupStateDir} once the instance is running,
     * and again after the directory has been re-created.
     *
     * @throws InterruptedException if topic creation or waiting is interrupted
     */
    @Test
    public void shouldCleanupOldStateDirs() throws InterruptedException {
        props.setProperty("state.cleanup.delay.ms", "1");

        final String topic = "topic";
        CLUSTER.createTopic(topic);
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(topic, Materialized.as("store"));

        try (KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props)) {
            // Released when the instance moves from REBALANCING to RUNNING.
            final CountDownLatch running = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    running.countDown();
                }
            });

            final String appDir = props.getProperty("state.dir") + File.separator + props.getProperty("application.id");
            final File oldTaskDir = new File(appDir, "10_1");
            Assert.assertTrue(oldTaskDir.mkdirs());

            streams.start();
            // Fail fast if RUNNING is never reached instead of continuing after a silent timeout.
            Assert.assertTrue(running.await(30L, TimeUnit.SECONDS));

            verifyCleanupStateDir(appDir, oldTaskDir);
            Assert.assertTrue(oldTaskDir.mkdirs());
            verifyCleanupStateDir(appDir, oldTaskDir);
        }
    }

    /** {@code close(Duration)} must reject a negative timeout with an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowOnNegativeTimeoutForClose() {
        // try-with-resources guarantees the instance is closed on the expected-exception path too.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.close(Duration.ofMillis(-1L));
            Assert.fail("should not accept negative close parameter");
        } catch (IllegalArgumentException expected) {
            // Expected: negative timeouts are not accepted.
        }
    }

    /**
     * {@code close(Duration)} with a zero timeout, issued from a separate thread, must return
     * within 30 seconds.
     *
     * @throws InterruptedException if joining the closing thread is interrupted
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final Thread closer = new Thread(() -> streams.close(Duration.ofMillis(0L)));

        closer.start();
        try {
            final long joinTimeoutMs = 30000L;
            closer.join(joinTimeoutMs);
            Assert.assertFalse(closer.isAlive());
        } finally {
            streams.close();
        }
    }

    /**
     * Runs a source -> processor -> sink topology without any state store and expects, via
     * {@code startStreamsAndCheckDirExists(..., false)}, that no state directory is created.
     *
     * @throws Exception if topic creation or the helper fails
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
        final String inputTopic = testName.getMethodName() + "-input";
        final String outputTopic = testName.getMethodName() + "-output";
        CLUSTER.createTopics(inputTopic, outputTopic);

        final Topology topology = new Topology();
        topology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
            .addProcessor("process", () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    // Forward only values of even length; the modulus is unrelated to the thread count.
                    if (value.length() % 2 == 0) {
                        context().forward(key, key + value);
                    }
                }
            }, "source")
            .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");

        startStreamsAndCheckDirExists(topology, Collections.singleton(inputTopic), outputTopic, false);
    }

    /**
     * Runs the stateful topology with in-memory stores and checks that nothing was
     * created under the state directory.
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
        final String inputTopic = this.testName.getMethodName() + "-input";
        final String outputTopic = this.testName.getMethodName() + "-output";
        final String globalTopic = this.testName.getMethodName() + "-global";
        final String countsStore = this.testName.getMethodName() + "-counts";
        final String globalStore = this.testName.getMethodName() + "-globalStore";

        final Topology topology =
            getStatefulTopology(inputTopic, outputTopic, globalTopic, countsStore, globalStore, false);
        startStreamsAndCheckDirExists(topology, Arrays.asList(inputTopic, globalTopic), outputTopic, false);
    }

    /**
     * Runs the stateful topology with persistent stores and checks that something
     * was created under the state directory.
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
        final String inputTopic = this.testName.getMethodName() + "-input";
        final String outputTopic = this.testName.getMethodName() + "-output";
        final String globalTopic = this.testName.getMethodName() + "-global";
        final String countsStore = this.testName.getMethodName() + "-counts";
        final String globalStore = this.testName.getMethodName() + "-globalStore";

        final Topology topology =
            getStatefulTopology(inputTopic, outputTopic, globalTopic, countsStore, globalStore, true);
        startStreamsAndCheckDirExists(topology, Arrays.asList(inputTopic, globalTopic), outputTopic, true);
    }

    /**
     * Creates the three topics and assembles a topology consisting of a
     * source -> process -> sink chain with one key-value store attached to the
     * "process" node, plus one global key-value store with logging disabled.
     *
     * @param str  input topic read by the "source" node
     * @param str2 output topic written by the "sink" node
     * @param str3 topic feeding the global store
     * @param str4 name of the key-value store attached to the "process" node
     * @param str5 name of the global store
     * @param z    {@code true} to use persistent stores, {@code false} for in-memory stores
     * @return the assembled topology
     * @throws Exception if creating the topics fails
     */
    private Topology getStatefulTopology(String str, String str2, String str3, String str4, String str5, boolean z) throws Exception {
        CLUSTER.createTopics(str, str2, str3);

        final StoreBuilder<org.apache.kafka.streams.state.KeyValueStore<String, Long>> countsStoreBuilder =
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str4) : Stores.inMemoryKeyValueStore(str4),
                Serdes.String(),
                Serdes.Long());

        final Topology topology = new Topology();
        topology
            .addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), str)
            .addProcessor("process", () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    // getStateStore returns the generic StateStore type, which offers no put();
                    // the store registered under str4 is the <String, Long> key-value store built above.
                    @SuppressWarnings("unchecked")
                    final org.apache.kafka.streams.state.KeyValueStore<String, Long> counts =
                        (org.apache.kafka.streams.state.KeyValueStore<String, Long>) context().getStateStore(str4);
                    counts.put(key, 5L);
                    context().forward(key, "5");
                    context().commit();
                }
            }, "source")
            .addStateStore(countsStoreBuilder, "process")
            .addSink("sink", str2, new StringSerializer(), new StringSerializer(), "process");

        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str5) : Stores.inMemoryKeyValueStore(str5),
                Serdes.String(),
                Serdes.String()).withLoggingDisabled(),
            "global",
            Serdes.String().deserializer(),
            Serdes.String().deserializer(),
            str3,
            str3 + "-processor",
            new MockProcessorSupplier());
        return topology;
    }

    /**
     * Starts the given topology against a fresh temporary state directory, produces one
     * record to every input topic, reads from the output topic, and then asserts whether
     * anything was created beneath the state directory.
     *
     * <p>The streams instance is closed and cleaned up, and the temporary directory is
     * deleted, regardless of how the check ends.
     *
     * @param topology   topology to run
     * @param collection topics to which one {@code ("A", "A")} record is produced
     * @param str        output topic to read from before inspecting the directory
     * @param z          {@code true} if entries are expected under the state directory,
     *                   {@code false} if it must stay empty
     * @throws Exception if producing, consuming, or cleanup fails
     */
    private void startStreamsAndCheckDirExists(Topology topology, Collection<String> collection, String str, boolean z) throws Exception {
        final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString(5));
        final Path basePath = baseDir.toPath();
        if (!baseDir.exists()) {
            Files.createDirectory(basePath);
        }

        final Properties localProps = new Properties();
        localProps.putAll(this.props);
        localProps.put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getAbsolutePath());

        final KafkaStreams streams = new KafkaStreams(topology, localProps);
        try {
            // Everything after construction runs inside the try so that a failure while
            // starting, producing or consuming cannot leak the instance or the directory.
            streams.start();

            for (final String topic : collection) {
                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                    topic,
                    Collections.singletonList(new KeyValue<>("A", "A")),
                    TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
                    System.currentTimeMillis());
            }

            IntegrationTestUtils.readKeyValues(
                str,
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), str + "-group", StringDeserializer.class, StringDeserializer.class),
                5000L,
                NUM_BROKERS);

            // Collect every path below the base directory, excluding the base directory itself.
            final List<Path> entries;
            try (java.util.stream.Stream<Path> found =
                     Files.find(basePath, 999, (candidate, attributes) -> !candidate.equals(basePath))) {
                // The stream holds open directory handles, so it is closed via try-with-resources.
                entries = found.collect(Collectors.toList());
            } catch (IOException | java.io.UncheckedIOException e) {
                // Traversal errors raised lazily during collect surface as UncheckedIOException.
                throw new AssertionError("Couldn't read the state directory : " + baseDir.getPath(), e);
            }

            if (z && entries.isEmpty()) {
                Assert.fail("Files should have existed, but it didn't: " + entries);
            }
            if (!z && !entries.isEmpty()) {
                Assert.fail("Files should not have existed, but it did: " + entries);
            }
        } finally {
            streams.close();
            streams.cleanUp();
            Utils.delete(baseDir);
        }
    }

    /**
     * Waits until {@code file} no longer exists while the {@code 0_0} directory under
     * {@code str} does, then asserts that the {@code 0_0} directory exists.
     *
     * @param str  parent directory expected to contain {@code 0_0}
     * @param file path that must no longer exist
     * @throws InterruptedException if interrupted while waiting for the condition
     */
    private void verifyCleanupStateDir(String str, File file) throws InterruptedException {
        final File taskDir = new File(str, "0_0");
        TestUtils.waitForCondition(
            () -> !file.exists() && taskDir.exists(),
            "cleanup has not successfully run");
        Assert.assertTrue(taskDir.exists());
    }
}
