package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/TopologyTest.class */
public class TopologyTest {
    private final StoreBuilder<MockKeyValueStore> storeBuilder = (StoreBuilder) EasyMock.createNiceMock(StoreBuilder.class);
    private final KeyValueStoreBuilder<?, ?> globalStoreBuilder = (KeyValueStoreBuilder) EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    private final Topology topology = new Topology();
    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTest$LocalMockProcessorSupplier.class */
    private static class LocalMockProcessorSupplier implements ProcessorSupplier<Object, Object, Object, Object> {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public Processor<Object, Object, Object, Object> m0get() {
            return new Processor<Object, Object, Object, Object>() { // from class: org.apache.kafka.streams.TopologyTest.LocalMockProcessorSupplier.1
                public void init(ProcessorContext<Object, Object> processorContext) {
                    processorContext.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Record<Object, Object> record) {
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTest$MockProcessorSupplierProvidingStore.class */
    private static class MockProcessorSupplierProvidingStore<K, V> extends MockApiProcessorSupplier<K, V, Void, Void> {
        private final StoreBuilder<MockKeyValueStore> storeBuilder;

        public MockProcessorSupplierProvidingStore(StoreBuilder<MockKeyValueStore> storeBuilder) {
            this.storeBuilder = storeBuilder;
        }

        public Set<StoreBuilder<?>> stores() {
            return Collections.singleton(this.storeBuilder);
        }
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSource((String) null, new String[]{"topic"});
        });
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSource((String) null, Pattern.compile(".*"));
        });
    }

