package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/TopologyTest.class */
public class TopologyTest {
    // Nice mock of a StoreBuilder; tests record expectations on it (see mockStoreBuilder) and replay it before use.
    private final StoreBuilder storeBuilder = (StoreBuilder) EasyMock.createNiceMock(StoreBuilder.class);
    // Nice mock of a KeyValueStoreBuilder, used by the global-store test that records a name() expectation on it.
    private final KeyValueStoreBuilder globalStoreBuilder = (KeyValueStoreBuilder) EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    // The topology under test; each test populates it and checks the outcome.
    private final Topology topology = new Topology();
    // Description that the describe tests build up and compare against topology.describe().
    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTest$LocalMockProcessorSupplier.class */
    /**
     * Supplies processors whose {@code init} asks the context for the state store named
     * {@link #STORE_NAME}; all other processor callbacks do nothing.
     */
    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        /** Returns a new processor that only performs the state store lookup during {@code init}. */
        public Processor get() {
            final Processor storeProbingProcessor = new Processor() {
                public void init(ProcessorContext context) {
                    // Only the lookup itself matters; the returned store is discarded.
                    context.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Object key, Object value) {
                    // intentionally empty
                }

                public void punctuate(long timestamp) {
                    // intentionally empty
                }

                public void close() {
                    // intentionally empty
                }
            };
            return storeProbingProcessor;
        }
    }

    /** Expects a NullPointerException when a topic-based source is added under a {@code null} name. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
        final String missingName = null;
        final String[] topics = {"topic"};
        topology.addSource(missingName, topics);
    }

    /** Expects a NullPointerException when a pattern-based source is added under a {@code null} name. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
        final String missingName = null;
        final Pattern anyTopic = Pattern.compile(".*");
        topology.addSource(missingName, anyTopic);
    }

    /** Expects a NullPointerException when a source is added with a {@code null} topic array. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicsWhenAddingSoureWithTopic() {
        final String[] missingTopics = null;
        topology.addSource("source", missingTopics);
    }

    /** Expects a NullPointerException when a source is added with a {@code null} topic pattern. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
        final Pattern missingPattern = null;
        topology.addSource("source", missingPattern);
    }

    /** Expects a TopologyException when a source is added with an empty topic array. */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowZeroTopicsWhenAddingSource() {
        final String[] noTopics = {};
        topology.addSource("source", noTopics);
    }

    /** Expects a NullPointerException when a processor is added under a {@code null} name. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        final String missingName = null;
        final String[] noParents = {};
        // Supplier that hands out processors obtained from a fresh MockProcessorSupplier.
        final ProcessorSupplier delegatingSupplier = new ProcessorSupplier() {
            public Processor get() {
                return new MockProcessorSupplier().get();
            }
        };
        topology.addProcessor(missingName, delegatingSupplier, noParents);
    }

    /** Expects a NullPointerException when a processor is added with a {@code null} supplier. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
        final ProcessorSupplier missingSupplier = null;
        final String[] noParents = {};
        topology.addProcessor("name", missingSupplier, noParents);
    }

    /** Expects a NullPointerException when a sink is added under a {@code null} name. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() {
        final String missingName = null;
        final String[] noParents = {};
        topology.addSink(missingName, "topic", noParents);
    }

    /** Expects a NullPointerException when a sink is added with a {@code null} topic. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() {
        final String missingTopic = null;
        final String[] noParents = {};
        topology.addSink("name", missingTopic, noParents);
    }

    /** Expects a NullPointerException when stores are connected to a {@code null} processor name. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        final String missingProcessor = null;
        final String[] stores = {"store"};
        topology.connectProcessorAndStateStores(missingProcessor, stores);
    }

    /** Expects a NullPointerException when a processor is connected to a {@code null} store-name array. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] missingStores = null;
        topology.connectProcessorAndStateStores("processor", missingStores);
    }

    /** Expects a TopologyException when a processor is connected to an empty store-name array. */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] noStores = {};
        topology.connectProcessorAndStateStores("processor", noStores);
    }

    /** Expects a NullPointerException when a {@code null} store builder is added. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() {
        final StoreBuilder missingBuilder = null;
        final String[] noProcessors = {};
        topology.addStateStore(missingBuilder, noProcessors);
    }

    /** Adding a second source under an already used name must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddSourcesWithSameName() {
        topology.addSource("source", "topic-1");
        try {
            topology.addSource("source", "topic-2");
        } catch (final TopologyException expected) {
            // The duplicate name was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate source name");
    }

    /** Registering the same topic for a second source must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddTopicTwice() {
        topology.addSource("source", "topic-1");
        try {
            topology.addSource("source-2", "topic-1");
        } catch (final TopologyException expected) {
            // The reused topic was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should throw TopologyException for already used topic");
    }

    /** A pattern source that matches an already registered topic must raise a TopologyException. */
    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        topology.addSource("source-1", "foo");
        final Pattern overlapping = Pattern.compile("f.*");
        try {
            topology.addSource("source-2", overlapping);
        } catch (final TopologyException expected) {
            // The overlapping pattern was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
    }

    /** A named topic that matches an already registered pattern source must raise a TopologyException. */
    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        final Pattern registered = Pattern.compile("f.*");
        topology.addSource("source-1", registered);
        try {
            topology.addSource("source-2", "foo");
        } catch (final TopologyException expected) {
            // The overlapping topic was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
    }

    /** Adding a second processor under an already used name must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddProcessorWithSameName() {
        topology.addSource("source", "topic-1");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        try {
            topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        } catch (final TopologyException expected) {
            // The duplicate name was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate processor name");
    }

    /** Expects a TopologyException when a processor names a parent that was never added. */
    @Test(expected = TopologyException.class)
    public void shouldFailOnUnknownSource() {
        final String[] unknownParents = {"source"};
        topology.addProcessor("processor", new MockProcessorSupplier(), unknownParents);
    }

    /** Expects a TopologyException when a processor lists its own name as parent. */
    @Test(expected = TopologyException.class)
    public void shouldFailIfNodeIsItsOwnParent() {
        final String[] selfAsParent = {"processor"};
        topology.addProcessor("processor", new MockProcessorSupplier(), selfAsParent);
    }

    /** Adding a second sink under an already used name must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddSinkWithSameName() {
        final String[] parents = {"source"};
        topology.addSource("source", "topic-1");
        topology.addSink("sink", "topic-2", parents);
        try {
            topology.addSink("sink", "topic-3", new String[]{"source"});
        } catch (final TopologyException expected) {
            // The duplicate name was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate sink name");
    }

    /** Expects a TopologyException when a sink names a parent that was never added. */
    @Test(expected = TopologyException.class)
    public void shouldFailWithUnknownParent() {
        final String[] unknownParents = {"source"};
        topology.addSink("sink", "topic-2", unknownParents);
    }

    /** Expects a TopologyException when a sink lists its own name as parent. */
    @Test(expected = TopologyException.class)
    public void shouldFailIfSinkIsItsOwnParent() {
        final String[] selfAsParent = {"sink"};
        topology.addSink("sink", "topic-2", selfAsParent);
    }

    /** Using an existing sink as the parent of another sink must raise a TopologyException. */
    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", "topic-1");
        topology.addSink("sink-1", "topic-2", new String[]{"source"});
        final String[] sinkAsParent = {"sink-1"};
        try {
            topology.addSink("sink-2", "topic-3", sinkAsParent);
        } catch (final TopologyException expected) {
            // The sink parent was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should throw TopologyException for using sink as parent");
    }

    /** Expects a TopologyException when a store is attached to a processor name that was never added. */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        final String[] unknownProcessors = {"no-such-processsor"};
        topology.addStateStore(storeBuilder, unknownProcessors);
    }

    /** Attaching a state store to a source node must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSource("source-1", "topic-1");
        try {
            topology.addStateStore(storeBuilder, "source-1");
        } catch (final TopologyException expected) {
            // The source node was rejected as a store user, which is the outcome under test.
            return;
        }
        Assert.fail("Should have thrown TopologyException for adding store to source node");
    }

    /** Attaching a state store to a sink node must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddStateStoreToSink() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        final String[] noParents = {};
        topology.addSink("sink-1", "topic-1", noParents);
        try {
            topology.addStateStore(storeBuilder, "sink-1");
        } catch (final TopologyException expected) {
            // The sink node was rejected as a store user, which is the outcome under test.
            return;
        }
        Assert.fail("Should have thrown TopologyException for adding store to sink node");
    }

    /**
     * Records the expectations shared by the state store tests on the {@code storeBuilder} mock:
     * {@code name()} returns "store" any number of times, {@code logConfig()} returns an empty map
     * and {@code loggingEnabled()} returns {@code false}. The caller must still replay the mock.
     */
    private void mockStoreBuilder() {
        EasyMock.expect(storeBuilder.name())
            .andReturn("store")
            .anyTimes();
        EasyMock.expect(storeBuilder.logConfig())
            .andReturn(Collections.emptyMap());
        EasyMock.expect(Boolean.valueOf(storeBuilder.loggingEnabled()))
            .andReturn(Boolean.FALSE);
    }

    /** Adding the same store builder (and therefore the same store name) twice must raise a TopologyException. */
    @Test
    public void shouldNotAllowToAddStoreWithSameName() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        final String[] noProcessors = {};
        topology.addStateStore(storeBuilder, noProcessors);
        try {
            topology.addStateStore(storeBuilder, noProcessors);
        } catch (final TopologyException expected) {
            // The duplicate store name was rejected, which is the outcome under test.
            return;
        }
        Assert.fail("Should have thrown TopologyException for duplicate store name");
    }

    /**
     * Builds a topology in which the store is attached to processor "goodGuy" only, while
     * processor "badGuy" also looks the store up during {@code init}. Creating the test driver
     * is expected to fail with a StreamsException whose cause is a TopologyBuilderException
     * carrying the "no access" message for "badGuy"; that cause is rethrown so the
     * {@code expected} clause of the annotation can match it.
     *
     * @throws Exception the unwrapped TopologyBuilderException on success, or a RuntimeException
     *                   wrapping the caught StreamsException when the cause is not the expected one
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "host:1");
        properties.put("application.id", "appId");
        properties.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        mockStoreBuilder();
        EasyMock.expect(this.storeBuilder.build()).andReturn(new MockStateStore("store", false));
        EasyMock.replay(new Object[]{this.storeBuilder});

        this.topology
            .addSource("source", new String[]{"topic"})
            .addProcessor("goodGuy", new LocalMockProcessorSupplier(), new String[]{"source"})
            .addStateStore(this.storeBuilder, new String[]{"goodGuy"})
            .addProcessor("badGuy", new LocalMockProcessorSupplier(), new String[]{"source"});

        try {
            new ProcessorTopologyTestDriver(streamsConfig, this.topology.internalTopologyBuilder);
        } catch (final StreamsException e) {
            // getCause() is typed Throwable, so it is narrowed only after the instanceof check.
            final Throwable cause = e.getCause();
            final String expectedMessage = "Invalid topology building: Processor badGuy has no access to StateStore store";
            // Constant-first equals keeps a cause without a message from triggering an NPE here.
            if (cause instanceof TopologyBuilderException && expectedMessage.equals(cause.getMessage())) {
                throw (TopologyBuilderException) cause;
            }
            throw new RuntimeException("Did expect different exception. Did catch:", e);
        }
    }

    /** Expects a TopologyException when a global store uses the same name for its source and its processor. */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
        EasyMock.replay(globalStoreBuilder);

        final String sharedName = "sameName";
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValueDeserializer = null;
        topology.addGlobalStore(
            globalStoreBuilder,
            sharedName,
            noKeyDeserializer,
            noValueDeserializer,
            "anyTopicName",
            sharedName,
            new MockProcessorSupplier());
    }

    /** An untouched topology must describe itself as equal to the untouched expected description. */
    @Test
    public void shouldDescribeEmptyTopology() {
        MatcherAssert.assertThat(
            topology.describe(),
            CoreMatchers.equalTo(expectedDescription));
    }

    /** One source reading one topic is expected to be described as a single subtopology with id 0. */
    @Test
    public void singleSourceShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSource = addSource("source", "topic");
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSource)));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** One source reading three topics is expected to be described as a single subtopology with id 0. */
    @Test
    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSource = addSource("source", "topic1", "topic2", "topic3");
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSource)));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** One pattern-based source is expected to be described as a single subtopology with id 0. */
    @Test
    public void singleSourcePatternShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSource = addSource("source", Pattern.compile("topic[0-9]"));
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSource)));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** Three unconnected sources are expected to be described as subtopologies 0, 1 and 2, in insertion order. */
    @Test
    public void multipleSourcesShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source first = addSource("source1", "topic1");
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(first)));

        final TopologyDescription.Source second = addSource("source2", "topic2");
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(1, Collections.<TopologyDescription.Node>singleton(second)));

        final TopologyDescription.Source third = addSource("source3", "topic3");
        expectedDescription.addSubtopology(
            new InternalTopologyBuilder.Subtopology(2, Collections.<TopologyDescription.Node>singleton(third)));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A source feeding one processor is expected to be described as one subtopology holding both nodes. */
    @Test
    public void sourceAndProcessorShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor processor = addProcessor("processor", source);

        final HashSet<TopologyDescription.Node> expectedNodes =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source, processor));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A source feeding a processor with one new store is expected to be described as one subtopology. */
    @Test
    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
        final String[] stores = {"store"};
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor statefulProcessor = addProcessorWithNewStore("processor", stores, source);

        final HashSet<TopologyDescription.Node> expectedNodes =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source, statefulProcessor));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A source feeding a processor with two new stores is expected to be described as one subtopology. */
    @Test
    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
        final String[] stores = {"store1", "store2"};
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor statefulProcessor = addProcessorWithNewStore("processor", stores, source);

        final HashSet<TopologyDescription.Node> expectedNodes =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source, statefulProcessor));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** One source feeding two processors is expected to be described as one subtopology with all three nodes. */
    @Test
    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source source = addSource("source", "topic");
        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", source);
        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", source);

        final HashSet<TopologyDescription.Node> expectedNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source, firstProcessor, secondProcessor));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A topic source and a pattern source feeding one processor are expected to share one subtopology. */
    @Test
    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
        final TopologyDescription.Source topicSource = addSource("source1", "topic0");
        final TopologyDescription.Source patternSource = addSource("source2", Pattern.compile("topic[1-9]"));
        final TopologyDescription.Processor processor = addProcessor("processor", topicSource, patternSource);

        final HashSet<TopologyDescription.Node> expectedNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(topicSource, patternSource, processor));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** Three independent source-to-processor chains are expected to be described as subtopologies 0, 1 and 2. */
    @Test
    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
        // Register all nodes first, in the same order the subtopology ids are expected.
        final TopologyDescription.Source source1 = addSource("source1", "topic1");
        final TopologyDescription.Processor processor1 = addProcessor("processor1", source1);
        final TopologyDescription.Source source2 = addSource("source2", "topic2");
        final TopologyDescription.Processor processor2 = addProcessor("processor2", source2);
        final TopologyDescription.Source source3 = addSource("source3", "topic3");
        final TopologyDescription.Processor processor3 = addProcessor("processor3", source3);

        final HashSet<TopologyDescription.Node> firstChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source1, processor1));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, firstChain));

        final HashSet<TopologyDescription.Node> secondChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source2, processor2));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, secondChain));

        final HashSet<TopologyDescription.Node> thirdChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source3, processor3));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, thirdChain));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** Three independent source-to-sink chains are expected to be described as subtopologies 0, 1 and 2. */
    @Test
    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
        // Register all nodes first, in the same order the subtopology ids are expected.
        final TopologyDescription.Source source1 = addSource("source1", "topic1");
        final TopologyDescription.Sink sink1 = addSink("sink1", "sinkTopic1", source1);
        final TopologyDescription.Source source2 = addSource("source2", "topic2");
        final TopologyDescription.Sink sink2 = addSink("sink2", "sinkTopic2", source2);
        final TopologyDescription.Source source3 = addSource("source3", "topic3");
        final TopologyDescription.Sink sink3 = addSink("sink3", "sinkTopic3", source3);

        final HashSet<TopologyDescription.Node> firstChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source1, sink1));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, firstChain));

        final HashSet<TopologyDescription.Node> secondChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source2, sink2));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, secondChain));

        final HashSet<TopologyDescription.Node> thirdChain =
            new HashSet<TopologyDescription.Node>(Arrays.<TopologyDescription.Node>asList(source3, sink3));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, thirdChain));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** Three source-to-processor chains that write to one common sink are expected to form a single subtopology. */
    @Test
    public void processorsWithSameSinkShouldHaveSameSubtopology() {
        final TopologyDescription.Source source1 = addSource("source", "topic");
        final TopologyDescription.Processor processor1 = addProcessor("processor1", source1);
        final TopologyDescription.Source source2 = addSource("source2", "topic2");
        final TopologyDescription.Processor processor2 = addProcessor("processor2", source2);
        final TopologyDescription.Source source3 = addSource("source3", "topic3");
        final TopologyDescription.Processor processor3 = addProcessor("processor3", source3);
        final TopologyDescription.Sink sharedSink = addSink("sink", "sinkTopic", processor1, processor2, processor3);

        final HashSet<TopologyDescription.Node> expectedNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(
                source1, processor1, source2, processor2, source3, processor3, sharedSink));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /**
     * Two processors each get their own new store, and a third processor is connected to both
     * existing stores; all six nodes are expected to be described as a single subtopology.
     */
    @Test
    public void processorsWithSharedStateShouldHaveSameSubtopology() {
        final String[] firstStore = {"store1"};
        final String[] secondStore = {"store2"};
        final String[] bothStores = {firstStore[0], secondStore[0]};

        final TopologyDescription.Source source1 = addSource("source", "topic");
        final TopologyDescription.Processor processor1 = addProcessorWithNewStore("processor1", firstStore, source1);
        final TopologyDescription.Source source2 = addSource("source2", "topic2");
        final TopologyDescription.Processor processor2 = addProcessorWithNewStore("processor2", secondStore, source2);
        final TopologyDescription.Source source3 = addSource("source3", "topic3");
        final TopologyDescription.Processor processor3 = addProcessorWithExistingStore("processor3", bothStores, source3);

        final HashSet<TopologyDescription.Node> expectedNodes = new HashSet<TopologyDescription.Node>(
            Arrays.<TopologyDescription.Node>asList(source1, processor1, source2, processor2, source3, processor3));
        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, expectedNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A topology with one global store must describe itself as equal to the expected description holding that store with id 0. */
    @Test
    public void shouldDescribeGlobalStoreTopology() {
        final int expectedId = 0;
        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", expectedId);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /** A topology with two global stores must describe itself as equal to the expected description holding them with ids 0 and 1. */
    @Test
    public void shouldDescribeMultipleGlobalStoreTopology() {
        final int firstId = 0;
        final int secondId = 1;
        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", firstId);
        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", secondId);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
    }

    /**
     * Adds a topic-based source to the topology under test and returns the matching expected node.
     *
     * @param str    name of the source node
     * @param strArr topics the source reads; the expected node carries them joined with ", "
     * @return the expected source description node
     */
    private TopologyDescription.Source addSource(String str, String... strArr) {
        final Topology.AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValueDeserializer = null;
        topology.addSource(noReset, str, noExtractor, noKeyDeserializer, noValueDeserializer, strArr);

        String joinedTopics = strArr[0];
        int next = 1;
        while (next < strArr.length) {
            joinedTopics = joinedTopics + ", " + strArr[next];
            next++;
        }
        return new InternalTopologyBuilder.Source(str, joinedTopics);
    }

    /**
     * Adds a pattern-based source to the topology under test and returns the matching expected node.
     *
     * @param str     name of the source node
     * @param pattern topic pattern the source subscribes to; the expected node carries its string form
     * @return the expected source description node
     */
    private TopologyDescription.Source addSource(String str, Pattern pattern) {
        final Topology.AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValueDeserializer = null;
        topology.addSource(noReset, str, noExtractor, noKeyDeserializer, noValueDeserializer, pattern);

        final String patternText = pattern.toString();
        return new InternalTopologyBuilder.Source(str, patternText);
    }

    /**
     * Adds a processor without any state stores.
     *
     * @param str     name of the processor node
     * @param nodeArr expected parent nodes of the processor
     * @return the expected processor description node
     */
    private TopologyDescription.Processor addProcessor(String str, TopologyDescription.Node... nodeArr) {
        final String[] noStores = {};
        return addProcessorWithNewStore(str, noStores, nodeArr);
    }

    /**
     * Adds a processor and registers a new store for each given store name.
     *
     * @param str     name of the processor node
     * @param strArr  names of the stores to create for the processor
     * @param nodeArr expected parent nodes of the processor
     * @return the expected processor description node
     */
    private TopologyDescription.Processor addProcessorWithNewStore(String str, String[] strArr, TopologyDescription.Node... nodeArr) {
        final boolean createStores = true;
        return addProcessorWithStore(str, strArr, createStores, nodeArr);
    }

    /**
     * Adds a processor and connects it to stores that were registered earlier.
     *
     * @param str     name of the processor node
     * @param strArr  names of the existing stores to connect the processor to
     * @param nodeArr expected parent nodes of the processor
     * @return the expected processor description node
     */
    private TopologyDescription.Processor addProcessorWithExistingStore(String str, String[] strArr, TopologyDescription.Node... nodeArr) {
        final boolean createStores = false;
        return addProcessorWithStore(str, strArr, createStores, nodeArr);
    }

    /**
     * Adds a processor to the topology under test, wires up its state stores, and returns the
     * matching expected description node.
     *
     * @param str     name of the processor node
     * @param strArr  names of the state stores the processor uses
     * @param z       {@code true} to register a fresh nice-mock store builder per name and attach it
     *                to the processor; {@code false} to connect the processor to the named stores
     * @param nodeArr expected parent nodes; their names are passed as the processor's parent names
     * @return the expected processor node, linked to each parent as successor and predecessor
     */
    private TopologyDescription.Processor addProcessorWithStore(String str, String[] strArr, boolean z, TopologyDescription.Node... nodeArr) {
        final String[] parentNames = new String[nodeArr.length];
        for (int i = 0; i < nodeArr.length; i++) {
            parentNames[i] = nodeArr[i].name();
        }
        this.topology.addProcessor(str, new MockProcessorSupplier(), parentNames);

        if (z) {
            for (final String storeName : strArr) {
                final StoreBuilder newStore = (StoreBuilder) EasyMock.createNiceMock(StoreBuilder.class);
                EasyMock.expect(newStore.name()).andReturn(storeName).anyTimes();
                EasyMock.replay(new Object[]{newStore});
                this.topology.addStateStore(newStore, new String[]{str});
            }
        } else {
            this.topology.connectProcessorAndStateStores(str, strArr);
        }

        // Declared with the concrete Processor type: a variable typed as AbstractNode cannot be
        // returned where TopologyDescription.Processor is required.
        final InternalTopologyBuilder.Processor expectedProcessor =
            new InternalTopologyBuilder.Processor(str, new HashSet<String>(Arrays.asList(strArr)));
        for (final TopologyDescription.Node parent : nodeArr) {
            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessor);
            expectedProcessor.addPredecessor(parent);
        }
        return expectedProcessor;
    }

    /**
     * Adds a sink to the topology under test and returns the matching expected description node.
     *
     * @param str     name of the sink node
     * @param str2    topic the sink writes to
     * @param nodeArr expected parent nodes; their names are passed as the sink's parent names
     * @return the expected sink node, linked to each parent as successor and predecessor
     */
    private TopologyDescription.Sink addSink(String str, String str2, TopologyDescription.Node... nodeArr) {
        final String[] parentNames = new String[nodeArr.length];
        for (int i = 0; i < nodeArr.length; i++) {
            parentNames[i] = nodeArr[i].name();
        }
        this.topology.addSink(str, str2, (Serializer) null, (Serializer) null, (StreamPartitioner) null, parentNames);

        // Declared with the concrete Sink type: a variable typed as AbstractNode cannot be
        // returned where TopologyDescription.Sink is required.
        final InternalTopologyBuilder.Sink expectedSink = new InternalTopologyBuilder.Sink(str, str2);
        for (final TopologyDescription.Node parent : nodeArr) {
            ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSink);
            expectedSink.addPredecessor(parent);
        }
        return expectedSink;
    }

    /**
     * Registers a global store on the topology under test and records the matching entry in the
     * expected description.
     *
     * @param str  name returned by the mocked store builder
     * @param str2 name of the global store's source node
     * @param str3 topic the global store is fed from
     * @param str4 name of the global store's processor node
     * @param i    id given to the expected global store entry
     */
    private void addGlobalStoreToTopologyAndExpectedDescription(String str, String str2, String str3, String str4, int i) {
        final KeyValueStoreBuilder mockedBuilder = (KeyValueStoreBuilder) EasyMock.createNiceMock(KeyValueStoreBuilder.class);
        EasyMock.expect(mockedBuilder.name()).andReturn(str).anyTimes();
        EasyMock.replay(mockedBuilder);

        final TimestampExtractor noExtractor = null;
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValueDeserializer = null;
        topology.addGlobalStore(
            mockedBuilder,
            str2,
            noExtractor,
            noKeyDeserializer,
            noValueDeserializer,
            str3,
            str4,
            new MockProcessorSupplier());

        final InternalTopologyBuilder.GlobalStore expectedStore =
            new InternalTopologyBuilder.GlobalStore(str2, str4, str, str3, i);
        expectedDescription.addGlobalStore(expectedStore);
    }
}
