package org.apache.kafka.streams;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest.class */
public class TopologyTestDriverTest {
    private static final String SOURCE_TOPIC_1 = "source-topic-1";
    private static final String SOURCE_TOPIC_2 = "source-topic-2";
    private static final String SINK_TOPIC_1 = "sink-topic-1";
    private static final String SINK_TOPIC_2 = "sink-topic-2";
    // Driver under test; tearDown() closes it after each test when it is non-null.
    private TopologyTestDriver testDriver;
    private KeyValueStore<String, Long> store;
    // Explicit charset so the header bytes never depend on the platform default encoding.
    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes(java.nio.charset.StandardCharsets.UTF_8))});
    private final byte[] key1 = new byte[0];
    private final byte[] value1 = new byte[0];
    private final long timestamp1 = 42;
    private final TestRecord<byte[], byte[]> testRecord1 = new TestRecord<>(this.key1, this.value1, this.headers, this.timestamp1);
    private final byte[] key2 = new byte[0];
    private final byte[] value2 = new byte[0];
    private final long timestamp2 = 43;
    private final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory<>(new ByteArraySerializer(), new ByteArraySerializer());
    // The records reuse the timestamp fields above instead of repeating the literals.
    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = this.consumerRecordFactory.create(SOURCE_TOPIC_1, this.key1, this.value1, this.headers, this.timestamp1);
    private final ConsumerRecord<byte[], byte[]> consumerRecord2 = this.consumerRecordFactory.create(SOURCE_TOPIC_2, this.key2, this.value2, this.timestamp2);
    // Base driver configuration; the constructor adds the processing guarantee when EOS is enabled.
    private final Properties config = Utils.mkProperties(Utils.mkMap(
        Utils.mkEntry("application.id", "test-TopologyTestDriver"),
        Utils.mkEntry("bootstrap.servers", "dummy:1234"),
        Utils.mkEntry("state.dir", TestUtils.tempDirectory().getAbsolutePath())));
    private final StringDeserializer stringDeserializer = new StringDeserializer();
    private final LongDeserializer longDeserializer = new LongDeserializer();
    // Every MockProcessor handed out by MockProcessorSupplier.get() is appended here.
    private final List<MockProcessor> mockProcessors = new ArrayList<>();

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$CustomMaxAggregator.class */
    private static class CustomMaxAggregator implements Processor<String, Long> {
        ProcessorContext context;
        private KeyValueStore<String, Long> store;

        private CustomMaxAggregator() {
        }

        public void init(ProcessorContext processorContext) {
            this.context = processorContext;
            processorContext.schedule(Duration.ofMinutes(1L), PunctuationType.WALL_CLOCK_TIME, j -> {
                flushStore();
            });
            processorContext.schedule(Duration.ofSeconds(10L), PunctuationType.STREAM_TIME, j2 -> {
                flushStore();
            });
            this.store = processorContext.getStateStore("aggStore");
        }

        public void process(String str, Long l) {
            Long l2 = (Long) this.store.get(str);
            if (l2 == null || l.longValue() > l2.longValue()) {
                this.store.put(str, l);
            }
        }

        private void flushStore() {
            KeyValueIterator all = this.store.all();
            Throwable th = null;
            while (all.hasNext()) {
                try {
                    try {
                        KeyValue keyValue = (KeyValue) all.next();
                        this.context.forward(keyValue.key, keyValue.value);
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (all != null) {
                        if (th != null) {
                            try {
                                all.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            all.close();
                        }
                    }
                    throw th3;
                }
            }
            if (all != null) {
                if (0 == 0) {
                    all.close();
                    return;
                }
                try {
                    all.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            }
        }

        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$CustomMaxAggregatorSupplier.class */
    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
        private CustomMaxAggregatorSupplier() {
        }

        public Processor<String, Long> get() {
            return new CustomMaxAggregator();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$MockProcessor.class */
    private static final class MockProcessor implements Processor<Object, Object> {
        private final Collection<Punctuation> punctuations;
        private ProcessorContext context;
        private boolean initialized = false;
        private boolean closed = false;
        private final List<Record> processedRecords = new ArrayList();

        MockProcessor(Collection<Punctuation> collection) {
            this.punctuations = collection;
        }

        public void init(ProcessorContext processorContext) {
            this.initialized = true;
            this.context = processorContext;
            for (Punctuation punctuation : this.punctuations) {
                this.context.schedule(Duration.ofMillis(punctuation.intervalMs), punctuation.punctuationType, punctuation.callback);
            }
        }

        public void process(Object obj, Object obj2) {
            this.processedRecords.add(new Record(obj, obj2, this.context.headers(), this.context.timestamp(), this.context.offset(), this.context.topic()));
            this.context.forward(obj, obj2);
        }

        public void close() {
            this.closed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$MockProcessorSupplier.class */
    public final class MockProcessorSupplier implements ProcessorSupplier<Object, Object> {
        private final Collection<Punctuation> punctuations;

        private MockProcessorSupplier(TopologyTestDriverTest topologyTestDriverTest) {
            this(Collections.emptySet());
        }

        private MockProcessorSupplier(Collection<Punctuation> collection) {
            this.punctuations = collection;
        }

        public Processor<Object, Object> get() {
            MockProcessor mockProcessor = new MockProcessor(this.punctuations);
            TopologyTestDriverTest.this.mockProcessors.add(mockProcessor);
            return mockProcessor;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$MockPunctuator.class */
    private static final class MockPunctuator implements Punctuator {
        private final List<Long> punctuatedAt;

        private MockPunctuator() {
            this.punctuatedAt = new LinkedList();
        }

        public void punctuate(long j) {
            this.punctuatedAt.add(Long.valueOf(j));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$Punctuation.class */
    /** Immutable description of one punctuation: interval, type, and the callback to run. */
    public static final class Punctuation {
        private final long intervalMs;
        private final PunctuationType punctuationType;
        private final Punctuator callback;

        /**
         * @param intervalMs      interval in milliseconds
         * @param punctuationType kind of time that drives the punctuation
         * @param callback        punctuator invoked when the punctuation fires
         */
        Punctuation(long intervalMs, PunctuationType punctuationType, Punctuator callback) {
            this.callback = callback;
            this.punctuationType = punctuationType;
            this.intervalMs = intervalMs;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriverTest$Record.class */
    /**
     * Value object holding key, value, headers, timestamp, offset and topic of a record,
     * used to compare what a processor saw against what a test expects.
     */
    private static final class Record {
        private final Object key;
        private final Object value;
        private final long timestamp;
        private final long offset;
        private final String topic;
        private final Headers headers;

        /** Copies everything except the offset from {@code consumerRecord}. */
        Record(ConsumerRecord<byte[], byte[]> consumerRecord, long offset) {
            this(consumerRecord.key(), consumerRecord.value(), consumerRecord.headers(), consumerRecord.timestamp(), offset, consumerRecord.topic());
        }

        /** Copies key, value, timestamp and headers from {@code testRecord}; topic and offset are given. */
        Record(String topic, TestRecord<byte[], byte[]> testRecord, long offset) {
            this(testRecord.key(), testRecord.value(), testRecord.headers(), testRecord.timestamp().longValue(), offset, topic);
        }

        /** Assigns every field directly from the arguments. */
        Record(Object key, Object value, Headers headers, long timestamp, long offset, String topic) {
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestamp = timestamp;
            this.offset = offset;
            this.topic = topic;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("key: ").append(key);
            text.append(", value: ").append(value);
            text.append(", timestamp: ").append(timestamp);
            text.append(", offset: ").append(offset);
            text.append(", topic: ").append(topic);
            return text.toString();
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            final Record that = (Record) other;
            // Cheap primitive comparisons first, then the object-valued fields.
            if (timestamp != that.timestamp || offset != that.offset) {
                return false;
            }
            return Objects.equals(key, that.key)
                && Objects.equals(value, that.value)
                && Objects.equals(topic, that.topic)
                && Objects.equals(headers, that.headers);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value, headers, timestamp, offset, topic);
        }
    }

    /**
     * Parameter sets for the parameterized runner: one run with the flag {@code true}
     * and one with {@code false}, in that order.
     *
     * @return a collection of single-element argument arrays
     */
    @Parameterized.Parameters(name = "Eos enabled = {0}")
    public static Collection<Object[]> data() {
        final List<Object[]> parameterSets = new ArrayList<>();
        for (final boolean eosEnabled : Arrays.asList(true, false)) {
            parameterSets.add(new Object[] {eosEnabled});
        }
        return parameterSets;
    }

    /**
     * @param eosEnabled when {@code true}, sets {@code processing.guarantee} to
     *                   {@code exactly_once} in the shared config
     */
    public TopologyTestDriverTest(boolean eosEnabled) {
        if (!eosEnabled) {
            return;
        }
        config.put("processing.guarantee", "exactly_once");
    }

    /** Closes the driver after each test, unless the test never created one or cleared it. */
    @After
    public void tearDown() {
        if (testDriver == null) {
            return;
        }
        testDriver.close();
    }

    /**
     * Builds a topology with one source reading {@code SOURCE_TOPIC_1} connected
     * directly to one sink writing {@code SINK_TOPIC_1}.
     */
    private Topology setupSourceSinkTopology() {
        final String sourceName = "source";
        final Topology topology = new Topology();
        topology.addSource(sourceName, SOURCE_TOPIC_1);
        topology.addSink("sink", SINK_TOPIC_1, sourceName);
        return topology;
    }

    /**
     * Builds two source-to-sink pairs chained through a topic: the first writes to
     * {@code SINK_TOPIC_1}, which the second reads before writing to {@code SINK_TOPIC_2}.
     */
    private Topology setupTopologyWithTwoSubtopologies() {
        final String firstSource = "source-1";
        final String secondSource = "source-2";
        final Topology topology = new Topology();
        topology.addSource(firstSource, SOURCE_TOPIC_1);
        topology.addSink("sink-1", SINK_TOPIC_1, firstSource);
        topology.addSource(secondSource, SINK_TOPIC_1);
        topology.addSink("sink-2", SINK_TOPIC_2, secondSource);
        return topology;
    }

    /** Builds the single-processor topology without any punctuation. */
    private Topology setupSingleProcessorTopology() {
        final long noInterval = -1L;
        return setupSingleProcessorTopology(noInterval, null, null);
    }

    /**
     * Builds a topology with one source on {@code SOURCE_TOPIC_1} feeding one mock processor.
     * A punctuation is handed to the processor supplier only when the interval is positive
     * and both the type and the punctuator are non-null; otherwise none is scheduled.
     *
     * @param intervalMs      punctuation interval in milliseconds
     * @param punctuationType punctuation type, may be {@code null}
     * @param callback        punctuator to schedule, may be {@code null}
     */
    private Topology setupSingleProcessorTopology(long intervalMs, PunctuationType punctuationType, Punctuator callback) {
        final Collection<Punctuation> punctuations;
        if (intervalMs > 0 && punctuationType != null && callback != null) {
            punctuations = Collections.singleton(new Punctuation(intervalMs, punctuationType, callback));
        } else {
            punctuations = Collections.emptySet();
        }

        final String sourceName = "source";
        final Topology topology = new Topology();
        topology.addSource(sourceName, SOURCE_TOPIC_1);
        topology.addProcessor("processor", new MockProcessorSupplier(punctuations), sourceName);
        return topology;
    }

    /**
     * For each given topic, adds a source named {@code <topic>-source} and a mock processor
     * named {@code <topic>-processor}; all processors feed one sink writing {@code SINK_TOPIC_1}.
     *
     * @param sourceTopicNames topics to read from
     */
    private Topology setupMultipleSourceTopology(String... sourceTopicNames) {
        final Topology topology = new Topology();
        final String[] processorNames = new String[sourceTopicNames.length];
        for (int index = 0; index < sourceTopicNames.length; index++) {
            final String topicName = sourceTopicNames[index];
            final String sourceName = topicName + "-source";
            final String processorName = topicName + "-processor";
            topology.addSource(sourceName, topicName);
            processorNames[index] = processorName;
            topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
        }
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    /**
     * Adds one in-memory global store (logging disabled) per given topic, named
     * {@code <topic>-globalStore}, fed from that topic through a mock processor named
     * {@code <topic>-processor}.
     *
     * @param sourceTopicNames topics backing the global stores; must not be empty
     * @throws IllegalArgumentException if no topic name is given
     */
    private Topology setupGlobalStoreTopology(String... sourceTopicNames) {
        if (sourceTopicNames.length == 0) {
            throw new IllegalArgumentException("sourceTopicNames cannot be empty");
        }
        final Topology topology = new Topology();
        for (final String topicName : sourceTopicNames) {
            final String storeName = topicName + "-globalStore";
            final String processorName = topicName + "-processor";
            topology.addGlobalStore(
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), (Serde) null, (Serde) null).withLoggingDisabled(),
                topicName,
                (Deserializer) null,
                (Deserializer) null,
                topicName,
                processorName,
                new MockProcessorSupplier());
        }
        return topology;
    }

    /**
     * Builds a DSL topology: records from {@code SOURCE_TOPIC_1} are re-keyed by their value,
     * grouped and counted into a table materialized as {@code firstTableName}; a table over
     * {@code SOURCE_TOPIC_2}, materialized as {@code secondTableName}, is joined against it
     * using the value as the extracted key, with the join named {@code joinName}.
     *
     * @param firstTableName  store name for the count table
     * @param secondTableName store name for the table over {@code SOURCE_TOPIC_2}
     * @param joinName        name given to the join
     */
    private Topology setupTopologyWithInternalTopic(String firstTableName, String secondTableName, String joinName) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(SOURCE_TOPIC_2, Materialized.as(secondTableName))
            .join(
                builder.stream(SOURCE_TOPIC_1)
                    .selectKey((key, value) -> value)
                    .groupByKey()
                    .count(Materialized.as(firstTableName)),
                value -> value,
                (left, right) -> right,
                Named.as(joinName));
        return builder.build(config);
    }

    /** Constructing the driver must initialize the processor supplied by the topology. */
    @Test
    public void shouldInitProcessor() {
        final Topology topology = setupSingleProcessorTopology();
        testDriver = new TopologyTestDriver(topology, config);
        final MockProcessor processor = mockProcessors.get(0);
        Assert.assertTrue(processor.initialized);
    }

    /** Closing the driver must close the processor. */
    @Test
    public void shouldCloseProcessor() {
        final Topology topology = setupSingleProcessorTopology();
        testDriver = new TopologyTestDriver(topology, config);
        testDriver.close();
        final MockProcessor processor = mockProcessors.get(0);
        Assert.assertTrue(processor.closed);
        // Already closed above; clear the field so tearDown() does not close it again.
        testDriver = null;
    }

    /** Piping a record into a topic the (empty) topology does not know must be rejected. */
    @Test
    public void shouldThrowForUnknownTopic() {
        testDriver = new TopologyTestDriver(new Topology(), config);
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> testDriver.pipeRecord(
                "unknownTopic",
                new TestRecord((byte[]) null),
                new ByteArraySerializer(),
                new ByteArraySerializer(),
                Instant.now()));
    }

    /** Piping a record with a {@code null} time argument is expected to raise IllegalStateException. */
    @Test
    public void shouldThrowForMissingTime() {
        testDriver = new TopologyTestDriver(new Topology(), config);
        Assert.assertThrows(
            IllegalStateException.class,
            () -> testDriver.pipeRecord(
                SINK_TOPIC_1,
                new TestRecord("value"),
                new StringSerializer(),
                new StringSerializer(),
                (Instant) null));
    }

    /** Same check as the non-deprecated variant, using the ConsumerRecordFactory-based input API. */
    @Test
    @Deprecated
    public void shouldThrowForUnknownTopicDeprecated() {
        final ConsumerRecordFactory<byte[], byte[]> factory =
            new ConsumerRecordFactory<>("unknownTopic", new ByteArraySerializer(), new ByteArraySerializer());
        testDriver = new TopologyTestDriver(new Topology(), config);

        try {
            testDriver.pipeInput(factory.create((byte[]) null));
            Assert.fail("Should have throw IllegalArgumentException");
        } catch (final IllegalArgumentException expected) {
            final String message = expected.getMessage();
            Assert.assertEquals("Unknown topic: unknownTopic", message);
        }
    }

    /**
     * An output topic nothing was written to must report itself empty and throw
     * {@link NoSuchElementException} when a record is read from it.
     */
    @Test
    public void shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
        this.testDriver = new TopologyTestDriver(setupSourceSinkTopology(), this.config);
        final TestOutputTopic<String, String> unusedTopic =
            new TestOutputTopic<>(this.testDriver, "unused-topic", new StringDeserializer(), new StringDeserializer());

        Assert.assertTrue(unusedTopic.isEmpty());
        // The method reference already null-checks its receiver, so no separate getClass() call is needed.
        Assert.assertThrows(NoSuchElementException.class, unusedTopic::readRecord);
    }

    /** The sink topic must show up in the produced topic names only after a record is piped. */
    @Test
    public void shouldCaptureSinkTopicNamesIfWrittenInto() {
        final Topology topology = setupSourceSinkTopology();
        testDriver = new TopologyTestDriver(topology, config);

        // Nothing has been produced before the first input.
        MatcherAssert.assertThat(testDriver.producedTopicNames(), CoreMatchers.is(Collections.emptySet()));

        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        MatcherAssert.assertThat(testDriver.producedTopicNames(), CoreMatchers.hasItem(SINK_TOPIC_1));
    }

    /**
     * Checks the exact set of produced topic names, all prefixed with the application id,
     * after piping into the first and then the second source topic.
     */
    @Test
    public void shouldCaptureInternalTopicNamesIfWrittenInto() {
        testDriver = new TopologyTestDriver(setupTopologyWithInternalTopic("table1", "table2", "join"), config);
        final String appId = config.getProperty("application.id");

        MatcherAssert.assertThat(testDriver.producedTopicNames(), CoreMatchers.is(Collections.emptySet()));

        pipeRecord(SOURCE_TOPIC_1, testRecord1);
        MatcherAssert.assertThat(
            testDriver.producedTopicNames(),
            CoreMatchers.equalTo(Utils.mkSet(
                appId + "-table1-repartition",
                appId + "-table1-changelog")));

        pipeRecord(SOURCE_TOPIC_2, testRecord1);
        MatcherAssert.assertThat(
            testDriver.producedTopicNames(),
            CoreMatchers.equalTo(Utils.mkSet(
                appId + "-table1-repartition",
                appId + "-table1-changelog",
                appId + "-table2-changelog",
                appId + "-join-subscription-registration-topic",
                appId + "-join-subscription-store-changelog",
                appId + "-join-subscription-response-topic")));
    }

    /**
     * A stream writes into the topic that backs a global table; that topic must be the only
     * produced topic name after one record is piped.
     */
    @Test
    public void shouldCaptureGlobalTopicNameIfWrittenInto() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(SOURCE_TOPIC_1, Materialized.as("globalTable"));
        builder.stream(SOURCE_TOPIC_2).to(SOURCE_TOPIC_1);
        final Topology topology = builder.build();
        testDriver = new TopologyTestDriver(topology, config);

        MatcherAssert.assertThat(testDriver.producedTopicNames(), CoreMatchers.is(Collections.emptySet()));

        pipeRecord(SOURCE_TOPIC_2, testRecord1);

        MatcherAssert.assertThat(
            testDriver.producedTopicNames(),
            CoreMatchers.equalTo(Collections.singleton(SOURCE_TOPIC_1)));
    }

    /** A record piped into the source topic must come out of the sink topic with the same key and value. */
    @Test
    public void shouldProcessRecordForTopic() {
        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);

        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        final ProducerRecord<byte[], byte[]> output = testDriver.readRecord(SINK_TOPIC_1);
        Assert.assertEquals(key1, output.key());
        Assert.assertEquals(value1, output.value());
        Assert.assertEquals(SINK_TOPIC_1, output.topic());
    }

    /** The processor must see the piped record with its topic, headers, timestamp and offset 0. */
    @Test
    public void shouldSetRecordMetadata() {
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);

        pipeRecord(SOURCE_TOPIC_1, testRecord1);

        final List<Record> processed = mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1, processed.size());

        final Record expected = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
        MatcherAssert.assertThat(processed.get(0), CoreMatchers.equalTo(expected));
    }

    /**
     * Pipes a byte-array record into the given topic using fresh byte-array serializers
     * and a {@code null} time argument.
     *
     * @param topic  topic to pipe into
     * @param record record to pipe
     */
    private void pipeRecord(String topic, TestRecord<byte[], byte[]> record) {
        final Instant noExplicitTime = null;
        testDriver.pipeRecord(topic, record, new ByteArraySerializer(), new ByteArraySerializer(), noExplicitTime);
    }

    /** Each consumer record must reach only the processor attached to its own source topic. */
    @Test
    @Deprecated
    public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
        final List<Record> seenByFirst = mockProcessors.get(0).processedRecords;
        final List<Record> seenBySecond = mockProcessors.get(1).processedRecords;

        testDriver.pipeInput(consumerRecord1);
        Assert.assertEquals(1, seenByFirst.size());
        Assert.assertEquals(0, seenBySecond.size());
        MatcherAssert.assertThat(seenByFirst.get(0), CoreMatchers.equalTo(new Record(consumerRecord1, 0L)));

        testDriver.pipeInput(consumerRecord2);
        Assert.assertEquals(1, seenByFirst.size());
        Assert.assertEquals(1, seenBySecond.size());
        MatcherAssert.assertThat(seenBySecond.get(0), CoreMatchers.equalTo(new Record(consumerRecord2, 0L)));
    }

    /**
     * Two sources with different deserializers feed one processor and one sink; each record
     * must round-trip with the key/value types of its own source (deprecated input/output API).
     */
    @Test
    @Deprecated
    public void shouldUseSourceSpecificDeserializersDeprecated() {
        final String firstSource = "source-1";
        final String secondSource = "source-2";
        final String processorName = "processor";

        final Topology topology = new Topology();
        topology.addSource(firstSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(secondSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addProcessor(processorName, new MockProcessorSupplier(), firstSource, secondSource);
        // The sink picks the serializer from the runtime type of each key and value.
        topology.addSink(
            "sink",
            SINK_TOPIC_1,
            (topic, key) -> {
                if (key instanceof Long) {
                    return Serdes.Long().serializer().serialize(topic, (Long) key);
                }
                return Serdes.Integer().serializer().serialize(topic, (Integer) key);
            },
            (topic, value) -> {
                if (value instanceof String) {
                    return Serdes.String().serializer().serialize(topic, (String) value);
                }
                return Serdes.Double().serializer().serialize(topic, (Double) value);
            },
            processorName);
        testDriver = new TopologyTestDriver(topology, config);

        final ConsumerRecordFactory<Long, String> firstFactory =
            new ConsumerRecordFactory<>(SOURCE_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer());
        final ConsumerRecordFactory<Integer, Double> secondFactory =
            new ConsumerRecordFactory<>(SOURCE_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer());
        final Double secondValue = Double.valueOf(3.14d);
        final ConsumerRecord<byte[], byte[]> firstInput = firstFactory.create(42L, "anyString");
        final ConsumerRecord<byte[], byte[]> secondInput = secondFactory.create(73, secondValue);

        testDriver.pipeInput(firstInput);
        final ProducerRecord<Long, String> firstOutput =
            testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(firstOutput.key(), CoreMatchers.equalTo(42L));
        MatcherAssert.assertThat(firstOutput.value(), CoreMatchers.equalTo("anyString"));

        testDriver.pipeInput(secondInput);
        final ProducerRecord<Integer, Double> secondOutput =
            testDriver.readOutput(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(secondOutput.key(), CoreMatchers.equalTo(73));
        MatcherAssert.assertThat(secondOutput.value(), CoreMatchers.equalTo(secondValue));
    }

    /**
     * Two sources with different deserializers feed one processor and one sink; each record
     * must round-trip with the key/value types of its own source.
     */
    @Test
    public void shouldUseSourceSpecificDeserializers() {
        final String firstSource = "source-1";
        final String secondSource = "source-2";
        final String processorName = "processor";

        final Topology topology = new Topology();
        topology.addSource(firstSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(secondSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addProcessor(processorName, new MockProcessorSupplier(), firstSource, secondSource);
        // The sink picks the serializer from the runtime type of each key and value.
        topology.addSink(
            "sink",
            SINK_TOPIC_1,
            (topic, key) -> {
                if (key instanceof Long) {
                    return Serdes.Long().serializer().serialize(topic, (Long) key);
                }
                return Serdes.Integer().serializer().serialize(topic, (Integer) key);
            },
            (topic, value) -> {
                if (value instanceof String) {
                    return Serdes.String().serializer().serialize(topic, (String) value);
                }
                return Serdes.Double().serializer().serialize(topic, (Double) value);
            },
            processorName);
        testDriver = new TopologyTestDriver(topology, config);

        final Double secondValue = Double.valueOf(3.14d);
        final TestRecord<Long, String> firstInput = new TestRecord<>(42L, "anyString");
        final TestRecord<Integer, Double> secondInput = new TestRecord<>(73, secondValue);

        testDriver.pipeRecord(SOURCE_TOPIC_1, firstInput, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        final TestRecord<Long, String> firstOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(firstOutput.getKey(), CoreMatchers.equalTo(42L));
        MatcherAssert.assertThat(firstOutput.getValue(), CoreMatchers.equalTo("anyString"));

        testDriver.pipeRecord(SOURCE_TOPIC_2, secondInput, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        final TestRecord<Integer, Double> secondOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(secondOutput.getKey(), CoreMatchers.equalTo(73));
        MatcherAssert.assertThat(secondOutput.getValue(), CoreMatchers.equalTo(secondValue));
    }

    /**
     * Verifies that the headers-aware {@code serialize}/{@code deserialize} overloads are the
     * ones invoked when a record is piped in through a test input topic and read back
     * through a test output topic.
     */
    @Test
    public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
        this.testDriver = new TopologyTestDriver(setupSourceSinkTopology(), this.config);

        final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToKeyDeserializer = new AtomicBoolean(false);
        final AtomicBoolean passedHeadersToValueDeserializer = new AtomicBoolean(false);

        // Each anonymous subclass overrides the three-argument overload to raise its flag
        // and then delegates to the two-argument method of its superclass.
        final ByteArraySerializer keySerializer = new ByteArraySerializer() {
            @Override
            public byte[] serialize(String topic, Headers headers, byte[] data) {
                passedHeadersToKeySerializer.set(true);
                return serialize(topic, data);
            }
        };
        final ByteArraySerializer valueSerializer = new ByteArraySerializer() {
            @Override
            public byte[] serialize(String topic, Headers headers, byte[] data) {
                passedHeadersToValueSerializer.set(true);
                return serialize(topic, data);
            }
        };
        // The method must be named exactly "deserialize": under any other name it overrides
        // nothing, is never called, and the flag stays false.
        final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer() {
            @Override
            public byte[] deserialize(String topic, Headers headers, byte[] data) {
                passedHeadersToKeyDeserializer.set(true);
                return deserialize(topic, data);
            }
        };
        final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer() {
            @Override
            public byte[] deserialize(String topic, Headers headers, byte[] data) {
                passedHeadersToValueDeserializer.set(true);
                return deserialize(topic, data);
            }
        };

        final TestInputTopic<byte[], byte[]> inputTopic =
            this.testDriver.createInputTopic(SOURCE_TOPIC_1, keySerializer, valueSerializer);
        final TestOutputTopic<byte[], byte[]> outputTopic =
            this.testDriver.createOutputTopic(SINK_TOPIC_1, keyDeserializer, valueDeserializer);
        inputTopic.pipeInput(this.testRecord1);
        outputTopic.readRecord();

        MatcherAssert.assertThat(passedHeadersToKeySerializer.get(), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(passedHeadersToValueSerializer.get(), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(passedHeadersToKeyDeserializer.get(), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(passedHeadersToValueDeserializer.get(), CoreMatchers.equalTo(true));
    }

    /**
     * Two independent source-to-sink pairs, each with its own serde types; each record must
     * be readable from its own sink topic with the matching deserializers.
     */
    @Test
    public void shouldUseSinkSpecificSerializers() {
        final String firstSource = "source-1";
        final String secondSource = "source-2";

        final Topology topology = new Topology();
        topology.addSource(firstSource, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
        topology.addSource(secondSource, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), firstSource);
        topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), secondSource);
        testDriver = new TopologyTestDriver(topology, config);

        final Double secondValue = Double.valueOf(3.14d);
        final TestRecord<Long, String> firstInput = new TestRecord<>(42L, "anyString");
        final TestRecord<Integer, Double> secondInput = new TestRecord<>(73, secondValue);

        testDriver.pipeRecord(SOURCE_TOPIC_1, firstInput, Serdes.Long().serializer(), Serdes.String().serializer(), Instant.now());
        final TestRecord<Long, String> firstOutput =
            testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer());
        MatcherAssert.assertThat(firstOutput.getKey(), CoreMatchers.equalTo(42L));
        MatcherAssert.assertThat(firstOutput.getValue(), CoreMatchers.equalTo("anyString"));

        testDriver.pipeRecord(SOURCE_TOPIC_2, secondInput, Serdes.Integer().serializer(), Serdes.Double().serializer(), Instant.now());
        final TestRecord<Integer, Double> secondOutput =
            testDriver.readRecord(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer());
        MatcherAssert.assertThat(secondOutput.getKey(), CoreMatchers.equalTo(73));
        MatcherAssert.assertThat(secondOutput.getValue(), CoreMatchers.equalTo(secondValue));
    }

    /**
     * Pipes a two-element list of consumer records through the list-accepting
     * {@code pipeInput} overload and asserts that each of the first two mock processors
     * recorded exactly one record, equal to the corresponding input.
     */
    @Test
    @Deprecated
    public void shouldProcessConsumerRecordList() {
        this.testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), this.config);

        // Grab the live record lists before piping; they are inspected afterwards.
        final List<Record> seenByFirst = this.mockProcessors.get(0).processedRecords;
        final List<Record> seenBySecond = this.mockProcessors.get(1).processedRecords;

        final List<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<>(2);
        batch.add(this.consumerRecord1);
        batch.add(this.consumerRecord2);
        this.testDriver.pipeInput(batch);

        Assert.assertEquals(1, seenByFirst.size());
        Assert.assertEquals(1, seenBySecond.size());
        MatcherAssert.assertThat(seenByFirst.get(0), CoreMatchers.equalTo(new Record(this.consumerRecord1, 0L)));
        MatcherAssert.assertThat(seenBySecond.get(0), CoreMatchers.equalTo(new Record(this.consumerRecord2, 0L)));
    }

    /**
     * Pipes one record into {@code SOURCE_TOPIC_1} of the two-subtopology topology and
     * asserts that a record with the same key and value can be read from
     * {@code SINK_TOPIC_1} and then from {@code SINK_TOPIC_2}.
     */
    @Test
    public void shouldForwardRecordsFromSubtopologyToSubtopology() {
        this.testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), this.config);
        pipeRecord(SOURCE_TOPIC_1, this.testRecord1);

        // Same expectations for both sinks, checked in topic order.
        for (final String sinkTopic : new String[]{SINK_TOPIC_1, SINK_TOPIC_2}) {
            final ProducerRecord<byte[], byte[]> forwarded = this.testDriver.readRecord(sinkTopic);
            Assert.assertEquals(this.key1, forwarded.key());
            Assert.assertEquals(this.value1, forwarded.value());
            Assert.assertEquals(sinkTopic, forwarded.topic());
        }
    }

    /**
     * Checks that the global store {@code source-topic-1-globalStore} is reachable both via
     * {@code getKeyValueStore} and via {@code getAllStateStores}, and that a record piped
     * into {@code SOURCE_TOPIC_1} is recorded exactly once by the first mock processor.
     */
    @Test
    public void shouldPopulateGlobalStore() {
        final String globalStoreName = "source-topic-1-globalStore";
        this.testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), this.config);

        Assert.assertNotNull(this.testDriver.getKeyValueStore(globalStoreName));
        Assert.assertNotNull(this.testDriver.getAllStateStores().get(globalStoreName));

        pipeRecord(SOURCE_TOPIC_1, this.testRecord1);

        final List<Record> seen = this.mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1, seen.size());
        MatcherAssert.assertThat(seen.get(0), CoreMatchers.equalTo(new Record(SOURCE_TOPIC_1, this.testRecord1, 0L)));
    }

    /**
     * Drives a {@code STREAM_TIME} punctuator (topology set up with the argument {@code 10L})
     * by piping records with increasing timestamps. After every record the punctuator's
     * recorded timestamps must equal the expected list, which grows only at the timestamps
     * flagged below.
     */
    @Test
    public void shouldPunctuateOnStreamsTime() {
        final MockPunctuator punctuator = new MockPunctuator();
        this.testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, punctuator), this.config);

        // Record timestamps, in pipe order, and whether each one is expected to punctuate.
        final long[] timestamps = {42L, 42L, 51L, 52L, 61L, 65L, 71L, 72L, 95L, 101L, 102L};
        final boolean[] punctuates = {true, false, true, false, true, false, true, false, true, true, false};

        final List<Long> expectedPunctuations = new LinkedList<>();
        for (int step = 0; step < timestamps.length; step++) {
            final long timestamp = timestamps[step];
            if (punctuates[step]) {
                expectedPunctuations.add(timestamp);
            }
            pipeRecord(SOURCE_TOPIC_1, new TestRecord<>(this.key1, this.value1, (Headers) null, timestamp));
            MatcherAssert.assertThat(punctuator.punctuatedAt, CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * Same scenario as the wall-clock punctuation test, but using the {@code long}-based
     * constructor and {@code advanceWallClockTime(long)} overloads. The driver starts at
     * time 0 and the punctuator's recorded times are compared with the expected list after
     * every advance.
     */
    @Test
    public void shouldPunctuateOnWallClockTimeDeprecated() {
        final MockPunctuator punctuator = new MockPunctuator();
        this.testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, punctuator), this.config, 0L);

        // Each step advances the wall clock; a non-null entry is the punctuation time expected
        // to be recorded by that step.
        final long[] advanceBy = {5L, 9L, 1L, 20L, 5L};
        final Long[] expectedNewPunctuation = {null, 14L, null, 35L, 40L};

        final List<Long> expectedPunctuations = new LinkedList<>();
        for (int step = 0; step < advanceBy.length; step++) {
            if (expectedNewPunctuation[step] != null) {
                expectedPunctuations.add(expectedNewPunctuation[step]);
            }
            this.testDriver.advanceWallClockTime(advanceBy[step]);
            MatcherAssert.assertThat(punctuator.punctuatedAt, CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * Starts the driver at epoch milli 0 with a {@code WALL_CLOCK_TIME} punctuator and
     * advances the wall clock in five steps (5, 9, 1, 20 and 5 ms). After every step the
     * punctuator's recorded times must equal the expected list, which gains the current
     * clock value at 14, 35 and 40 ms.
     */
    @Test
    public void shouldPunctuateOnWallClockTime() {
        final MockPunctuator punctuator = new MockPunctuator();
        this.testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, punctuator), this.config, Instant.ofEpochMilli(0L));

        final long[] advanceMs = {5L, 9L, 1L, 20L, 5L};
        final boolean[] punctuates = {false, true, false, true, true};

        final List<Long> expectedPunctuations = new LinkedList<>();
        long clockMs = 0L;
        for (int step = 0; step < advanceMs.length; step++) {
            clockMs += advanceMs[step];
            if (punctuates[step]) {
                // The expected punctuation time is the wall clock after this advance.
                expectedPunctuations.add(clockMs);
            }
            this.testDriver.advanceWallClockTime(Duration.ofMillis(advanceMs[step]));
            MatcherAssert.assertThat(punctuator.punctuatedAt, CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * Adds one processor-attached store ({@code store}) and one global store
     * ({@code globalStore}) to the source/sink topology and asserts that
     * {@code getAllStateStores()} exposes exactly those two names, each mapped to a
     * non-null store.
     */
    @Test
    public void shouldReturnAllStores() {
        final Topology topology = setupSourceSinkTopology();
        topology.addProcessor("processor", () -> null, "source");
        topology.addStateStore(
            new KeyValueStoreBuilder<>(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray(), new SystemTime()),
            "processor");
        topology.addGlobalStore(
            new KeyValueStoreBuilder<>(Stores.inMemoryKeyValueStore("globalStore"), Serdes.ByteArray(), Serdes.ByteArray(), new SystemTime()).withLoggingDisabled(),
            "sourceProcessorName",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "globalTopicName",
            "globalProcessorName",
            () -> null);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        final Set<String> expectedStoreNames = new HashSet<>(Arrays.asList("store", "globalStore"));
        final Map<String, StateStore> allStores = this.testDriver.getAllStateStores();
        MatcherAssert.assertThat(allStores.keySet(), CoreMatchers.equalTo(expectedStoreNames));
        for (final StateStore stateStore : allStores.values()) {
            Assert.assertNotNull(stateStore);
        }
    }

    /** Runs the store-type accessor checks with the flag set to {@code true} (persistent stores). */
    @Test
    public void shouldReturnCorrectPersistentStoreTypeOnly() {
        final boolean persistent = true;
        shouldReturnCorrectStoreTypeOnly(persistent);
    }

    /** Runs the store-type accessor checks with the flag set to {@code false} (in-memory stores). */
    @Test
    public void shouldReturnCorrectInMemoryStoreTypeOnly() {
        final boolean persistent = false;
        shouldReturnCorrectStoreTypeOnly(persistent);
    }

    /**
     * Registers seven stores (five processor-attached, two global) and checks, for every
     * store, which of the five typed accessors of the driver return it and which return
     * {@code null}.
     *
     * @param z passed through to {@code addStoresToTopology}; {@code true} selects the
     *          persistent store suppliers, {@code false} the in-memory ones
     */
    private void shouldReturnCorrectStoreTypeOnly(boolean z) {
        final String[] storeNames = {
            "keyValueStore",
            "keyValueTimestampStore",
            "windowStore",
            "windowTimestampStore",
            "sessionStore",
            "globalKeyValueStore",
            "globalKeyValueTimestampStore"
        };
        // One row per store above. Columns, in call order: getKeyValueStore,
        // getTimestampedKeyValueStore, getWindowStore, getTimestampedWindowStore,
        // getSessionStore. true = accessor must return non-null, false = must return null.
        final boolean[][] expectedPresent = {
            {true, false, false, false, false},
            {true, true, false, false, false},
            {false, false, true, false, false},
            {false, false, true, true, false},
            {false, false, false, false, true},
            {true, false, false, false, false},
            {true, true, false, false, false}
        };

        final Topology topology = setupSingleProcessorTopology();
        addStoresToTopology(
            topology,
            z,
            storeNames[0],
            storeNames[1],
            storeNames[2],
            storeNames[3],
            storeNames[4],
            storeNames[5],
            storeNames[6]);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        for (int row = 0; row < storeNames.length; row++) {
            final String storeName = storeNames[row];
            for (int accessor = 0; accessor < expectedPresent[row].length; accessor++) {
                final Object store;
                switch (accessor) {
                    case 0:
                        store = this.testDriver.getKeyValueStore(storeName);
                        break;
                    case 1:
                        store = this.testDriver.getTimestampedKeyValueStore(storeName);
                        break;
                    case 2:
                        store = this.testDriver.getWindowStore(storeName);
                        break;
                    case 3:
                        store = this.testDriver.getTimestampedWindowStore(storeName);
                        break;
                    default:
                        store = this.testDriver.getSessionStore(storeName);
                        break;
                }
                if (expectedPresent[row][accessor]) {
                    Assert.assertNotNull(store);
                } else {
                    Assert.assertNull(store);
                }
            }
        }
    }

    /** Runs the untyped-accessor rejection checks with the flag set to {@code false} (in-memory stores). */
    @Test
    public void shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod() {
        final boolean persistent = false;
        shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(persistent);
    }

    /** Runs the untyped-accessor rejection checks with the flag set to {@code true} (persistent stores). */
    @Test
    public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() {
        final boolean persistent = true;
        shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(persistent);
    }

    /**
     * Registers the seven built-in stores and asserts that looking each of them up through
     * the untyped {@code getStateStore} throws an {@link IllegalArgumentException} whose
     * message names the typed accessor to use instead.
     *
     * @param z passed through to {@code addStoresToTopology}; {@code true} selects the
     *          persistent store suppliers, {@code false} the in-memory ones
     */
    private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(boolean z) {
        final Topology topology = setupSingleProcessorTopology();
        addStoresToTopology(topology, z, "keyValueStore", "keyValueTimestampStore", "windowStore", "windowTimestampStore", "sessionStore", "globalKeyValueStore", "globalKeyValueTimestampStore");
        this.testDriver = new TopologyTestDriver(topology, this.config);

        // {store name, expected exception message}, checked in this order.
        final String[][] cases = {
            {"keyValueStore", "Store keyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"},
            {"keyValueTimestampStore", "Store keyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"},
            {"windowStore", "Store windowStore is a window store and should be accessed via `getWindowStore()`"},
            {"windowTimestampStore", "Store windowTimestampStore is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"},
            {"sessionStore", "Store sessionStore is a session store and should be accessed via `getSessionStore()`"},
            {"globalKeyValueStore", "Store globalKeyValueStore is a key-value store and should be accessed via `getKeyValueStore()`"},
            {"globalKeyValueTimestampStore", "Store globalKeyValueTimestampStore is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"}
        };

        for (final String[] untypedAccess : cases) {
            final String storeName = untypedAccess[0];
            final String expectedMessage = untypedAccess[1];
            final IllegalArgumentException thrown = Assert.assertThrows(
                IllegalArgumentException.class,
                () -> this.testDriver.getStateStore(storeName));
            MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.equalTo(expectedMessage));
        }
    }

    /**
     * Adds five stores connected to the processor named {@code "processor"} and two global
     * stores to the given topology.
     *
     * @param topology topology to extend
     * @param z        {@code true} to use the persistent store suppliers, {@code false} to use
     *                 the in-memory ones
     * @param str      name of the key-value store
     * @param str2     name of the timestamped key-value store
     * @param str3     name of the window store
     * @param str4     name of the timestamped window store
     * @param str5     name of the session store
     * @param str6     name of the global key-value store
     * @param str7     name of the global timestamped key-value store
     */
    private void addStoresToTopology(Topology topology, boolean z, String str, String str2, String str3, String str4, String str5, String str6, String str7) {
        // Durations shared by the window and session stores (1000 ms and 100 ms).
        final Duration retention = Duration.ofMillis(1000L);
        final Duration windowSize = Duration.ofMillis(100L);

        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str),
                Serdes.ByteArray(),
                Serdes.ByteArray()),
            "processor");
        topology.addStateStore(
            Stores.timestampedKeyValueStoreBuilder(
                z ? Stores.persistentTimestampedKeyValueStore(str2) : Stores.inMemoryKeyValueStore(str2),
                Serdes.ByteArray(),
                Serdes.ByteArray()),
            "processor");
        topology.addStateStore(
            Stores.windowStoreBuilder(
                z ? Stores.persistentWindowStore(str3, retention, windowSize, false)
                  : Stores.inMemoryWindowStore(str3, retention, windowSize, false),
                Serdes.ByteArray(),
                Serdes.ByteArray()),
            "processor");
        topology.addStateStore(
            Stores.timestampedWindowStoreBuilder(
                z ? Stores.persistentTimestampedWindowStore(str4, retention, windowSize, false)
                  : Stores.inMemoryWindowStore(str4, retention, windowSize, false),
                Serdes.ByteArray(),
                Serdes.ByteArray()),
            "processor");
        topology.addStateStore(
            Stores.sessionStoreBuilder(
                z ? Stores.persistentSessionStore(str5, retention) : Stores.inMemorySessionStore(str5, retention),
                Serdes.ByteArray(),
                Serdes.ByteArray()),
            "processor");

        // Global stores are registered with logging disabled and a supplier that yields null.
        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(
                z ? Stores.persistentKeyValueStore(str6) : Stores.inMemoryKeyValueStore(str6),
                Serdes.ByteArray(),
                Serdes.ByteArray()).withLoggingDisabled(),
            "sourceDummy1",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "topicDummy1",
            "processorDummy1",
            () -> null);
        topology.addGlobalStore(
            Stores.timestampedKeyValueStoreBuilder(
                z ? Stores.persistentTimestampedKeyValueStore(str7) : Stores.inMemoryKeyValueStore(str7),
                Serdes.ByteArray(),
                Serdes.ByteArray()).withLoggingDisabled(),
            "sourceDummy2",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "topicDummy2",
            "processorDummy2",
            () -> null);
    }

    /**
     * Adds a store that is not connected to any processor ({@code store}) plus a global
     * store ({@code globalStore}) and asserts that the key set of
     * {@code getAllStateStores()} is exactly those two names.
     */
    @Test
    public void shouldReturnAllStoresNames() {
        final Topology topology = setupSourceSinkTopology();
        topology.addStateStore(
            new KeyValueStoreBuilder<>(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray(), new SystemTime()));
        topology.addGlobalStore(
            new KeyValueStoreBuilder<>(Stores.inMemoryKeyValueStore("globalStore"), Serdes.ByteArray(), Serdes.ByteArray(), new SystemTime()).withLoggingDisabled(),
            "sourceProcessorName",
            Serdes.ByteArray().deserializer(),
            Serdes.ByteArray().deserializer(),
            "globalTopicName",
            "globalProcessorName",
            () -> null);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        final Set<String> expectedStoreNames = new HashSet<>(Arrays.asList("store", "globalStore"));
        MatcherAssert.assertThat(this.testDriver.getAllStateStores().keySet(), CoreMatchers.equalTo(expectedStoreNames));
    }

    /** Sets up the aggregation topology backed by an in-memory key-value store named {@code aggStore}. */
    private void setup() {
        final KeyValueBytesStoreSupplier inMemoryAggStore = Stores.inMemoryKeyValueStore("aggStore");
        setup(inMemoryAggStore);
    }

    /**
     * Builds the source -> aggregator -> sink topology around the supplied store, configures
     * String/Long default serdes, creates the test driver and pre-populates {@code aggStore}
     * with {@code "a" -> 21}.
     *
     * @param keyValueBytesStoreSupplier supplier of the store attached to the aggregator
     */
    private void setup(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) {
        // Default serdes must be in the config before the driver is constructed.
        this.config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        this.config.setProperty("default.value.serde", Serdes.Long().getClass().getName());

        final Topology aggregation = new Topology();
        aggregation.addSource("sourceProcessor", "input-topic");
        aggregation.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
        aggregation.addStateStore(
            Stores.keyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.String(), Serdes.Long()),
            "aggregator");
        aggregation.addSink("sinkProcessor", "result-topic", "aggregator");

        this.testDriver = new TopologyTestDriver(aggregation, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put("a", 21L);
    }

    /**
     * Pipes a single String/Long record into the driver.
     *
     * @param str  topic to pipe into
     * @param str2 record key
     * @param l    record value
     * @param l2   record timestamp
     */
    private void pipeInput(String str, String str2, Long l, Long l2) {
        final TestRecord<String, Long> input = new TestRecord<>(str2, l, (Headers) null, l2);
        this.testDriver.pipeRecord(str, input, new StringSerializer(), new LongSerializer(), (Instant) null);
    }

    /**
     * Asserts that the record's key and value equal the expected ones.
     *
     * @param testRecord record under inspection
     * @param str        expected key
     * @param l          expected value
     */
    private void compareKeyValue(TestRecord<String, Long> testRecord, String str, Long l) {
        final String actualKey = testRecord.getKey();
        MatcherAssert.assertThat(actualKey, CoreMatchers.equalTo(str));
        final Long actualValue = testRecord.getValue();
        MatcherAssert.assertThat(actualValue, CoreMatchers.equalTo(l));
    }

    /**
     * With {@code "a" -> 21} pre-populated by {@code setup()}, piping {@code ("a", 1)} must
     * produce exactly one output record, {@code ("a", 21)}.
     */
    @Test
    public void shouldFlushStoreForFirstInput() {
        setup();
        pipeInput("input-topic", "a", 1L, 9999L);

        final TestRecord<String, Long> output =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(output, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping {@code ("a", 1)} when the store already holds {@code "a" -> 21} must leave the
     * stored value at 21 and produce exactly one output record, {@code ("a", 21)}.
     */
    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        setup();
        pipeInput("input-topic", "a", 1L, 9999L);

        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(21L));
        final TestRecord<String, Long> output =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(output, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping {@code ("a", 42)} when the store holds {@code "a" -> 21} must result in the
     * store holding 42 and exactly one output record, {@code ("a", 42)}.
     *
     * <p>NOTE(review): despite the "shouldNot" in the method name, the assertions below
     * expect the store to be updated to the larger value.
     */
    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        setup();
        pipeInput("input-topic", "a", 42L, 9999L);

        MatcherAssert.assertThat(this.store.get("a"), CoreMatchers.equalTo(42L));
        final TestRecord<String, Long> output =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(output, "a", 42L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Piping a record for the new key {@code "b"} must store {@code "b" -> 21} and produce
     * two output records, {@code ("a", 21)} followed by {@code ("b", 21)}, and nothing more.
     */
    @Test
    public void shouldUpdateStoreForNewKey() {
        setup();
        pipeInput("input-topic", "b", 21L, 9999L);

        MatcherAssert.assertThat(this.store.get("b"), CoreMatchers.equalTo(21L));
        final TestRecord<String, Long> first =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(first, "a", 21L);
        final TestRecord<String, Long> second =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(second, "b", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Pipes three records for key {@code "a"} with timestamps 9999, 9999 and 10000. The
     * first yields one output {@code ("a", 21)}, the second (same timestamp) yields none,
     * and the third (timestamp advanced by one) yields exactly one more {@code ("a", 21)}.
     */
    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        setup();

        pipeInput("input-topic", "a", 1L, 9999L);
        final TestRecord<String, Long> afterFirst =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(afterFirst, "a", 21L);

        // Same timestamp again: no further output expected.
        pipeInput("input-topic", "a", 1L, 9999L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));

        // Timestamp moves forward: one more output expected.
        pipeInput("input-topic", "a", 1L, 10000L);
        final TestRecord<String, Long> afterAdvance =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(afterAdvance, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Advancing the wall clock by 60000 ms, without piping any input, must produce exactly
     * one output record, {@code ("a", 21)}.
     */
    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        setup();
        final Duration oneMinute = Duration.ofMillis(60000L);
        this.testDriver.advanceWallClockTime(oneMinute);

        final TestRecord<String, Long> output =
            this.testDriver.readRecord("result-topic", this.stringDeserializer, this.longDeserializer);
        compareKeyValue(output, "a", 21L);
        Assert.assertTrue(this.testDriver.isEmpty("result-topic"));
    }

    /**
     * Builds a topology whose in-memory {@code aggStore} has caching enabled and checks that
     * a value can be put into the store obtained from the driver; the test passes if no
     * exception is thrown.
     */
    @Test
    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
        final Topology cachingTopology = new Topology();
        cachingTopology.addSource("sourceProcessor", "input-topic");
        cachingTopology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
        cachingTopology.addStateStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("aggStore"), Serdes.String(), Serdes.Long())
                .withCachingEnabled(),
            "aggregator");

        this.testDriver = new TopologyTestDriver(cachingTopology, this.config);
        this.store = this.testDriver.getKeyValueStore("aggStore");
        this.store.put("a", 21L);
    }

    /**
     * Verifies that closing a {@link TopologyTestDriver} cleans up its persistent state store.
     *
     * <p>A first driver writes {@code "a" -> 1} into the persistent store
     * {@code storeProcessorStore} and is then closed. A second driver built from the same
     * topology and the same properties (hence the same {@code state.dir}) must find no value
     * for {@code "a"}.
     *
     * <p>Both drivers are managed with try-with-resources so that each one is closed even
     * when an assertion or {@code pipeRecord} throws; the previous hand-expanded form left
     * the driver open on every failure path.
     */
    @Test
    public void shouldCleanUpPersistentStateStoresOnClose() {
        final Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("storeProcessor", new ProcessorSupplier<String, Long>() {
            @Override
            public Processor<String, Long> get() {
                // Processor that simply writes every incoming key/value pair into the store.
                return new Processor<String, Long>() {
                    private KeyValueStore<String, Long> store;

                    @Override
                    public void init(ProcessorContext processorContext) {
                        this.store = processorContext.getStateStore("storeProcessorStore");
                    }

                    @Override
                    public void process(String str, Long l) {
                        this.store.put(str, l);
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }, new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("storeProcessorStore"), Serdes.String(), Serdes.Long()), new String[]{"storeProcessor"});

        final Properties properties = new Properties();
        properties.put("application.id", "test-TopologyTestDriver-cleanup");
        properties.put("bootstrap.servers", "dummy:1234");
        properties.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        properties.put("default.key.serde", Serdes.String().getClass().getName());
        properties.put("default.value.serde", Serdes.Long().getClass().getName());

        // First driver: the store starts empty, then receives "a" -> 1.
        try (TopologyTestDriver firstDriver = new TopologyTestDriver(topology, properties)) {
            Assert.assertNull(firstDriver.getKeyValueStore("storeProcessorStore").get("a"));
            firstDriver.pipeRecord("input-topic", new TestRecord<>("a", 1L), new StringSerializer(), new LongSerializer(), Instant.now());
            Assert.assertEquals(1L, firstDriver.getKeyValueStore("storeProcessorStore").get("a"));
        }

        // Second driver over the same state directory: the value must be gone.
        try (TopologyTestDriver secondDriver = new TopologyTestDriver(topology, properties)) {
            Assert.assertNull("Closing the prior test driver should have cleaned up this store and value.", secondDriver.getKeyValueStore("storeProcessorStore").get("a"));
        }
    }

    /**
     * Builds a topology containing only a global table over {@code "topic"} materialized as
     * {@code globalStore}, then asserts that the store is exposed by the driver and that a
     * record piped into the topic becomes readable from the store.
     *
     * <p>The driver is managed with try-with-resources, so it is closed exactly once on both
     * the success and the failure path.
     */
    @Test
    public void shouldFeedStoreFromGlobalKTable() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable("topic", Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("globalStore"));

        try (TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), this.config)) {
            final KeyValueStore<String, String> globalStore = driver.getKeyValueStore("globalStore");
            Assert.assertNotNull(globalStore);
            Assert.assertNotNull(driver.getAllStateStores().get("globalStore"));

            driver.pipeRecord("topic", new TestRecord<>("k1", "value1"), new StringSerializer(), new StringSerializer(), Instant.now());

            Assert.assertEquals("value1", globalStore.get("k1"));
        }
    }

    /**
     * Builds a topology with one pattern-subscribed source and one mock processor per given
     * pattern, all feeding a single sink on {@code SINK_TOPIC_1}.
     *
     * <p>Node names are derived from the pattern's string form: {@code <pattern>-source} and
     * {@code <pattern>-processor}.
     *
     * @param patternArr topic patterns, one source/processor pair is created for each
     * @return the assembled topology
     */
    private Topology setupMultipleSourcesPatternTopology(Pattern... patternArr) {
        final Topology topology = new Topology();
        final String[] processorNames = new String[patternArr.length];

        for (int index = 0; index < patternArr.length; index++) {
            final Pattern topicPattern = patternArr[index];
            final String sourceName = topicPattern + "-source";
            final String processorName = topicPattern + "-processor";

            topology.addSource(sourceName, topicPattern);
            processorNames[index] = processorName;
            topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
        }

        // The sink has every processor as a parent.
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    /**
     * Uses two pattern sources ({@code source-topic-\d} and {@code source-topic-[A-Z]}) and
     * asserts that a record piped into {@code SOURCE_TOPIC_1} is recorded only by the first
     * mock processor, while a record piped into {@code source-topic-Z} is recorded only by
     * the second.
     */
    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        final Pattern digitSuffix = Pattern.compile("source-topic-\\d");
        final Pattern upperCaseSuffix = Pattern.compile("source-topic-[A-Z]");
        final String upperCaseTopic = "source-topic-Z";
        final TestRecord<byte[], byte[]> upperCaseTopicRecord = new TestRecord<>(this.key2, this.value2, (Headers) null, 43L);

        this.testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(digitSuffix, upperCaseSuffix), this.config);
        final List<Record> seenByFirst = this.mockProcessors.get(0).processedRecords;
        final List<Record> seenBySecond = this.mockProcessors.get(1).processedRecords;

        pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
        Assert.assertEquals(1, seenByFirst.size());
        Assert.assertEquals(0, seenBySecond.size());
        MatcherAssert.assertThat(seenByFirst.get(0), CoreMatchers.equalTo(new Record(SOURCE_TOPIC_1, this.testRecord1, 0L)));

        pipeRecord(upperCaseTopic, upperCaseTopicRecord);
        Assert.assertEquals(1, seenByFirst.size());
        Assert.assertEquals(1, seenBySecond.size());
        MatcherAssert.assertThat(seenBySecond.get(0), CoreMatchers.equalTo(new Record(upperCaseTopic, upperCaseTopicRecord, 0L)));
    }

    /**
     * Subscribes a single source to the pattern {@code source-topic-\d}, pipes a record into
     * {@code SOURCE_TOPIC_1} and asserts that the record read from {@code SINK_TOPIC_1}
     * carries the same key and value and the sink topic name.
     */
    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        final Pattern digitSuffix = Pattern.compile("source-topic-\\d");
        final Topology passThrough = new Topology();
        passThrough.addSource("source", digitSuffix);
        passThrough.addSink("sink", SINK_TOPIC_1, "source");
        this.testDriver = new TopologyTestDriver(passThrough, this.config);

        pipeRecord(SOURCE_TOPIC_1, this.testRecord1);

        final ProducerRecord<byte[], byte[]> forwarded = this.testDriver.readRecord(SINK_TOPIC_1);
        Assert.assertEquals(this.key1, forwarded.key());
        Assert.assertEquals(this.value1, forwarded.value());
        Assert.assertEquals(SINK_TOPIC_1, forwarded.topic());
    }

    /**
     * Registers a source by topic <em>name</em> where the name is a regex-like string, then
     * pipes a record to a topic that the string would match as a pattern, and expects a
     * {@link TopologyException} with the exact message asserted below.
     *
     * <p>The test fails explicitly when no exception is thrown; without that guard the
     * try/catch would let the test pass without verifying anything.
     */
    @Test
    public void shouldThrowPatternNotValidForTopicNameException() {
        final String sourceName = "source";
        final String regexLikeTopicName = "source-topic-\\d";

        final Topology topology = new Topology();
        // Passed as a String, so this is a named-topic source, not a Pattern source.
        topology.addSource(sourceName, regexLikeTopicName);
        topology.addSink("sink", SINK_TOPIC_1, sourceName);
        this.testDriver = new TopologyTestDriver(topology, this.config);

        try {
            pipeRecord(SOURCE_TOPIC_1, this.testRecord1);
            // AssertionError is not a TopologyException, so it is not swallowed by the catch below.
            Assert.fail("Expected a TopologyException when piping to a topic that only matches a String source as a regex");
        } catch (TopologyException e) {
            final String expectedMessage = String.format(
                "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for input record topic: %s and hence cannot process the message.",
                regexLikeTopicName,
                SOURCE_TOPIC_1);
            Assert.assertEquals(expectedMessage, e.getMessage());
        }
    }

    /**
     * After the no-argument {@code setup()}, asserts that no directory named after the
     * configured {@code application.id} exists under the configured {@code state.dir}.
     */
    @Test
    public void shouldNotCreateStateDirectoryForStatelessTopology() {
        setup();

        final String stateDirPath = config.getProperty("state.dir");
        final String applicationId = config.getProperty("application.id");
        final File appStateDir = new File(stateDirPath, applicationId);

        Assert.assertFalse(appStateDir.exists());
    }

    /**
     * After {@code setup(...)} with a persistent key-value store named {@code "aggStore"},
     * asserts that the application's state directory exists, is a directory, and contains an
     * entry named after {@code TaskId(0, 0)}.
     */
    @Test
    public void shouldCreateStateDirectoryForStatefulTopology() {
        setup(Stores.persistentKeyValueStore("aggStore"));

        final String stateDirPath = config.getProperty("state.dir");
        final String applicationId = config.getProperty("application.id");
        final File appStateDir = new File(stateDirPath, applicationId);
        Assert.assertTrue(appStateDir.exists());
        Assert.assertTrue(appStateDir.isDirectory());

        final String taskDirName = new TaskId(0, 0).toString();
        final File taskStateDir = new File(appStateDir, taskDirName);
        Assert.assertTrue(taskStateDir.exists());
    }

    /**
     * Checks output ordering for a topology that writes back into its own input topic.
     *
     * <p>The processor forwards every value to the {@code "sink"} node (topic {@code "output"})
     * and, for values not already prefixed with {@code "recurse-"}, first forwards a prefixed
     * copy to {@code "recursiveSink"}, which writes to the {@code "input"} topic again.
     * After piping {@code ("B", "beta")} the test expects {@code "output"} to contain
     * {@code ("B", "beta")} followed by {@code ("B", "recurse-beta")}.
     *
     * <p>The driver is managed with try-with-resources so it is closed exactly once, whether
     * or not an assertion fails.
     */
    @Test
    public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
        final Properties properties = new Properties();
        properties.setProperty("application.id", "dummy");
        properties.setProperty("bootstrap.servers", "dummy");

        final Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
        topology.addProcessor(
            "recursiveProcessor",
            () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(final String key, final String value) {
                    // Only un-prefixed values are fed back, so the recursion stops after one round.
                    if (!value.startsWith("recurse-")) {
                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
                    }
                    context().forward(key, value, To.child("sink"));
                }
            },
            "source");
        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
            final TestInputTopic<String, String> in =
                topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out =
                topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("B", "beta");

            MatcherAssert.assertThat(
                out.readKeyValuesToList(),
                CoreMatchers.is(Arrays.asList(
                    new KeyValue<>("B", "beta"),
                    new KeyValue<>("B", "recurse-beta"))));
        }
    }

    /**
     * Checks that a global store is updated by records the topology itself writes to the
     * global store's source topic, including records produced by a recursive pass.
     *
     * <p>The topology has a global in-memory store {@code "global-store"} populated from
     * {@code "global-topic"}, and a processor that forwards each value to {@code "sink"}
     * and {@code "globalSink"} (topic {@code "global-topic"}); values not already prefixed
     * with {@code "recurse-"} are first forwarded, prefixed, to {@code "recursiveSink"},
     * which writes back to {@code "input"}. After piping {@code ("A", "alpha")} the test
     * expects the store to hold {@code "recurse-alpha"} for key {@code "A"} and
     * {@code "global-topic"} to contain {@code ("A", "alpha")} then
     * {@code ("A", "recurse-alpha")}.
     *
     * <p>The driver is managed with try-with-resources so it is closed exactly once, whether
     * or not an assertion fails.
     */
    @Test
    public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
        final Properties properties = new Properties();
        properties.setProperty("application.id", "dummy");
        properties.setProperty("bootstrap.servers", "dummy");

        final Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
        topology.addGlobalStore(
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("global-store"), Serdes.String(), Serdes.String())
                .withLoggingDisabled(),
            "globalSource",
            new StringDeserializer(),
            new StringDeserializer(),
            "global-topic",
            "globalProcessor",
            () -> new Processor<String, String>() {
                private KeyValueStore<String, String> stateStore;

                // The store is registered above with String serdes for both key and value.
                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext processorContext) {
                    this.stateStore = (KeyValueStore<String, String>) processorContext.getStateStore("global-store");
                }

                @Override
                public void process(final String key, final String value) {
                    this.stateStore.put(key, value);
                }

                @Override
                public void close() {
                    // Nothing to release.
                }
            });
        topology.addProcessor(
            "recursiveProcessor",
            () -> new AbstractProcessor<String, String>() {
                @Override
                public void process(final String key, final String value) {
                    // Only un-prefixed values are fed back, so the recursion stops after one round.
                    if (!value.startsWith("recurse-")) {
                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
                    }
                    context().forward(key, value, To.child("sink"));
                    context().forward(key, value, To.child("globalSink"));
                }
            },
            "source");
        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
        topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
            final TestInputTopic<String, String> in =
                topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> globalTopic =
                topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("A", "alpha");

            final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("global-store");
            MatcherAssert.assertThat(keyValueStore, CoreMatchers.notNullValue());
            MatcherAssert.assertThat(keyValueStore.get("A"), CoreMatchers.is("recurse-alpha"));
            MatcherAssert.assertThat(
                globalTopic.readKeyValuesToList(),
                CoreMatchers.is(Arrays.asList(
                    new KeyValue<>("A", "alpha"),
                    new KeyValue<>("A", "recurse-alpha"))));
        }
    }

    /**
     * Checks when records reach the output of a two-source topology configured with
     * {@code max.task.idle.ms = 1000}.
     *
     * <p>Sequence asserted by the test:
     * <ol>
     *   <li>pipe {@code ("A", "alpha")} to {@code "input1"} and advance wall-clock time by
     *       1 ms: the output is still empty;</li>
     *   <li>pipe {@code ("B", "beta")} to {@code "input2"}: the output now contains only
     *       {@code ("A", "alpha")};</li>
     *   <li>advance wall-clock time by 1 s: the output now contains {@code ("B", "beta")}.</li>
     * </ol>
     * NOTE(review): presumably the driver holds records back while one input of the task is
     * empty and the idle time has not elapsed; confirm against the driver implementation.
     *
     * <p>The driver is managed with try-with-resources so it is closed exactly once, whether
     * or not an assertion fails.
     */
    @Test
    public void shouldRespectTaskIdling() {
        final Properties properties = new Properties();
        properties.setProperty("application.id", "dummy");
        properties.setProperty("bootstrap.servers", "dummy");
        properties.setProperty("max.task.idle.ms", "1000");

        final Topology topology = new Topology();
        topology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1");
        topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");

        try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
            final TestInputTopic<String, String> in1 =
                topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> in2 =
                topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out =
                topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            in1.pipeInput("A", "alpha");
            topologyTestDriver.advanceWallClockTime(Duration.ofMillis(1L));
            MatcherAssert.assertThat(out.readKeyValuesToList(), CoreMatchers.is(Collections.emptyList()));

            in2.pipeInput("B", "beta");
            MatcherAssert.assertThat(
                out.readKeyValuesToList(),
                CoreMatchers.is(Collections.singletonList(new KeyValue<>("A", "alpha"))));

            topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(1L));
            MatcherAssert.assertThat(
                out.readKeyValuesToList(),
                CoreMatchers.is(Collections.singletonList(new KeyValue<>("B", "beta"))));
        }
    }
}