    @Test
    public void shouldNotAllowNullTopicsWhenAddingSoureWithTopic() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSource("source", (String[]) null);
        });
    }

    @Test
    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSource("source", (Pattern) null);
        });
    }

    @Test
    public void shouldNotAllowZeroTopicsWhenAddingSource() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addSource("source", new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addProcessor((String) null, () -> {
                return new MockApiProcessorSupplier().m167get();
            }, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addProcessor("name", (ProcessorSupplier) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullNameWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSink((String) null, "topic", new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullTopicWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSink("name", (String) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addSink("name", (TopicNameExtractor) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.connectProcessorAndStateStores((String) null, new String[]{"store"});
        });
    }

    @Test
    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.connectProcessorAndStateStores("processor", (String[]) null);
        });
    }

    @Test
    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.connectProcessorAndStateStores("processor", new String[0]);
        });
    }

    @Test
    public void shouldNotAddNullStateStoreSupplier() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.topology.addStateStore((StoreBuilder) null, new String[0]);
        });
    }

    @Test
    public void shouldNotAllowToAddSourcesWithSameName() {
        this.topology.addSource("source", new String[]{"topic-1"});
        try {
            this.topology.addSource("source", new String[]{"topic-2"});
            Assert.fail("Should throw TopologyException for duplicate source name");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddTopicTwice() {
        this.topology.addSource("source", new String[]{"topic-1"});
        try {
            this.topology.addSource("source-2", new String[]{"topic-1"});
            Assert.fail("Should throw TopologyException for already used topic");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        this.topology.addSource("source-1", new String[]{"foo"});
        try {
            this.topology.addSource("source-2", Pattern.compile("f.*"));
            Assert.fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        this.topology.addSource("source-1", Pattern.compile("f.*"));
        try {
            this.topology.addSource("source-2", new String[]{"foo"});
            Assert.fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddProcessorWithSameName() {
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        try {
            this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
            Assert.fail("Should throw TopologyException for duplicate processor name");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddProcessorWithEmptyParents() {
        this.topology.addSource("source", new String[]{"topic-1"});
        try {
            this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[0]);
            Assert.fail("Should throw TopologyException for processor without at least one parent node");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddProcessorWithNullParents() {
        this.topology.addSource("source", new String[]{"topic-1"});
        try {
            this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{(String) null});
            Assert.fail("Should throw NullPointerException for processor when null parent names are provided");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void shouldFailOnUnknownSource() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        });
    }

    @Test
    public void shouldFailIfNodeIsItsOwnParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"processor"});
        });
    }

    @Test
    public void shouldNotAllowToAddSinkWithSameName() {
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addSink("sink", "topic-2", new String[]{"source"});
        try {
            this.topology.addSink("sink", "topic-3", new String[]{"source"});
            Assert.fail("Should throw TopologyException for duplicate sink name");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddSinkWithEmptyParents() {
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        try {
            this.topology.addSink("sink", "topic-2", new String[0]);
            Assert.fail("Should throw TopologyException for sink without at least one parent node");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddSinkWithNullParents() {
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        try {
            this.topology.addSink("sink", "topic-2", new String[]{(String) null});
            Assert.fail("Should throw NullPointerException for sink when null parent names are provided");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void shouldFailWithUnknownParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addSink("sink", "topic-2", new String[]{"source"});
        });
    }

    @Test
    public void shouldFailIfSinkIsItsOwnParent() {
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addSink("sink", "topic-2", new String[]{"sink"});
        });
    }

    @Test
    public void shouldFailIfSinkIsParent() {
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addSink("sink-1", "topic-2", new String[]{"source"});
        try {
            this.topology.addSink("sink-2", "topic-3", new String[]{"sink-1"});
            Assert.fail("Should throw TopologyException for using sink as parent");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(new Object[]{this.storeBuilder});
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addStateStore(this.storeBuilder, new String[]{"no-such-processor"});
        });
    }

    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(new Object[]{this.storeBuilder});
        this.topology.addSource("source-1", new String[]{"topic-1"});
        try {
            this.topology.addStateStore(this.storeBuilder, new String[]{"source-1"});
            Assert.fail("Should have thrown TopologyException for adding store to source node");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldNotAllowToAddStateStoreToSink() {
        mockStoreBuilder();
        EasyMock.replay(new Object[]{this.storeBuilder});
        this.topology.addSource("source-1", new String[]{"topic-1"});
        this.topology.addSink("sink-1", "topic-1", new String[]{"source-1"});
        try {
            this.topology.addStateStore(this.storeBuilder, new String[]{"sink-1"});
            Assert.fail("Should have thrown TopologyException for adding store to sink node");
        } catch (TopologyException e) {
        }
    }

    private void mockStoreBuilder() {
        EasyMock.expect(this.storeBuilder.name()).andReturn("store").anyTimes();
        EasyMock.expect(this.storeBuilder.logConfig()).andReturn(Collections.emptyMap());
        EasyMock.expect(Boolean.valueOf(this.storeBuilder.loggingEnabled())).andReturn(false);
    }

    @Test
    public void shouldNotAllowToAddStoreWithSameNameAndDifferentInstance() {
        mockStoreBuilder();
        EasyMock.replay(new Object[]{this.storeBuilder});
        this.topology.addStateStore(this.storeBuilder, new String[0]);
        StoreBuilder storeBuilder = (StoreBuilder) EasyMock.createNiceMock(StoreBuilder.class);
        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
        EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
        EasyMock.expect(Boolean.valueOf(storeBuilder.loggingEnabled())).andReturn(false);
        EasyMock.replay(new Object[]{storeBuilder});
        try {
            this.topology.addStateStore(storeBuilder, new String[0]);
            Assert.fail("Should have thrown TopologyException for same store name with different StoreBuilder");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldAllowToShareStoreUsingSameStoreBuilder() {
        mockStoreBuilder();
        EasyMock.replay(new Object[]{this.storeBuilder});
        this.topology.addSource("source", new String[]{"topic-1"});
        this.topology.addProcessor("processor-1", new MockProcessorSupplierProvidingStore(this.storeBuilder), new String[]{"source"});
        this.topology.addProcessor("processor-2", new MockProcessorSupplierProvidingStore(this.storeBuilder), new String[]{"source"});
    }

    @Test
    public void shouldThrowOnUnassignedStateStoreAccess() {
        mockStoreBuilder();
        EasyMock.expect(this.storeBuilder.build()).andReturn(new MockKeyValueStore("store", false));
        EasyMock.replay(new Object[]{this.storeBuilder});
        this.topology.addSource("source", new String[]{"topic"}).addProcessor("goodGuy", new LocalMockProcessorSupplier(), new String[]{"source"}).addStateStore(this.storeBuilder, new String[]{"goodGuy"}).addProcessor("badGuy", new LocalMockProcessorSupplier(), new String[]{"source"});
        try {
            new TopologyTestDriver(this.topology);
            Assert.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
            MatcherAssert.assertThat(e.toString(), CoreMatchers.equalTo("org.apache.kafka.streams.errors.StreamsException: failed to initialize processor badGuy"));
        }
    }

    @Test
    @Deprecated
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        EasyMock.expect(this.globalStoreBuilder.name()).andReturn("anyName").anyTimes();
        EasyMock.replay(new Object[]{this.globalStoreBuilder});
        Assert.assertThrows(TopologyException.class, () -> {
            this.topology.addGlobalStore(this.globalStoreBuilder, "sameName", (Deserializer) null, (Deserializer) null, "anyTopicName", "sameName", new MockProcessorSupplier());
        });
    }

    @Test
    public void shouldDescribeEmptyTopology() {
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
    }

    @Test
    public void sinkShouldReturnNullTopicWithDynamicRouting() {
        MatcherAssert.assertThat(new InternalTopologyBuilder.Sink("sink", (obj, obj2, recordContext) -> {
            return recordContext.topic() + "-" + obj;
        }).topic(), CoreMatchers.equalTo((Object) null));
    }

    @Test
    public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
        TopicNameExtractor topicNameExtractor = (obj, obj2, recordContext) -> {
            return recordContext.topic() + "-" + obj;
        };
        MatcherAssert.assertThat(new InternalTopologyBuilder.Sink("sink", topicNameExtractor).topicNameExtractor(), CoreMatchers.equalTo(topicNameExtractor));
    }

    @Test
    public void singleSourceShouldHaveSingleSubtopology() {
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, Collections.singleton(addSource("source", "topic"))));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, Collections.singleton(addSource("source", "topic1", "topic2", "topic3"))));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void singleSourcePatternShouldHaveSingleSubtopology() {
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, Collections.singleton(addSource("source", Pattern.compile("topic[0-9]")))));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void multipleSourcesShouldHaveDistinctSubtopologies() {
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, Collections.singleton(addSource("source1", "topic1"))));
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, Collections.singleton(addSource("source2", "topic2"))));
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, Collections.singleton(addSource("source3", "topic3"))));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void sourceAndProcessorShouldHaveSingleSubtopology() {
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessor = addProcessor("processor", addSource);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessor);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessorWithNewStore = addProcessorWithNewStore("processor", new String[]{"store"}, addSource);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessorWithNewStore);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessorWithNewStore = addProcessorWithNewStore("processor", new String[]{"store1", "store2"}, addSource);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessorWithNewStore);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessor = addProcessor("processor1", addSource);
        TopologyDescription.Processor addProcessor2 = addProcessor("processor2", addSource);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessor);
        hashSet.add(addProcessor2);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
        TopologyDescription.Source addSource = addSource("source1", "topic0");
        TopologyDescription.Source addSource2 = addSource("source2", Pattern.compile("topic[1-9]"));
        TopologyDescription.Processor addProcessor = addProcessor("processor", addSource, addSource2);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addSource2);
        hashSet.add(addProcessor);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
        TopologyDescription.Source addSource = addSource("source1", "topic1");
        TopologyDescription.Processor addProcessor = addProcessor("processor1", addSource);
        TopologyDescription.Source addSource2 = addSource("source2", "topic2");
        TopologyDescription.Processor addProcessor2 = addProcessor("processor2", addSource2);
        TopologyDescription.Source addSource3 = addSource("source3", "topic3");
        TopologyDescription.Processor addProcessor3 = addProcessor("processor3", addSource3);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessor);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        HashSet hashSet2 = new HashSet();
        hashSet2.add(addSource2);
        hashSet2.add(addProcessor2);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, hashSet2));
        HashSet hashSet3 = new HashSet();
        hashSet3.add(addSource3);
        hashSet3.add(addProcessor3);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, hashSet3));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
        TopologyDescription.Source addSource = addSource("source1", "topic1");
        TopologyDescription.Sink addSink = addSink("sink1", "sinkTopic1", addSource);
        TopologyDescription.Source addSource2 = addSource("source2", "topic2");
        TopologyDescription.Sink addSink2 = addSink("sink2", "sinkTopic2", addSource2);
        TopologyDescription.Source addSource3 = addSource("source3", "topic3");
        TopologyDescription.Sink addSink3 = addSink("sink3", "sinkTopic3", addSource3);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addSink);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        HashSet hashSet2 = new HashSet();
        hashSet2.add(addSource2);
        hashSet2.add(addSink2);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, hashSet2));
        HashSet hashSet3 = new HashSet();
        hashSet3.add(addSource3);
        hashSet3.add(addSink3);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, hashSet3));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void processorsWithSameSinkShouldHaveSameSubtopology() {
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessor = addProcessor("processor1", addSource);
        TopologyDescription.Source addSource2 = addSource("source2", "topic2");
        TopologyDescription.Processor addProcessor2 = addProcessor("processor2", addSource2);
        TopologyDescription.Source addSource3 = addSource("source3", "topic3");
        TopologyDescription.Processor addProcessor3 = addProcessor("processor3", addSource3);
        TopologyDescription.Sink addSink = addSink("sink", "sinkTopic", addProcessor, addProcessor2, addProcessor3);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessor);
        hashSet.add(addSource2);
        hashSet.add(addProcessor2);
        hashSet.add(addSource3);
        hashSet.add(addProcessor3);
        hashSet.add(addSink);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void processorsWithSharedStateShouldHaveSameSubtopology() {
        String[] strArr = {"store1"};
        String[] strArr2 = {"store2"};
        String[] strArr3 = {strArr[0], strArr2[0]};
        TopologyDescription.Source addSource = addSource("source", "topic");
        TopologyDescription.Processor addProcessorWithNewStore = addProcessorWithNewStore("processor1", strArr, addSource);
        TopologyDescription.Source addSource2 = addSource("source2", "topic2");
        TopologyDescription.Processor addProcessorWithNewStore2 = addProcessorWithNewStore("processor2", strArr2, addSource2);
        TopologyDescription.Source addSource3 = addSource("source3", "topic3");
        TopologyDescription.Processor addProcessorWithExistingStore = addProcessorWithExistingStore("processor3", strArr3, addSource3);
        HashSet hashSet = new HashSet();
        hashSet.add(addSource);
        hashSet.add(addProcessorWithNewStore);
        hashSet.add(addSource2);
        hashSet.add(addProcessorWithNewStore2);
        hashSet.add(addSource3);
        hashSet.add(addProcessorWithExistingStore);
        this.expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, hashSet));
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void shouldDescribeGlobalStoreTopology() {
        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0);
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void shouldDescribeMultipleGlobalStoreTopology() {
        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0);
        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1);
        MatcherAssert.assertThat(this.topology.describe(), CoreMatchers.equalTo(this.expectedDescription));
        MatcherAssert.assertThat(Integer.valueOf(this.topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(this.expectedDescription.hashCode())));
    }

    @Test
    public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").to(new TopicNameExtractor<Object, Object>() { // from class: org.apache.kafka.streams.TopologyTest.1
            public String extract(Object obj, Object obj2, RecordContext recordContext) {
                return recordContext.topic() + "-" + obj;
            }

            public String toString() {
                return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]";
            }
        });
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-SINK-0000000001\n    Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().count();
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000002\n    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kGroupedStreamNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().count(Materialized.as("count-store"));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000001\n    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kGroupedStreamAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().count(Materialized.with((Serde) null, Serdes.Long()));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void timeWindowZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(1L))).count();
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000002\n    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void timeWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(1L))).count(Materialized.as("count-store"));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000001\n    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(1L))).count(Materialized.with((Serde) null, Serdes.Long()));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void sessionWindowZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with(Duration.ofMillis(1L))).count();
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000002\n    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void sessionWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with(Duration.ofMillis(1L))).count(Materialized.as("count-store"));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000001\n    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void sessionWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with(Duration.ofMillis(1L))).count(Materialized.with((Serde) null, Serdes.Long()));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> none\n      <-- KSTREAM-SOURCE-0000000000\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void tableZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((obj, obj2) -> {
            return null;
        }).count();
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n      --> KTABLE-SELECT-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-SELECT-0000000003 (stores: [])\n      --> KSTREAM-SINK-0000000005\n      <-- KTABLE-SOURCE-0000000002\n    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- KTABLE-SELECT-0000000003\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> KTABLE-AGGREGATE-0000000007\n    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n      --> none\n      <-- KSTREAM-SOURCE-0000000006\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void tableNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((obj, obj2) -> {
            return null;
        }).count(Materialized.as("count-store"));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n      --> KTABLE-SELECT-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-SELECT-0000000003 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KTABLE-SOURCE-0000000002\n    Sink: KSTREAM-SINK-0000000004 (topic: count-store-repartition)\n      <-- KTABLE-SELECT-0000000003\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000005 (topics: [count-store-repartition])\n      --> KTABLE-AGGREGATE-0000000006\n    Processor: KTABLE-AGGREGATE-0000000006 (stores: [count-store])\n      --> none\n      <-- KSTREAM-SOURCE-0000000005\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void tableAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((obj, obj2) -> {
            return null;
        }).count(Materialized.with((Serde) null, Serdes.Long()));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n      --> KTABLE-SELECT-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-SELECT-0000000003 (stores: [])\n      --> KSTREAM-SINK-0000000005\n      <-- KTABLE-SOURCE-0000000002\n    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- KTABLE-SELECT-0000000003\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> KTABLE-AGGREGATE-0000000007\n    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n      --> none\n      <-- KSTREAM-SOURCE-0000000006\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableNonMaterializedMapValuesShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").mapValues((obj, obj2) -> {
            return null;
        });
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-MAPVALUES-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableAnonymousMaterializedMapValuesShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").mapValues((obj, obj2) -> {
            return null;
        }, Materialized.with((Serde) null, (Serde) null));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-MAPVALUES-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableNamedMaterializedMapValuesShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").mapValues((obj, obj2) -> {
            return null;
        }, Materialized.as("store-name").withKeySerde((Serde) null).withValueSerde((Serde) null));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-MAPVALUES-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-MAPVALUES-0000000003 (stores: [store-name])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableNonMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").filter((obj, obj2) -> {
            return false;
        });
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000003 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableAnonymousMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").filter((obj, obj2) -> {
            return false;
        }, Materialized.with((Serde) null, (Serde) null));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000004 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").filter((obj, obj2) -> {
            return false;
        }, Materialized.as("store-name"));
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000003 (stores: [store-name])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n", streamsBuilder.build().describe().toString());
    }

    @Test
    public void topologyWithStaticTopicNameExtractorShouldRespectEqualHashcodeContract() {
        Topology topology = topologyWithStaticTopicName();
        Topology topology2 = topologyWithStaticTopicName();
        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(topology2.describe()));
        MatcherAssert.assertThat(Integer.valueOf(topology.describe().hashCode()), CoreMatchers.equalTo(Integer.valueOf(topology2.describe().hashCode())));
    }

    private Topology topologyWithStaticTopicName() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("from-topic-name").to("to-topic-name");
        return streamsBuilder.build();
    }

    private TopologyDescription.Source addSource(String str, String... strArr) {
        this.topology.addSource((Topology.AutoOffsetReset) null, str, (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, strArr);
        StringBuilder sb = new StringBuilder(strArr[0]);
        for (int i = 1; i < strArr.length; i++) {
            sb.append(", ").append(strArr[i]);
        }
        return new InternalTopologyBuilder.Source(str, new HashSet(Arrays.asList(strArr)), (Pattern) null);
    }

    private TopologyDescription.Source addSource(String str, Pattern pattern) {
        this.topology.addSource((Topology.AutoOffsetReset) null, str, (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, pattern);
        return new InternalTopologyBuilder.Source(str, (Set) null, pattern);
    }

    private TopologyDescription.Processor addProcessor(String str, TopologyDescription.Node... nodeArr) {
        return addProcessorWithNewStore(str, new String[0], nodeArr);
    }

    private TopologyDescription.Processor addProcessorWithNewStore(String str, String[] strArr, TopologyDescription.Node... nodeArr) {
        return addProcessorWithStore(str, strArr, true, nodeArr);
    }

    private TopologyDescription.Processor addProcessorWithExistingStore(String str, String[] strArr, TopologyDescription.Node... nodeArr) {
        return addProcessorWithStore(str, strArr, false, nodeArr);
    }

    private TopologyDescription.Processor addProcessorWithStore(String str, String[] strArr, boolean z, TopologyDescription.Node... nodeArr) {
        String[] strArr2 = new String[nodeArr.length];
        for (int i = 0; i < nodeArr.length; i++) {
            strArr2[i] = nodeArr[i].name();
        }
        this.topology.addProcessor(str, new MockApiProcessorSupplier(), strArr2);
        if (z) {
            for (String str2 : strArr) {
                StoreBuilder storeBuilder = (StoreBuilder) EasyMock.createNiceMock(StoreBuilder.class);
                EasyMock.expect(storeBuilder.name()).andReturn(str2).anyTimes();
                EasyMock.replay(new Object[]{storeBuilder});
                this.topology.addStateStore(storeBuilder, new String[]{str});
            }
        } else {
            this.topology.connectProcessorAndStateStores(str, strArr);
        }
        InternalTopologyBuilder.AbstractNode processor = new InternalTopologyBuilder.Processor(str, new HashSet(Arrays.asList(strArr)));
        for (TopologyDescription.Node node : nodeArr) {
            ((InternalTopologyBuilder.AbstractNode) node).addSuccessor(processor);
            processor.addPredecessor(node);
        }
        return processor;
    }

    private TopologyDescription.Sink addSink(String str, String str2, TopologyDescription.Node... nodeArr) {
        String[] strArr = new String[nodeArr.length];
        for (int i = 0; i < nodeArr.length; i++) {
            strArr[i] = nodeArr[i].name();
        }
        this.topology.addSink(str, str2, (Serializer) null, (Serializer) null, (StreamPartitioner) null, strArr);
        InternalTopologyBuilder.AbstractNode sink = new InternalTopologyBuilder.Sink(str, str2);
        for (TopologyDescription.Node node : nodeArr) {
            ((InternalTopologyBuilder.AbstractNode) node).addSuccessor(sink);
            sink.addPredecessor(node);
        }
        return sink;
    }

    @Deprecated
    private void addGlobalStoreToTopologyAndExpectedDescription(String str, String str2, String str3, String str4, int i) {
        KeyValueStoreBuilder keyValueStoreBuilder = (KeyValueStoreBuilder) EasyMock.createNiceMock(KeyValueStoreBuilder.class);
        EasyMock.expect(keyValueStoreBuilder.name()).andReturn(str).anyTimes();
        EasyMock.replay(new Object[]{keyValueStoreBuilder});
        this.topology.addGlobalStore(keyValueStoreBuilder, str2, (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, str3, str4, new MockProcessorSupplier());
        this.expectedDescription.addGlobalStore(new InternalTopologyBuilder.GlobalStore(str2, str4, str, str3, i));
    }
}
