package org.apache.kafka.streams;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.streams.utils.TestUtils;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@Timeout(600)
@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    // Value written to the "num.stream.threads" property in before().
    private static final int NUM_THREADS = 2;
    // Application id prefix; before() appends a unique per-test name to the same literal.
    private static final String APPLICATION_ID = "appId-";
    // Value written to the "client.id" property in before().
    private static final String CLIENT_ID = "test-client";
    private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
    // Per-test fixtures, re-created in before().
    private MockClientSupplier supplier;
    private MockTime time;
    private Properties props;
    private MockAdminClient adminClient;
    private StateListenerStub streamsStateListener;

    // Mock stream threads handed out (in this order) by the stubbed StreamThread.create(...) in prepareStreams().
    @Mock
    private StreamThread streamThreadOne;

    @Mock
    private StreamThread streamThreadTwo;

    // Captures the listener passed to setStateListener(...) on the mocked threads so that the
    // tests can drive thread state transitions by hand.
    @Captor
    private ArgumentCaptor<StreamThread.StateListener> threadStateListenerCapture;
    // Scoped static/construction mocks opened by prepareStreams() and closed in tearDown().
    private MockedStatic<ClientMetrics> clientMetricsMockedStatic;
    private MockedStatic<StreamThread> streamThreadMockedStatic;
    private MockedStatic<StreamsConfigUtils> streamsConfigUtils;
    private MockedConstruction<GlobalStreamThread> globalStreamThreadMockedConstruction;
    private MockedConstruction<Metrics> metricsMockedConstruction;

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges++;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    /**
     * Creates fresh fixtures for each test: mock time, a mock client supplier bound to a
     * single-node bootstrap cluster, a state listener stub, and the Streams configuration.
     *
     * @param testInfo used to derive a unique application id for the running test
     */
    @BeforeEach
    public void before(TestInfo testInfo) throws Exception {
        time = new MockTime();

        supplier = new MockClientSupplier();
        InetSocketAddress bootstrapAddress = new InetSocketAddress("localhost", 9999);
        supplier.setCluster(Cluster.bootstrap(Collections.singletonList(bootstrapAddress)));
        adminClient = supplier.getAdmin(null);

        streamsStateListener = new StateListenerStub();

        props = new Properties();
        String applicationId = "appId-" + TestUtils.safeUniqueTestName(testInfo);
        props.put("application.id", applicationId);
        props.put("client.id", CLIENT_ID);
        props.put("bootstrap.servers", "localhost:2018");
        props.put("metric.reporters", MockMetricsReporter.class.getName());
        String stateDirPath = org.apache.kafka.test.TestUtils.tempDirectory().getPath();
        props.put("state.dir", stateDirPath);
        props.put("num.stream.threads", NUM_THREADS);
    }

    /**
     * Closes whichever scoped mocks the test opened, closes the admin client, and resets the two
     * stream thread mocks. Each handle is null when the test never called prepareStreams().
     */
    @AfterEach
    public void tearDown() {
        // Scoped static / construction mocks.
        if (clientMetricsMockedStatic != null) {
            clientMetricsMockedStatic.close();
        }
        if (streamThreadMockedStatic != null) {
            streamThreadMockedStatic.close();
        }
        if (globalStreamThreadMockedConstruction != null) {
            globalStreamThreadMockedConstruction.close();
        }
        if (metricsMockedConstruction != null) {
            metricsMockedConstruction.close();
        }
        if (streamsConfigUtils != null) {
            streamsConfigUtils.close();
        }

        // Client created in before().
        if (adminClient != null) {
            adminClient.close();
        }

        // Drop all stubbing and recorded interactions on the shared thread mocks.
        Mockito.reset(streamThreadOne, streamThreadTwo);
    }

    /**
     * Opens the static and construction mocks that a {@link KafkaStreams} instance under test
     * depends on. The handles are stored in fields and closed in {@code tearDown()}.
     *
     * <ul>
     *   <li>{@link Metrics} construction: initializes the configured reporters and closes them
     *       when the mocked {@code Metrics} is closed.</li>
     *   <li>{@link ClientMetrics}: fixed version and commit id.</li>
     *   <li>{@link StreamThread#create}: returns {@code streamThreadOne}, then
     *       {@code streamThreadTwo}.</li>
     *   <li>{@link StreamsConfigUtils}: at-least-once, EOS disabled, 10 MiB cache.</li>
     *   <li>{@link GlobalStreamThread} construction: a mock that tracks its own state and
     *       notifies the captured state listener on start and shutdown.</li>
     * </ul>
     */
    private void prepareStreams() {
        metricsMockedConstruction = Mockito.mockConstruction(Metrics.class, (metrics, context) -> {
            Assertions.assertEquals(4, context.arguments().size());
            // The second constructor argument is the list of configured metrics reporters.
            @SuppressWarnings("unchecked")
            List<MetricsReporter> reporters = (List<MetricsReporter>) context.arguments().get(1);
            for (MetricsReporter reporter : reporters) {
                reporter.init(Collections.emptyList());
            }
            Mockito.doAnswer(invocation -> {
                for (MetricsReporter reporter : reporters) {
                    reporter.close();
                }
                return null;
            }).when(metrics).close();
        });

        clientMetricsMockedStatic = Mockito.mockStatic(ClientMetrics.class);
        clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
        clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
        // Plain invocations on the static mock (not stubbings); kept exactly as in the original.
        ClientMetrics.addVersionMetric(ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addCommitIdMetric(ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addApplicationIdMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), Mockito.eq(APPLICATION_ID));
        ClientMetrics.addTopologyDescriptionMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());
        ClientMetrics.addStateMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());
        ClientMetrics.addNumAliveStreamThreadMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());

        streamThreadMockedStatic = Mockito.mockStatic(StreamThread.class);
        streamThreadMockedStatic.when(() -> StreamThread.create(
            ArgumentMatchers.any(TopologyMetadata.class),
            ArgumentMatchers.any(StreamsConfig.class),
            ArgumentMatchers.any(KafkaClientSupplier.class),
            ArgumentMatchers.any(Admin.class),
            ArgumentMatchers.any(UUID.class),
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.any(StreamsMetricsImpl.class),
            ArgumentMatchers.any(Time.class),
            ArgumentMatchers.any(StreamsMetadataState.class),
            Mockito.anyLong(),
            ArgumentMatchers.any(StateDirectory.class),
            ArgumentMatchers.any(StateRestoreListener.class),
            ArgumentMatchers.any(StandbyUpdateListener.class),
            Mockito.anyInt(),
            ArgumentMatchers.any(Runnable.class),
            ArgumentMatchers.any()
        )).thenReturn(streamThreadOne).thenReturn(streamThreadTwo);

        streamsConfigUtils = Mockito.mockStatic(StreamsConfigUtils.class);
        streamsConfigUtils.when(() -> StreamsConfigUtils.processingMode(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE);
        streamsConfigUtils.when(() -> StreamsConfigUtils.eosEnabled(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(false);
        streamsConfigUtils.when(() -> StreamsConfigUtils.totalCacheSize(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(10485760L);

        AtomicReference<GlobalStreamThread.State> globalThreadState =
            new AtomicReference<>(GlobalStreamThread.State.CREATED);
        globalStreamThreadMockedConstruction = Mockito.mockConstruction(GlobalStreamThread.class, (globalThread, context) -> {
            Mockito.when(globalThread.state()).thenAnswer(invocation -> globalThreadState.get());
            Mockito.doNothing().when(globalThread).setStateListener(threadStateListenerCapture.capture());
            Mockito.doAnswer(invocation -> {
                globalThreadState.set(GlobalStreamThread.State.RUNNING);
                threadStateListenerCapture.getValue()
                    .onChange(globalThread, GlobalStreamThread.State.RUNNING, GlobalStreamThread.State.CREATED);
                return null;
            }).when(globalThread).start();
            Mockito.doAnswer(invocation -> {
                supplier.restoreConsumer.close();
                for (MockProducer<byte[], byte[]> producer : supplier.producers) {
                    producer.close();
                }
                globalThreadState.set(GlobalStreamThread.State.DEAD);
                threadStateListenerCapture.getValue()
                    .onChange(globalThread, GlobalStreamThread.State.PENDING_SHUTDOWN, GlobalStreamThread.State.RUNNING);
                threadStateListenerCapture.getValue()
                    .onChange(globalThread, GlobalStreamThread.State.DEAD, GlobalStreamThread.State.PENDING_SHUTDOWN);
                return null;
            }).when(globalThread).shutdown();
            // Must be an Answer: thenReturn(state.get() == RUNNING) would be evaluated once, at
            // stubbing time (state is CREATED), and make stillRunning() permanently false.
            Mockito.when(globalThread.stillRunning())
                .thenAnswer(invocation -> globalThreadState.get() == GlobalStreamThread.State.RUNNING);
        });
    }

    /**
     * Stubs the basic accessors of a mocked stream thread: id, name, state, and capture of the
     * state listener installed on it.
     *
     * @param streamThread the mock to stub
     * @param i            the thread id, also used as the suffix of the thread name
     * @return the holder backing {@code streamThread.state()}, initially {@code CREATED}
     */
    private AtomicReference<StreamThread.State> prepareStreamThread(StreamThread streamThread, int i) {
        final long threadId = i;
        Mockito.when(streamThread.getId()).thenReturn(threadId);

        // state() always reflects the current content of the returned holder.
        final AtomicReference<StreamThread.State> threadState = new AtomicReference<>(StreamThread.State.CREATED);
        Mockito.when(streamThread.state()).thenAnswer(invocation -> threadState.get());

        Mockito.doNothing().when(streamThread).setStateListener(threadStateListenerCapture.capture());

        final String threadName = "processId-StreamThread-" + i;
        Mockito.when(streamThread.getName()).thenReturn(threadName);
        return threadState;
    }

    /**
     * Stubs {@code shutdown()} on a mocked stream thread: closes the supplier's consumer, restore
     * consumer and producers, marks the thread DEAD, and reports the transitions
     * RUNNING -> PENDING_SHUTDOWN -> DEAD to the captured state listener.
     *
     * @param streamThread    the mock to stub
     * @param atomicReference the holder backing the mock's {@code state()}
     */
    private void prepareConsumer(StreamThread streamThread, AtomicReference<StreamThread.State> atomicReference) {
        Mockito.doAnswer(invocation -> {
            supplier.consumer.close();
            supplier.restoreConsumer.close();
            for (MockProducer<byte[], byte[]> producer : supplier.producers) {
                producer.close();
            }
            atomicReference.set(StreamThread.State.DEAD);

            // Each consecutive pair is reported as (new state, old state).
            StreamThread.State[] shutdownPath = {
                StreamThread.State.RUNNING,
                StreamThread.State.PENDING_SHUTDOWN,
                StreamThread.State.DEAD
            };
            for (int step = 1; step < shutdownPath.length; step++) {
                threadStateListenerCapture.getValue()
                    .onChange(streamThread, shutdownPath[step], shutdownPath[step - 1]);
            }
            return null;
        }).when(streamThread).shutdown();
    }

    /**
     * Stubs {@code getStateLock()} on a mocked stream thread to return a dedicated lock object.
     *
     * @param streamThread the mock to stub
     */
    private void prepareThreadLock(StreamThread streamThread) {
        final Object stateLock = new Object();
        Mockito.when(streamThread.getStateLock()).thenReturn(stateLock);
    }

    /**
     * Stubs {@code start()} on a mocked stream thread: sets the state holder to STARTING and
     * reports the transitions CREATED -> STARTING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED
     * -> RUNNING to the captured state listener. The holder itself stays at STARTING.
     *
     * @param streamThread    the mock to stub
     * @param atomicReference the holder backing the mock's {@code state()}
     */
    private void prepareThreadState(StreamThread streamThread, AtomicReference<StreamThread.State> atomicReference) {
        Mockito.doAnswer(invocation -> {
            atomicReference.set(StreamThread.State.STARTING);

            // Each consecutive pair is reported as (new state, old state).
            StreamThread.State[] startupPath = {
                StreamThread.State.CREATED,
                StreamThread.State.STARTING,
                StreamThread.State.PARTITIONS_REVOKED,
                StreamThread.State.PARTITIONS_ASSIGNED,
                StreamThread.State.RUNNING
            };
            for (int step = 1; step < startupPath.length; step++) {
                threadStateListenerCapture.getValue()
                    .onChange(streamThread, startupPath[step], startupPath[step - 1]);
            }
            return null;
        }).when(streamThread).start();
    }

    /**
     * Stubs {@code join()} on a mocked stream thread so that every call sleeps for two seconds
     * before returning.
     *
     * @param streamThread the mock to stub
     * @throws InterruptedException declared because the stubbed {@code join()} declares it
     */
    private void prepareTerminableThread(StreamThread streamThread) throws InterruptedException {
        final long joinDelayMs = 2000L;
        Mockito.doAnswer(invocation -> {
            Thread.sleep(joinDelayMs);
            return null;
        }).when(streamThread).join();
    }

    /**
     * Closing a client that was never started must move it straight to NOT_RUNNING.
     */
    @Test
    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        // try-with-resources closes the client on every exit path and, unlike a hand-written
        // catch block, never calls close() again when close() itself is what failed.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.close();
            Assertions.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Drives the mocked stream threads through a rebalance in which one thread dies, and checks
     * that the client returns to RUNNING once every surviving thread is RUNNING again.
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.setStateListener(streamsStateListener);
            Assertions.assertEquals(0, streamsStateListener.numChanges);
            Assertions.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            // 2 is the number of client-level state changes observed so far, not a thread count.
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streamsStateListener.numChanges == 2,
                "Streams never started.");
            Assertions.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            // All threads lose their partitions: the client starts rebalancing.
            for (StreamThread thread : streams.threads) {
                threadStateListenerCapture.getValue()
                    .onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
            }
            Assertions.assertEquals(3, streamsStateListener.numChanges);
            Assertions.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // Partitions assigned: no client-level change yet.
            for (StreamThread thread : streams.threads) {
                threadStateListenerCapture.getValue()
                    .onChange(thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            }
            Assertions.assertEquals(3, streamsStateListener.numChanges);
            Assertions.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // The second thread shuts down and dies while the client is still rebalancing.
            StreamThread dyingThread = streams.threads.get(1);
            threadStateListenerCapture.getValue()
                .onChange(dyingThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
            threadStateListenerCapture.getValue()
                .onChange(dyingThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            Assertions.assertEquals(3, streamsStateListener.numChanges);
            Assertions.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // Every surviving thread goes back to RUNNING, and so does the client.
            for (StreamThread thread : streams.threads) {
                if (thread != streams.threads.get(1)) {
                    threadStateListenerCapture.getValue()
                        .onChange(thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
                }
            }
            Assertions.assertEquals(4, streamsStateListener.numChanges);
            Assertions.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            streams.close();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streamsStateListener.numChanges == 6,
                "Streams never closed.");
            Assertions.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Closing a client that was never started must still close every client obtained from the
     * supplier, without logging an ERROR message.
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareConsumer(streamThreadOne, stateOne);
        prepareConsumer(streamThreadTwo, stateTwo);

        StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        // Both resources are closed exactly once on every exit path (client first, then appender).
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.close();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.not(CoreMatchers.hasItem(Matchers.containsString("ERROR"))));
        }

        Assertions.assertTrue(supplier.consumer.closed());
        Assertions.assertTrue(supplier.restoreConsumer.closed());
        for (MockProducer<byte[], byte[]> producer : supplier.producers) {
            Assertions.assertTrue(producer.closed());
        }
    }

    /**
     * Shuts every stream thread down individually, then closes the client and checks that it
     * reaches NOT_RUNNING and releases its global stream thread.
     */
    @Test
    public void testStateThreadClose() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        prepareConsumer(streamThreadOne, stateOne);
        prepareConsumer(streamThreadTwo, stateTwo);
        prepareThreadLock(streamThreadOne);
        prepareThreadLock(streamThreadTwo);

        StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            Assertions.assertEquals(NUM_THREADS, streams.threads.size());
            // Expected value first, so a failure message reads correctly.
            Assertions.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            for (int i = 0; i < NUM_THREADS; i++) {
                StreamThread thread = streams.threads.get(i);
                thread.shutdown();
                org.apache.kafka.test.TestUtils.waitForCondition(
                    () -> thread.state() == StreamThread.State.DEAD,
                    "Thread never stopped.");
                thread.join();
            }
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.metadataForLocalThreads().stream()
                    .allMatch(threadMetadata -> threadMetadata.threadState().equals("DEAD")),
                "Streams never stopped");

            streams.close();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            Assertions.assertNull(streams.globalStreamThread);
        }
    }

    /**
     * Shuts the global stream thread down underneath a running client and checks that the client
     * ends in ERROR, stays there after close(), and logs the expected transitions.
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        // Both resources are closed exactly once on every exit path (client first, then appender).
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            GlobalStreamThread globalThread = streams.globalStreamThread;
            globalThread.shutdown();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> globalThread.state() == GlobalStreamThread.State.DEAD,
                "Thread never stopped.");
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.ERROR,
                "Thread never stopped.");

            streams.close();
            // Expected value first, so a failure message reads correctly.
            Assertions.assertEquals(
                KafkaStreams.State.ERROR,
                streams.state(),
                "KafkaStreams should remain in ERROR state after close.");
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem(Matchers.containsString("State transition from RUNNING to PENDING_ERROR")));
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem(Matchers.containsString("State transition from PENDING_ERROR to ERROR")));
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem(Matchers.containsString("Streams client is already in the terminal ERROR state")));
        }
    }

    /**
     * Constructing the client must initialize the configured metrics reporter, and closing it
     * must close each initialized reporter exactly once.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        int initCountBefore = MockMetricsReporter.INIT_COUNT.get();
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            int newlyInitialized = MockMetricsReporter.INIT_COUNT.get() - initCountBefore;
            Assertions.assertEquals(
                1,
                newlyInitialized,
                "some reporters including MockMetricsReporter should be initialized by calling on construction");

            streams.start();
            int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            // Expected value first, so a failure message reads correctly.
            Assertions.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
            Assertions.assertEquals(closeCountBefore + newlyInitialized, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /**
     * A second close() on an already closed client must not close the metrics reporters again.
     */
    @Test
    public void testCloseIsIdempotent() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        // try-with-resources closes the client on every exit path and, unlike a hand-written
        // catch block, never calls close() again when close() itself is what failed.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.close();
            int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

            streams.close();
            Assertions.assertEquals(
                closeCountAfterFirstClose,
                MockMetricsReporter.CLOSE_COUNT.get(),
                "subsequent close() calls should do nothing");
        }
    }

    /**
     * pause() and resume() on a started client must be reflected by isPaused().
     */
    @Test
    public void testPauseResume() {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        // try-with-resources closes the client on every exit path and, unlike a hand-written
        // catch block, never calls close() again when close() itself is what failed.
        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            streams.pause();
            Assertions.assertTrue(streams.isPaused());

            streams.resume();
            Assertions.assertFalse(streams.isPaused());
        }
    }

    /**
     * Calls {@code pause()} before {@code start()} and checks that the client reports paused after
     * starting, and not paused once {@code resume()} has been called.
     */
    @Test
    public void testStartingPaused() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Pause first: the flag is expected to survive start().
            streams.pause();
            streams.start();
            Assertions.assertTrue(streams.isPaused());

            streams.resume();
            Assertions.assertFalse(streams.isPaused());
        }
    }

    /**
     * Calls {@code pause()} twice and then {@code resume()} twice on a started client, checking
     * {@code isPaused()} after every call.
     */
    @Test
    public void testShowPauseResumeAreIdempotent() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            // Repeated pause() keeps the client paused.
            streams.pause();
            Assertions.assertTrue(streams.isPaused());
            streams.pause();
            Assertions.assertTrue(streams.isPaused());

            // Repeated resume() keeps the client un-paused.
            streams.resume();
            Assertions.assertFalse(streams.isPaused());
            streams.resume();
            Assertions.assertFalse(streams.isPaused());
        }
    }

    /**
     * With {@code num.stream.threads} set to 1, waits for RUNNING and checks that
     * {@code addStreamThread()} returns {@code "processId-StreamThread-2"} and grows
     * {@code threads} by one.
     */
    @Test
    public void shouldAddThreadWhenRunning() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        props.put("num.stream.threads", 1);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            final int threadCountBefore = streams.threads.size();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                15L,
                "wait until running");

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-2")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore + 1));
        }
    }

    /**
     * Checks that {@code addStreamThread()} on a client that was never started returns an empty
     * {@code Optional} and leaves the size of {@code threads} unchanged.
     */
    @Test
    public void shouldNotAddThreadWhenCreated() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Checks that {@code addStreamThread()} after {@code close()} returns an empty
     * {@code Optional} and leaves the size of {@code threads} unchanged.
     */
    @Test
    public void shouldNotAddThreadWhenClosed() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();
            streams.close();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Builds a topology with a global table, starts the client, shuts down its global stream
     * thread, and checks that {@code addStreamThread()} then returns an empty {@code Optional}
     * without changing the size of {@code threads}.
     */
    @Test
    public void shouldNotAddThreadWhenError() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();
            streams.start();
            streams.globalStreamThread.shutdown();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * Shuts down the first stream thread after start and checks that
     * {@code metadataForLocalThreads()} has exactly one entry and contains the metadata of the
     * second stream thread.
     */
    @Test
    public void shouldNotReturnDeadThreads() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        prepareThreadLock(streamThreadOne);
        prepareThreadLock(streamThreadTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            streamThreadOne.shutdown();

            // NOTE(review): raw Set kept as declared originally; parameterize once the element type is confirmed.
            Set localThreadMetadata = streams.metadataForLocalThreads();
            MatcherAssert.assertThat(localThreadMetadata.size(), Matchers.equalTo(1));
            MatcherAssert.assertThat(localThreadMetadata, CoreMatchers.hasItem(streamThreadTwo.threadMetadata()));
        }
    }

    /**
     * With {@code num.stream.threads} set to {@code NUM_THREADS}, waits for RUNNING and checks
     * that {@code removeStreamThread()} returns {@code "processId-StreamThread-1"} and shrinks
     * {@code threads} by one.
     */
    @Test
    public void shouldRemoveThread() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        // Extra stubbing on the first thread mock for the removal path.
        Mockito.when(streamThreadOne.groupInstanceID()).thenReturn(Optional.empty());
        Mockito.when(streamThreadOne.waitOnThreadState(Mockito.isA(StreamThread.State.class), Mockito.anyLong())).thenReturn(true);
        Mockito.when(streamThreadOne.isThreadAlive()).thenReturn(true);
        props.put("num.stream.threads", NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            final int threadCountBefore = streams.threads.size();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                15L,
                "Kafka Streams client did not reach state RUNNING");

            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-1")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore - 1));
        }
    }

    /**
     * Checks that {@code removeStreamThread()} on a client that was never started returns an
     * empty {@code Optional} and that {@code threads} still holds one entry.
     */
    @Test
    public void shouldNotRemoveThreadWhenNotRunning() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        props.put("num.stream.threads", 1);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(1));
        }
    }

    /**
     * Starts and closes the client, then checks that a second {@code start()} throws
     * {@link IllegalStateException}.
     */
    @Test
    public void testCannotStartOnceClosed() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            streams.close();

            // assertThrows replaces the try / fail() / empty-catch pattern used previously.
            Assertions.assertThrows(
                IllegalStateException.class,
                streams::start,
                "Should have thrown IllegalStateException");
        }
    }

    /**
     * Checks that {@code setGlobalStateRestoreListener} throws {@link IllegalStateException}
     * once the client has been started.
     */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            // assertThrows replaces the try / fail() / empty-catch pattern used previously.
            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.setGlobalStateRestoreListener((StateRestoreListener) null),
                "Should throw an IllegalStateException");
        }
    }

    /**
     * With stream-thread states prepared, checks that {@code setUncaughtExceptionHandler} throws
     * {@link IllegalStateException} after {@code start()}.
     */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that {@code setUncaughtExceptionHandler} throws {@link IllegalStateException} after
     * {@code start()}; unlike the sibling test, no thread-state stubbing is prepared here.
     */
    @Test
    public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that passing {@code null} to {@code setUncaughtExceptionHandler} on a client that
     * has not been started throws {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandlerIfNull() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            Assertions.assertThrows(
                NullPointerException.class,
                () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
        }
    }

    /**
     * Checks that {@code setStateListener} throws {@link IllegalStateException} once the client
     * has been started.
     */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            // assertThrows replaces the try / fail() / empty-catch pattern used previously.
            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.setStateListener((KafkaStreams.StateListener) null),
                "Should throw IllegalStateException");
        }
    }

    /**
     * Calls {@code cleanUp()} before {@code start()} and again after {@code close()}; the test
     * passes when neither call throws.
     */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            try {
                streams.cleanUp();
                streams.start();
            } finally {
                // Runs on success and on failure alike: close first, then clean up again.
                streams.close();
                streams.cleanUp();
            }
        }
    }

    /**
     * Waits for the client to reach RUNNING and checks that {@code cleanUp()} throws
     * {@link IllegalStateException} with the message {@code "Cannot clean up while running."}.
     */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            // assertThrows hands back the exception so its message can be verified directly.
            final IllegalStateException thrown = Assertions.assertThrows(
                IllegalStateException.class,
                streams::cleanUp,
                "Should have thrown IllegalStateException");
            Assertions.assertEquals("Cannot clean up while running.", thrown.getMessage());
        }
    }

    /**
     * Waits for RUNNING, pauses the client, waits until {@code isPaused()} is true, and checks
     * that {@code cleanUp()} throws {@link IllegalStateException} with the message
     * {@code "Cannot clean up while running."}.
     */
    @Test
    public void shouldThrowOnCleanupWhilePaused() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            streams.pause();
            org.apache.kafka.test.TestUtils.waitForCondition(streams::isPaused, "Streams did not pause.");

            // The third argument of assertThrows is only a failure message, so the expected
            // exception text has to be asserted on the returned exception instead.
            final IllegalStateException thrown = Assertions.assertThrows(IllegalStateException.class, streams::cleanUp);
            Assertions.assertEquals("Cannot clean up while running.", thrown.getMessage());
        }
    }

    /**
     * Closes a running client with a zero timeout and checks that the state is PENDING_SHUTDOWN
     * both before and after a {@code cleanUp()} call that throws {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDown() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        prepareTerminableThread(streamThreadOne);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            streams.close(Duration.ZERO);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));

            Assertions.assertThrows(IllegalStateException.class, streams::cleanUp);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));
        }
    }

    /**
     * Closes a running client through {@code CloseOptions} with a zero timeout and
     * {@code leaveGroup(false)}, then checks that the state is PENDING_SHUTDOWN both before and
     * after a {@code cleanUp()} call that throws {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        prepareTerminableThread(streamThreadOne);

        // Supplier spy that hands out the test's admin client.
        final MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(adminClient);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
            closeOptions.timeout(Duration.ZERO);
            // This is the leave-group-false variant; it previously passed true, contradicting its name.
            closeOptions.leaveGroup(false);

            streams.close(closeOptions);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));

            Assertions.assertThrows(IllegalStateException.class, streams::cleanUp);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));
        }
    }

    /**
     * Closes a running client through {@code CloseOptions} with a zero timeout and
     * {@code leaveGroup(true)}, then checks that the state is PENDING_SHUTDOWN both before and
     * after a {@code cleanUp()} call that throws {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupTrue() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);
        prepareTerminableThread(streamThreadOne);

        // Supplier spy that hands out the test's admin client, as the other CloseOptions test does.
        final MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(adminClient);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
            streams.start();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
            closeOptions.timeout(Duration.ZERO);
            // This is the leave-group-true variant; the option was previously never set.
            closeOptions.leaveGroup(true);

            streams.close(closeOptions);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));

            Assertions.assertThrows(IllegalStateException.class, streams::cleanUp);
            MatcherAssert.assertThat(streams.state() == KafkaStreams.State.PENDING_SHUTDOWN, Matchers.equalTo(true));
        }
    }

    /**
     * Checks that {@code metadataForAllStreamsClients()} throws
     * {@link StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached NOT_RUNNING after
     * {@code close()}.
     */
    @Test
    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Not started yet.
            Assertions.assertThrows(StreamsNotStartedException.class, streams::metadataForAllStreamsClients);

            streams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Already shut down.
            Assertions.assertThrows(IllegalStateException.class, streams::metadataForAllStreamsClients);
        }
    }

    /**
     * Checks that {@code streamsMetadataForStore("store")} throws
     * {@link StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached NOT_RUNNING after
     * {@code close()}.
     */
    @Test
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Not started yet.
            Assertions.assertThrows(StreamsNotStartedException.class, () -> streams.streamsMetadataForStore("store"));

            streams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Already shut down.
            Assertions.assertThrows(IllegalStateException.class, () -> streams.streamsMetadataForStore("store"));
        }
    }

    /**
     * Checks that the serializer overload of {@code queryMetadataForKey} throws
     * {@link StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached NOT_RUNNING after
     * {@code close()}.
     */
    @Test
    public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Not started yet.
            Assertions.assertThrows(
                StreamsNotStartedException.class,
                () -> streams.queryMetadataForKey("store", "key", new StringSerializer()));

            streams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Already shut down.
            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.queryMetadataForKey("store", "key", new StringSerializer()));
        }
    }

    /**
     * Checks that, on a started client, the serializer overload of {@code queryMetadataForKey}
     * returns {@code KeyQueryMetadata.NOT_AVAILABLE} for store {@code "store"} and key
     * {@code "key"}.
     */
    @Test
    public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            Assertions.assertEquals(
                KeyQueryMetadata.NOT_AVAILABLE,
                streams.queryMetadataForKey("store", "key", new StringSerializer()));
        }
    }

    /**
     * Checks that the partitioner overload of {@code queryMetadataForKey} throws
     * {@link StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached NOT_RUNNING after
     * {@code close()}.
     */
    @Test
    public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() throws Exception {
        prepareStreams();
        final AtomicReference<StreamThread.State> stateOne = prepareStreamThread(streamThreadOne, 1);
        final AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(streamThreadTwo, NUM_THREADS);
        prepareThreadState(streamThreadOne, stateOne);
        prepareThreadState(streamThreadTwo, stateTwo);

        try (KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            // Not started yet; the partitioner lambda always answers with the singleton set {0}.
            Assertions.assertThrows(
                StreamsNotStartedException.class,
                () -> streams.queryMetadataForKey(
                    "store",
                    "key",
                    (arg0, arg1, arg2, arg3) -> Optional.of(Collections.singleton(0))));

            streams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            streams.close();
            TestUtils.waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Already shut down.
            Assertions.assertThrows(
                IllegalStateException.class,
                () -> streams.queryMetadataForKey(
                    "store",
                    "key",
                    (arg0, arg1, arg2, arg3) -> Optional.of(Collections.singleton(0))));
        }
    }

    /**
     * Verifies that requesting the store named {@code "unknown-store"} from a running
     * instance throws {@link UnknownStateStoreException}.
     *
     * @throws Exception if waiting for the RUNNING state fails
     */
    @Test
    public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(this.streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareThreadState(this.streamThreadOne, stateOne);
        prepareThreadState(this.streamThreadTwo, stateTwo);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            kafkaStreams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            Assertions.assertThrows(UnknownStateStoreException.class, () ->
                kafkaStreams.store(StoreQueryParameters.fromNameAndType("unknown-store", QueryableStoreTypes.keyValueStore())));
        }
    }

    /**
     * Verifies that {@code store(...)} is rejected with {@link StreamsNotStartedException}
     * before {@code start()} and with {@link IllegalStateException} once the instance
     * has reached {@code NOT_RUNNING}.
     *
     * @throws Exception if waiting for a state transition fails
     */
    @Test
    public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(this.streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareThreadState(this.streamThreadOne, stateOne);
        prepareThreadState(this.streamThreadTwo, stateTwo);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // Not started yet.
            Assertions.assertThrows(StreamsNotStartedException.class, () ->
                kafkaStreams.store(StoreQueryParameters.fromNameAndType("store", QueryableStoreTypes.keyValueStore())));

            kafkaStreams.start();
            TestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

            // Explicit close so the lookup below is issued against a stopped instance.
            kafkaStreams.close();
            TestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
            Assertions.assertThrows(IllegalStateException.class, () ->
                kafkaStreams.store(StoreQueryParameters.fromNameAndType("store", QueryableStoreTypes.keyValueStore())));
        }
    }

    /**
     * Verifies that {@code allLocalStorePartitionLags()} returns an empty map for a
     * started instance whose admin client is a spied {@link MockAdminClient}.
     */
    @Test
    public void shouldReturnEmptyLocalStorePartitionLags() {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(this.streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareThreadState(this.streamThreadOne, stateOne);
        prepareThreadState(this.streamThreadTwo, stateTwo);

        // Hand the spied admin client to KafkaStreams through a spied client supplier.
        MockAdminClient mockAdminClient = Mockito.spy(MockAdminClient.class);
        MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(mockAdminClient);

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, mockClientSupplier, this.time)) {
            kafkaStreams.start();
            Assertions.assertEquals(0, kafkaStreams.allLocalStorePartitionLags().size());
        }
    }

    /**
     * Verifies that {@code close(Duration)} with a 10 ms timeout returns {@code false}.
     *
     * <p>NOTE(review): the instance is built without {@code this.time}, unlike most
     * sibling tests; presumably so the timeout elapses on the real clock. Confirm.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assertions.assertFalse(kafkaStreams.close(Duration.ofMillis(10L)));
        }
    }

    /**
     * Verifies that {@code close(Duration)} rejects a negative timeout with
     * {@link IllegalArgumentException}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForClose() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assertions.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(Duration.ofMillis(-1L)));
        }
    }

    /**
     * Verifies that {@code close(Duration.ZERO)} returns {@code false}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assertions.assertFalse(kafkaStreams.close(Duration.ZERO));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a 10 ms timeout, and without
     * {@code leaveGroup} being set, returns {@code false}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThreadsHaventTerminated() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ofMillis(10L));

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assertions.assertFalse(kafkaStreams.close(closeOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} rejects a negative timeout with
     * {@link IllegalArgumentException} when {@code leaveGroup} has not been set.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ofMillis(-1L));

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            Assertions.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(closeOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a zero timeout, and without
     * {@code leaveGroup} being set, returns {@code false}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ZERO);

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier)) {
            Assertions.assertFalse(kafkaStreams.close(closeOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a 10 ms timeout and
     * {@code leaveGroup(true)} returns {@code false}. The admin client is supplied
     * through a spied {@link MockClientSupplier}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ofMillis(10L));
        closeOptions.leaveGroup(true);

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, mockClientSupplier)) {
            Assertions.assertFalse(kafkaStreams.close(closeOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} rejects a negative timeout with
     * {@link IllegalArgumentException} when {@code leaveGroup(true)} is set.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ofMillis(-1L));
        closeOptions.leaveGroup(true);

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, mockClientSupplier, this.time)) {
            Assertions.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(closeOptions));
        }
    }

    /**
     * Verifies that {@code close(CloseOptions)} with a zero timeout and
     * {@code leaveGroup(true)} returns {@code false}.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        MockClientSupplier mockClientSupplier = Mockito.spy(MockClientSupplier.class);
        Mockito.when(mockClientSupplier.getAdmin(ArgumentMatchers.any())).thenReturn(this.adminClient);

        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ZERO);
        closeOptions.leaveGroup(true);

        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, mockClientSupplier)) {
            Assertions.assertFalse(kafkaStreams.close(closeOptions));
        }
    }

    /**
     * With {@code metrics.recording.level} set to DEBUG, verifies that
     * {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} is invoked
     * {@code NUM_THREADS} times and that the second executor handed out is scheduled
     * with a {@link RocksDBMetricsRecordingTrigger} at a one-minute fixed rate and is
     * shut down after the instance is closed.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);

        // The static mock must be released even when an assertion fails.
        try (MockedStatic<Executors> executorsMock = Mockito.mockStatic(Executors.class)) {
            ScheduledExecutorService firstExecutor = Mockito.mock(ScheduledExecutorService.class);
            ScheduledExecutorService secondExecutor = Mockito.mock(ScheduledExecutorService.class);
            // Consecutive factory calls receive firstExecutor, then secondExecutor.
            executorsMock
                .when(() -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)))
                .thenReturn(firstExecutor, secondExecutor);

            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name());

            // Start and close the instance; closing happens even if start() throws.
            try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
                kafkaStreams.start();
            }

            executorsMock.verify(
                () -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)),
                Mockito.times(NUM_THREADS));
            Mockito.verify(secondExecutor).scheduleAtFixedRate(
                ArgumentMatchers.any(RocksDBMetricsRecordingTrigger.class),
                Mockito.eq(0L),
                Mockito.eq(1L),
                Mockito.eq(TimeUnit.MINUTES));
            Mockito.verify(secondExecutor).shutdownNow();
        }
    }

    /**
     * Builds a {@link KafkaStreams} from a topology and a spied {@link StreamsConfig}
     * only, closes it, and checks that {@code getKafkaClientSupplier()} was invoked
     * on the spy {@code NUM_THREADS} times.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructor() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);
        prepareTerminableThread(this.streamThreadTwo);

        StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));
        Mockito.when(spiedConfig.getKafkaClientSupplier()).thenReturn(this.supplier);

        KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), spiedConfig);
        streams.close();

        Mockito.verify(spiedConfig, Mockito.times(NUM_THREADS)).getKafkaClientSupplier();
    }

    /**
     * Same check as the config-only constructor variant, but for the constructor that
     * additionally takes a time source: {@code getKafkaClientSupplier()} must have been
     * invoked on the spied config {@code NUM_THREADS} times.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exception {
        prepareStreams();
        AtomicReference<StreamThread.State> stateOne = prepareStreamThread(this.streamThreadOne, 1);
        AtomicReference<StreamThread.State> stateTwo = prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareThreadState(this.streamThreadOne, stateOne);
        prepareThreadState(this.streamThreadTwo, stateTwo);

        StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));
        Mockito.when(spiedConfig.getKafkaClientSupplier()).thenReturn(this.supplier);

        KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), spiedConfig, this.time);
        streams.close();

        Mockito.verify(spiedConfig, Mockito.times(NUM_THREADS)).getKafkaClientSupplier();
    }

    /**
     * Builds a {@link KafkaStreams} with an explicitly supplied client supplier and
     * checks that {@code getKafkaClientSupplier()} is never invoked on the spied config.
     *
     * @throws Exception declared for the test helpers; nothing checked is thrown here directly
     */
    @Test
    public void shouldUseProvidedClientSupplier() throws Exception {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        prepareTerminableThread(this.streamThreadOne);
        prepareTerminableThread(this.streamThreadTwo);

        StreamsConfig spiedConfig = Mockito.spy(new StreamsConfig(this.props));

        KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), spiedConfig, this.supplier);
        streams.close();

        Mockito.verify(spiedConfig, Mockito.times(0)).getKafkaClientSupplier();
    }

    /**
     * With {@code metrics.recording.level} set to INFO, verifies that
     * {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} is invoked
     * exactly once (Mockito's default verification mode) over a start/close cycle.
     */
    @Test
    public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);

        // The static mock must be released even when an assertion fails.
        try (MockedStatic<Executors> executorsMock = Mockito.mockStatic(Executors.class)) {
            executorsMock
                .when(() -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)))
                .thenReturn(Mockito.mock(ScheduledExecutorService.class));

            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.name());

            // Start and close the instance; closing happens even if start() throws.
            try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
                kafkaStreams.start();
            }

            executorsMock.verify(
                () -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)));
        }
    }

    /**
     * With {@code state.cleanup.delay.ms} set to 1, verifies that the executor returned
     * by the mocked {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} is
     * scheduled at a fixed rate of 1 ms (initial delay 1 ms) and is shut down after the
     * instance is closed. {@link StateDirectory} construction is mocked so that
     * {@code initializeProcessId()} returns a random UUID.
     */
    @Test
    public void shouldCleanupOldStateDirs() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);

        try (MockedStatic<Executors> executorsMock = Mockito.mockStatic(Executors.class)) {
            ScheduledExecutorService cleanupExecutor = Mockito.mock(ScheduledExecutorService.class);
            executorsMock
                .when(() -> Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.any(ThreadFactory.class)))
                .thenReturn(cleanupExecutor);

            // Both the construction mock and the streams instance are released on every path;
            // the construction mock is closed before the verifications, as before.
            try (MockedConstruction<StateDirectory> ignored = Mockito.mockConstruction(
                    StateDirectory.class,
                    (stateDirectory, context) ->
                        Mockito.when(stateDirectory.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
                this.props.setProperty("state.cleanup.delay.ms", "1");
                StreamsBuilder streamsBuilder = new StreamsBuilder();
                streamsBuilder.table(AssignmentTestUtils.TOPIC_PREFIX, Materialized.as("store"));
                try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props, this.supplier, this.time)) {
                    kafkaStreams.start();
                }
            }

            Mockito.verify(cleanupExecutor).scheduleAtFixedRate(
                ArgumentMatchers.any(Runnable.class),
                Mockito.eq(1L),
                Mockito.eq(1L),
                Mockito.eq(TimeUnit.MILLISECONDS));
            Mockito.verify(cleanupExecutor).shutdownNow();
        }
    }

    /**
     * Builds a source -> processor -> sink topology without any state store and passes
     * it to {@code startStreamsAndCheckDirExists} with {@code false}.
     *
     * @param testInfo JUnit test metadata used to derive unique topic names
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory(TestInfo testInfo) {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);

        String testName = TestUtils.safeUniqueTestName(testInfo);
        String inputTopic = testName + "-input";
        String outputTopic = testName + "-output";

        Topology topology = new Topology();
        topology
            .addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic)
            .addProcessor("process", () -> new Processor<String, String, String, String>() {
                private ProcessorContext<String, String> context;

                @Override
                public void init(ProcessorContext<String, String> processorContext) {
                    this.context = processorContext;
                }

                @Override
                public void process(Record<String, String> record) {
                    // Forward key + value, but only when the value length is a multiple of NUM_THREADS.
                    boolean shouldForward = record.value().length() % KafkaStreamsTest.NUM_THREADS == 0;
                    if (shouldForward) {
                        String concatenated = record.key() + record.value();
                        this.context.forward(record.withValue(concatenated));
                    }
                }
            }, "source")
            .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");

        startStreamsAndCheckDirExists(topology, false);
    }

    /**
     * Builds the stateful topology with its last flag set to {@code false} and passes
     * it to {@code startStreamsAndCheckDirExists} with {@code false}.
     *
     * @param testInfo JUnit test metadata used to derive unique topic and store names
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory(TestInfo testInfo) {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);

        String testName = TestUtils.safeUniqueTestName(testInfo);
        String inputName = testName + "-input";
        String outputName = testName + "-output";
        String globalName = testName + "-global";
        String countsName = testName + "-counts";
        String globalStoreName = testName + "-globalStore";

        Topology topology = getStatefulTopology(inputName, outputName, globalName, countsName, globalStoreName, false);
        startStreamsAndCheckDirExists(topology, false);
    }

    /**
     * Builds the stateful topology with its last flag set to {@code true} and passes
     * it to {@code startStreamsAndCheckDirExists} with {@code true}.
     *
     * @param testInfo JUnit test metadata used to derive unique topic and store names
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory(TestInfo testInfo) {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);

        String testName = TestUtils.safeUniqueTestName(testInfo);
        String inputName = testName + "-input";
        String outputName = testName + "-output";
        String globalName = testName + "-global";
        String countsName = testName + "-counts";
        String globalStoreName = testName + "-globalStore";

        Topology topology = getStatefulTopology(inputName, outputName, globalName, countsName, globalStoreName, true);
        startStreamsAndCheckDirExists(topology, true);
    }

    /**
     * Verifies that constructing {@link KafkaStreams} from an empty topology throws a
     * {@link TopologyException} with the expected message.
     */
    @Test
    public void shouldThrowTopologyExceptionOnEmptyTopology() {
        prepareStreams();
        // If construction unexpectedly succeeds, try-with-resources still closes the
        // instance before the failure raised by fail() propagates.
        try (KafkaStreams ignored = new KafkaStreams(new StreamsBuilder().build(), this.props, this.supplier, this.time)) {
            Assertions.fail("Should have thrown TopologyException");
        } catch (TopologyException e) {
            MatcherAssert.assertThat(e.getMessage(), Matchers.equalTo("Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table."));
        }
    }

    /**
     * Verifies that an instance whose topology consists only of a global table has
     * an empty {@code threads} collection.
     */
    @Test
    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
        prepareStreams();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("anyTopic");
        try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props, this.supplier, this.time)) {
            MatcherAssert.assertThat(kafkaStreams.threads.size(), Matchers.equalTo(0));
        }
    }

    /**
     * Verifies that an instance whose topology consists only of a global table starts
     * in {@code CREATED}, reaches {@code RUNNING} after {@code start()} and reaches
     * {@code NOT_RUNNING} after {@code close()}.
     *
     * @throws Exception if waiting for a state transition fails
     */
    @Test
    public void shouldTransitToRunningWithGlobalOnlyTopology() throws Exception {
        prepareStreams();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("anyTopic");
        try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.props, this.supplier, this.time)) {
            MatcherAssert.assertThat(kafkaStreams.threads.size(), Matchers.equalTo(0));
            // Expected value first, so a failure report labels the two values correctly.
            Assertions.assertEquals(KafkaStreams.State.CREATED, kafkaStreams.state());

            kafkaStreams.start();
            // Note: the detail message is built before waiting, so it reports the state at call time.
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
                "Streams never started, state is " + String.valueOf(kafkaStreams.state()));

            // Explicit close so the transition to NOT_RUNNING can be observed below.
            kafkaStreams.close();
            org.apache.kafka.test.TestUtils.waitForCondition(
                () -> kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} rejects a negative timeout with an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWithNegativeTimeout() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            IllegalArgumentException error = Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ofMillis(-1L)));
            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("The timeout cannot be negative."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} on an instance that was never started
     * throws an {@link IllegalStateException} carrying the expected message.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWhenNotStarted() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            IllegalStateException error = Assertions.assertThrows(
                IllegalStateException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ZERO));
            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("KafkaStreams has not been started, you can retry after calling start()."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} on an already closed instance throws an
     * {@link IllegalStateException} carrying the expected message.
     */
    @Test
    public void shouldThrowOnClientInstanceIdsWhenClosed() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            // Close up front: the call under test must be made against a closed instance.
            kafkaStreams.close();
            IllegalStateException error = Assertions.assertThrows(
                IllegalStateException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ZERO));
            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("KafkaStreams has been stopped (NOT_RUNNING)."));
        }
    }

    /**
     * Verifies that {@code clientInstanceIds} on a started instance throws a
     * {@link StreamsException} whose cause is an {@link UnsupportedOperationException}
     * with the message {@code "clientInstanceId not set"}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenAdminNotInitialized() {
        prepareStreams();
        prepareStreamThread(this.streamThreadOne, 1);
        prepareStreamThread(this.streamThreadTwo, NUM_THREADS);
        try (KafkaStreams kafkaStreams = new KafkaStreams(getBuilderWithSource().build(), this.props, this.supplier, this.time)) {
            kafkaStreams.start();
            StreamsException error = Assertions.assertThrows(
                StreamsException.class,
                () -> kafkaStreams.clientInstanceIds(Duration.ZERO));
            MatcherAssert.assertThat(error.getMessage(), Matchers.equalTo("Could not retrieve admin client instance id."));

            Throwable cause = error.getCause();
            MatcherAssert.assertThat(cause, Matchers.instanceOf(UnsupportedOperationException.class));
            MatcherAssert.assertThat(cause.getMessage(), Matchers.equalTo("clientInstanceId not set"));
        }
    }

    /**
     * Verifies that, with telemetry disabled on the admin client,
     * {@code clientInstanceIds} itself succeeds and the failure only surfaces as an
     * {@link IllegalStateException} when the admin instance id is read from the result.
     */
    @Test
    public void shouldNotCrashButThrowLaterIfAdminTelemetryDisabled() {
        prepareStreams();
        adminClient.disableTelemetry();
        // This test runs without any stream threads.
        props.put("num.stream.threads", 0);

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            // Must not throw here; the error is deferred to adminInstanceId().
            final ClientInstanceIds instanceIds = streams.clientInstanceIds(Duration.ZERO);

            final IllegalStateException error = Assertions.assertThrows(
                IllegalStateException.class,
                instanceIds::adminInstanceId);

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Telemetry is not enabled on the admin client. Set config `enable.metrics.push` to `true`."));
        }
    }

    /**
     * Verifies that a timeout injected into the admin client surfaces from
     * {@code clientInstanceIds} as a {@link TimeoutException}.
     */
    @Test
    public void shouldThrowTimeExceptionWhenAdminTimesOut() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);
        adminClient.setClientInstanceId(Uuid.randomUuid());
        adminClient.injectTimeoutException(1);

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            Assertions.assertThrows(
                TimeoutException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));
        }
    }

    /**
     * Verifies that the instance id configured on the admin client is the one
     * reported by {@code clientInstanceIds(...).adminInstanceId()}.
     */
    @Test
    public void shouldReturnAdminInstanceID() {
        prepareStreams();
        final Uuid expectedAdminId = Uuid.randomUuid();
        adminClient.setClientInstanceId(expectedAdminId);
        // This test runs without any stream threads.
        props.put("num.stream.threads", 0);

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            MatcherAssert.assertThat(
                streams.clientInstanceIds(Duration.ZERO).adminInstanceId(),
                Matchers.equalTo(expectedAdminId));
        }
    }

    /**
     * Verifies that instance ids supplied by a stream thread are split into the
     * consumer and producer maps of the result, and that the admin instance id is
     * reported alongside them.
     */
    @Test
    public void shouldReturnProducerAndConsumerInstanceIds() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        props.put("num.stream.threads", 1);

        final Uuid consumerId = Uuid.randomUuid();
        final Uuid producerId = Uuid.randomUuid();
        // Typed futures instead of raw KafkaFutureImpl, so complete() is checked.
        final KafkaFutureImpl<Uuid> consumerFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
        consumerFuture.complete(consumerId);
        producerFuture.complete(producerId);

        final Uuid adminId = Uuid.randomUuid();
        adminClient.setClientInstanceId(adminId);

        Mockito.when(streamThreadOne.clientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Map.of(
                "main-consumer", consumerFuture,
                "some-thread-producer", producerFuture));

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            final ClientInstanceIds instanceIds = streams.clientInstanceIds(Duration.ZERO);

            MatcherAssert.assertThat(instanceIds.consumerInstanceIds().size(), Matchers.equalTo(1));
            MatcherAssert.assertThat(
                instanceIds.consumerInstanceIds().get("main-consumer"),
                Matchers.equalTo(consumerId));
            MatcherAssert.assertThat(instanceIds.producerInstanceIds().size(), Matchers.equalTo(1));
            MatcherAssert.assertThat(
                instanceIds.producerInstanceIds().get("some-thread-producer"),
                Matchers.equalTo(producerId));
            MatcherAssert.assertThat(instanceIds.adminInstanceId(), Matchers.equalTo(adminId));
        }
    }

    /**
     * Verifies that a stream-thread client future that never completes makes
     * {@code clientInstanceIds} fail with a Kafka {@link TimeoutException} that names
     * the client and wraps a {@link java.util.concurrent.TimeoutException}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenAnyClientFutureDoesNotComplete() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);

        // The future is deliberately never completed.
        Mockito.when(streamThreadOne.clientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("some-client", new KafkaFutureImpl<Uuid>()));
        adminClient.setClientInstanceId(Uuid.randomUuid());

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams =
                 new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();

            final TimeoutException error = Assertions.assertThrows(
                TimeoutException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Could not retrieve consumer/producer instance id for some-client."));
            MatcherAssert.assertThat(
                error.getCause(),
                Matchers.instanceOf(java.util.concurrent.TimeoutException.class));
        }
    }

    /**
     * Verifies that a global-consumer future that never completes makes
     * {@code clientInstanceIds} fail with a Kafka {@link TimeoutException} that wraps
     * a {@link java.util.concurrent.TimeoutException}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);
        adminClient.setClientInstanceId(Uuid.randomUuid());

        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.start();

            // Stub the first constructed global thread mock with a future that is never completed.
            Mockito.when(((GlobalStreamThread) globalStreamThreadMockedConstruction.constructed().get(0))
                    .globalConsumerInstanceId((Duration) ArgumentMatchers.any()))
                .thenReturn(new KafkaFutureImpl<Uuid>());

            final TimeoutException error = Assertions.assertThrows(
                TimeoutException.class,
                () -> streams.clientInstanceIds(Duration.ZERO));

            MatcherAssert.assertThat(
                error.getMessage(),
                Matchers.equalTo("Could not retrieve global consumer client instance id."));
            MatcherAssert.assertThat(
                error.getCause(),
                Matchers.instanceOf(java.util.concurrent.TimeoutException.class));
        }
    }

    /**
     * Verifies that the timeout passed to {@code clientInstanceIds} is a shared
     * budget: each client future is queried with the time that remains after the
     * previously queried clients have advanced the mock clock.
     *
     * <p>Each stubbed future checks the timeout it receives against a running
     * expectation, advances the mock clock, and records that it was consulted.
     */
    @Test
    public void shouldCountDownTimeoutAcrossClient() {
        prepareStreams();
        prepareStreamThread(streamThreadOne, 1);
        prepareStreamThread(streamThreadTwo, NUM_THREADS);
        adminClient.setClientInstanceId(Uuid.randomUuid());
        adminClient.advanceTimeOnClientInstanceId(time, Duration.ofMillis(10L).toMillis());

        final MockTime mockTime = time;
        // Expected timeout for the next future; each future lowers it by the time it sleeps.
        // NOTE(review): starts at 50, presumably the 60 ms budget minus the 10 ms the admin
        // client is configured to consume above; confirm against KafkaStreams.
        final AtomicLong expectedTimeout = new AtomicLong(50L);
        final AtomicBoolean threadOneQueried = new AtomicBoolean(false);
        final AtomicBoolean threadTwoQueried = new AtomicBoolean(false);
        final AtomicBoolean globalThreadQueried = new AtomicBoolean(false);

        // The stubs must override get(long, TimeUnit) itself; a method with any other name
        // would never be invoked and the flags asserted at the end would stay false.
        Mockito.when(streamThreadOne.clientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    threadOneQueried.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-10L)));
                    mockTime.sleep(10L);
                    return null;
                }
            }));
        Mockito.when(streamThreadTwo.clientInstanceIds((Duration) ArgumentMatchers.any()))
            .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<Uuid>() {
                @Override
                public Uuid get(final long timeout, final TimeUnit unit) {
                    threadTwoQueried.set(true);
                    MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-5L)));
                    mockTime.sleep(5L);
                    return null;
                }
            }));

        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        // try-with-resources closes the instance exactly once on every exit path.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            streams.start();

            // The global thread mock can only be stubbed once it has been constructed.
            Mockito.when(((GlobalStreamThread) globalStreamThreadMockedConstruction.constructed().get(0))
                    .globalConsumerInstanceId((Duration) ArgumentMatchers.any()))
                .thenReturn(new KafkaFutureImpl<Uuid>() {
                    @Override
                    public Uuid get(final long timeout, final TimeUnit unit) {
                        globalThreadQueried.set(true);
                        MatcherAssert.assertThat(timeout, Matchers.equalTo(expectedTimeout.getAndAdd(-8L)));
                        mockTime.sleep(8L);
                        return null;
                    }
                });

            streams.clientInstanceIds(Duration.ofMillis(60L));
        }

        // Every stubbed future must have been consulted, otherwise its timeout check never ran.
        MatcherAssert.assertThat(threadOneQueried.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(threadTwoQueried.get(), Matchers.equalTo(true));
        MatcherAssert.assertThat(globalThreadQueried.get(), Matchers.equalTo(true));
    }

    /**
     * Builds a topology with one stateful processor and one global store.
     *
     * <p>Records read from {@code str} pass through a processor that stores {@code 5L}
     * under the record key in the key-value store {@code str4}, forwards the record
     * with its value replaced by {@code "5"}, and requests a commit. The forwarded
     * records are written to {@code str2}. A global key-value store named
     * {@code str5}, with logging disabled, is populated from {@code str3}.
     *
     * @param str  topic consumed by the {@code "source"} node
     * @param str2 topic written by the {@code "sink"} node
     * @param str3 topic feeding the global store; also the prefix of its processor name
     * @param str4 name of the key-value store attached to the {@code "process"} node
     * @param str5 name of the global key-value store
     * @param z    {@code true} to use persistent stores, {@code false} for in-memory stores
     * @return the assembled topology
     */
    private Topology getStatefulTopology(String str, String str2, String str3, String str4, String str5, boolean z) {
        final StoreBuilder<?> storeBuilder = Stores.keyValueStoreBuilder(
            z ? Stores.persistentKeyValueStore(str4) : Stores.inMemoryKeyValueStore(str4),
            Serdes.String(),
            Serdes.Long());

        final Topology topology = new Topology();
        topology
            .addSource("source", new StringDeserializer(), new StringDeserializer(), str)
            .addProcessor("process", () -> new Processor<String, String, String, String>() {
                private ProcessorContext<String, String> context;

                @Override
                public void init(final ProcessorContext<String, String> processorContext) {
                    this.context = processorContext;
                }

                @Override
                public void process(final Record<String, String> record) {
                    // getStateStore() is generic in its return type, so bind it to the concrete
                    // store type first; the store is built with String keys and Long values above.
                    final org.apache.kafka.streams.state.KeyValueStore<String, Long> store =
                        this.context.getStateStore(str4);
                    store.put(record.key(), 5L);
                    this.context.forward(record.withValue("5"));
                    this.context.commit();
                }
            }, "source")
            .addStateStore(storeBuilder, "process")
            .addSink("sink", str2, new StringSerializer(), new StringSerializer(), "process");

        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str5) : Stores.inMemoryKeyValueStore(str5),
                Serdes.String(),
                Serdes.String()).withLoggingDisabled(),
            "global",
            new StringDeserializer(),
            new StringDeserializer(),
            str3,
            str3 + "-processor",
            new MockProcessorSupplier<>());
        return topology;
    }

    /**
     * Creates a fresh {@link StreamsBuilder} on which a stream from the topic
     * {@code "source-topic"} has already been registered.
     *
     * @return the builder, ready for further configuration or {@code build()}
     */
    private StreamsBuilder getBuilderWithSource() {
        final StreamsBuilder builder = new StreamsBuilder();
        // The returned stream handle is not needed; only the registration matters here.
        builder.stream("source-topic");
        return builder;
    }

    /**
     * Constructs a {@link KafkaStreams} instance for {@code topology} while
     * {@code StateDirectory} construction is mocked, and asserts that a
     * {@code StateDirectory} was constructed.
     *
     * <p>The mock initializer also checks the constructor call itself: it must receive
     * four arguments, and the argument at index {@code NUM_THREADS} must equal {@code z}.
     *
     * @param topology the topology to build the instance from
     * @param z        the value expected for the checked {@code StateDirectory}
     *                 constructor argument
     */
    private void startStreamsAndCheckDirExists(Topology topology, boolean z) {
        // Both resources sit in one try-with-resources so the KafkaStreams instance is closed
        // even when the assertion fails; resources are closed in reverse declaration order.
        try (MockedConstruction<StateDirectory> stateDirectoryConstruction =
                 Mockito.mockConstruction(StateDirectory.class, (stateDirectory, context) -> {
                     Mockito.when(stateDirectory.initializeProcessId()).thenReturn(UUID.randomUUID());
                     Assertions.assertEquals(4, context.arguments().size());
                     // NOTE(review): NUM_THREADS is used as a positional index here; this looks
                     // like a literal index that happens to equal the constant. Confirm.
                     Assertions.assertEquals(Boolean.valueOf(z), context.arguments().get(NUM_THREADS));
                 });
             KafkaStreams ignored = new KafkaStreams(topology, props, supplier, time)) {
            // Constructing KafkaStreams must have constructed a StateDirectory.
            Assertions.assertFalse(stateDirectoryConstruction.constructed().isEmpty());
        }
    }
}
