package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for {@link StreamsBuilder}.
 *
 * <p>The tests build small topologies (stream/table joins, sinks, merges, materialized
 * tables) and then either inspect the resulting {@link ProcessorTopology} /
 * {@link TopologyDescription}, or push records through a {@link KStreamTestDriver} and
 * check what the downstream processors and state stores observed.
 *
 * <p>Each test uses the per-instance {@link #builder} and {@link #driver} fields.
 */
public class StreamsBuilderTest {

    /** Builder under test. */
    private final StreamsBuilder builder = new StreamsBuilder();

    /** Test driver used to run the built topology; registered as a JUnit rule. */
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /**
     * Expects a {@link TopologyException} when a source named
     * {@code KSTREAM-SOURCE-0000000000} is added to the topology built from a
     * two-topic stream.
     *
     * <p>NOTE(review): presumably the failure comes from that name colliding with the
     * source node name the builder generated itself; confirm against
     * {@code Topology#addSource}.
     */
    @Test(expected = TopologyException.class)
    public void testFrom() {
        builder.stream(Arrays.asList("topic-1", "topic-2"));

        builder.build().addSource("KSTREAM-SOURCE-0000000000", "topic-3");
    }

    /**
     * Joins a stream with a filtered table that has no explicit materialization and
     * checks that the topology has one state store, that the join node is connected to
     * it, and that the filter node has no connected stores.
     */
    @Test
    public void shouldAllowJoinUnmaterializedFilteredKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic").filter(MockPredicate.allGoodPredicate()),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(),
            CoreMatchers.is(true));
    }

    /**
     * Joins a stream with a filtered table materialized as {@code "store"} and checks
     * that the topology has two state stores and that both the join node and the
     * filter node are connected to {@code "store"} only.
     */
    @Test
    public void shouldAllowJoinMaterializedFilteredKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic").filter(MockPredicate.allGoodPredicate(), Materialized.as("store")),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /**
     * Joins a stream with a value-mapped table that has no explicit materialization and
     * checks that the topology has one state store, that the join node is connected to
     * it, and that the map-values node has no connected stores.
     */
    @Test
    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic").mapValues(MockMapper.noOpValueMapper()),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(),
            CoreMatchers.is(true));
    }

    /**
     * Joins a stream with a value-mapped table materialized as {@code "store"} and
     * checks that the topology has two state stores and that both the join node and
     * the map-values node are connected to {@code "store"} only.
     */
    @Test
    public void shouldAllowJoinMaterializedMapValuedKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic").mapValues(MockMapper.noOpValueMapper(), Materialized.as("store")),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /**
     * Joins a stream with the (non-materialized) join of two tables and checks that the
     * topology has two state stores, that the stream join node is connected to both of
     * them, and that the table merge node has no connected stores.
     */
    @Test
    public void shouldAllowJoinUnmaterializedJoinedKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic1").join(builder.table("table-topic2"), MockValueJoiner.TOSTRING_JOINER),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
            CoreMatchers.equalTo(Utils.mkSet(
                topology.stateStores().get(0).name(),
                topology.stateStores().get(1).name())));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(),
            CoreMatchers.is(true));
    }

    /**
     * Joins a stream with the join of two tables materialized as {@code "store"} and
     * checks that the topology has three state stores and that both the stream join
     * node and the table merge node are connected to {@code "store"} only.
     */
    @Test
    public void shouldAllowJoinMaterializedJoinedKTable() {
        builder.stream("stream-topic").join(
            builder.table("table-topic1").join(
                builder.table("table-topic2"),
                MockValueJoiner.TOSTRING_JOINER,
                Materialized.as("store")),
            MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /**
     * Joins a stream directly with a source table and checks that the topology has one
     * state store and that both the table source node and the join node are connected
     * to it.
     */
    @Test
    public void shouldAllowJoinMaterializedSourceKTable() {
        builder.stream("stream-topic").join(builder.table("table-topic"), MockValueJoiner.TOSTRING_JOINER);
        driver.setUp(builder, TestUtils.tempDirectory());

        final ProcessorTopology topology = builder.internalTopologyBuilder.build();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        final String storeName = topology.stateStores().get(0).name();
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"),
            CoreMatchers.equalTo(Collections.singleton(storeName)));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"),
            CoreMatchers.equalTo(Collections.singleton(storeName)));
    }

    /**
     * Attaches both a sink ({@code "topic-sink"}) and a processor to the same source
     * stream and checks that a record sent to {@code "topic-source"} is seen exactly
     * once by the processor.
     */
    @Test
    public void shouldProcessingFromSinkTopic() {
        final KStream<String, String> source = builder.stream("topic-source");
        source.to("topic-sink");

        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        source.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process("topic-source", "A", "aa");

        Assert.assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
    }

    /**
     * Routes a source stream through {@code "topic-sink"} and checks that a record sent
     * to {@code "topic-source"} is seen once by a processor on the source stream and
     * once by a processor on the stream returned from {@code through}.
     */
    @Test
    public void shouldProcessViaThroughTopic() {
        final KStream<String, String> source = builder.stream("topic-source");
        final KStream<String, String> through = source.through("topic-sink");

        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
        source.process(sourceProcessorSupplier);
        through.process(throughProcessorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process("topic-source", "A", "aa");

        Assert.assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
        Assert.assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
    }

    /**
     * Merges two source streams and checks that records sent alternately to both
     * topics reach the downstream processor in the order they were sent.
     */
    @Test
    public void testMerge() {
        final KStream<String, String> source1 = builder.stream("topic-1");
        final KStream<String, String> source2 = builder.stream("topic-2");
        final KStream<String, String> merged = source1.merge(source2);

        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        merged.process(processorSupplier);

        driver.setUp(builder);
        driver.setTime(0L);
        driver.process("topic-1", "A", "aa");
        driver.process("topic-2", "B", "bb");
        driver.process("topic-2", "C", "cc");
        driver.process("topic-1", "D", "dd");

        Assert.assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
    }

    /**
     * Builds a table materialized as {@code "store"} with Long/String serdes, feeds two
     * records, and checks that both the state store and a {@code foreach} action on
     * the table's stream observed them with Long keys and String values.
     */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
        final Map<Long, String> results = new HashMap<>();
        final ForeachAction<Long, String> collect = new ForeachAction<Long, String>() {
            @Override
            public void apply(final Long key, final String value) {
                results.put(key, value);
            }
        };

        builder.table("topic", longStringMaterialized("store")).toStream().foreach(collect);

        driver.setUp(builder, TestUtils.tempDirectory());
        driver.setTime(0L);
        driver.process("topic", 1L, "value1");
        driver.process("topic", 2L, "value2");
        driver.flushState();

        final KeyValueStore<Long, String> store = longStringStore("store");
        MatcherAssert.assertThat(store.get(1L), CoreMatchers.equalTo("value1"));
        MatcherAssert.assertThat(store.get(2L), CoreMatchers.equalTo("value2"));
        MatcherAssert.assertThat(results.get(1L), CoreMatchers.equalTo("value1"));
        MatcherAssert.assertThat(results.get(2L), CoreMatchers.equalTo("value2"));
    }

    /**
     * Builds a global table materialized as {@code "store"} with Long/String serdes,
     * feeds two records, and checks that the state store returns them by Long key.
     */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
        builder.globalTable("topic", longStringMaterialized("store"));

        driver.setUp(builder, TestUtils.tempDirectory());
        driver.setTime(0L);
        driver.process("topic", 1L, "value1");
        driver.process("topic", 2L, "value2");
        driver.flushState();

        final KeyValueStore<Long, String> store = longStringStore("store");
        MatcherAssert.assertThat(store.get(1L), CoreMatchers.equalTo("value1"));
        MatcherAssert.assertThat(store.get(2L), CoreMatchers.equalTo("value2"));
    }

    /**
     * Builds a table without naming its store and checks the described topology: a
     * single subtopology whose nodes are {@code KSTREAM-SOURCE-0000000001} followed by
     * {@code KTABLE-SOURCE-0000000002}, the latter with the single store
     * {@code topic-STATE-STORE-0000000000}.
     */
    @Test
    public void shouldUseDefaultNodeAndStoreNames() {
        builder.table("topic", Materialized.with(Serdes.Long(), Serdes.String()));

        final Iterator<TopologyDescription.Subtopology> subtopologies =
            builder.build().describe().subtopologies().iterator();
        final Iterator<TopologyDescription.Node> nodes = subtopologies.next().nodes().iterator();

        MatcherAssert.assertThat(nodes.next().name(), CoreMatchers.equalTo("KSTREAM-SOURCE-0000000001"));

        // The second node must be viewed as a Processor to reach stores().
        final TopologyDescription.Processor tableNode = (TopologyDescription.Processor) nodes.next();
        MatcherAssert.assertThat(tableNode.name(), CoreMatchers.equalTo("KTABLE-SOURCE-0000000002"));

        final Iterator<String> stores = tableNode.stores().iterator();
        MatcherAssert.assertThat(stores.next(), CoreMatchers.equalTo("topic-STATE-STORE-0000000000"));

        // Nothing beyond the elements asserted above may be present.
        Assert.assertFalse(nodes.hasNext());
        Assert.assertFalse(stores.hasNext());
        Assert.assertFalse(subtopologies.hasNext());
    }

    /** Expects a {@link TopologyException} when a stream is created from an empty topic list. */
    @Test(expected = TopologyException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
        builder.stream(Collections.<String>emptyList());
    }

    /** Expects a {@link NullPointerException} when a stream is created from a list of null topic names. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
        builder.stream(Arrays.<String>asList(null, null));
    }

    /**
     * Returns the {@code internalTopologyBuilder} field of the given builder.
     *
     * @param streamsBuilder builder whose internal topology builder is requested
     * @return the builder's {@link InternalTopologyBuilder}
     */
    public static InternalTopologyBuilder internalTopologyBuilder(StreamsBuilder streamsBuilder) {
        return streamsBuilder.internalTopologyBuilder;
    }

    /**
     * Returns the copartition groups reported by the given builder's internal
     * topology builder.
     *
     * @param streamsBuilder builder to query
     * @return the result of {@code internalTopologyBuilder.copartitionGroups()}
     */
    public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder streamsBuilder) {
        return streamsBuilder.internalTopologyBuilder.copartitionGroups();
    }

    /**
     * Creates a {@link Materialized} for the given store name with Long keys and String
     * values. The explicit type witness is required: without it the chained
     * {@code withKeySerde}/{@code withValueSerde} calls would be checked against
     * {@code Object} key/value types and reject the Long/String serdes.
     */
    private static Materialized<Long, String, KeyValueStore<Bytes, byte[]>> longStringMaterialized(
            final String storeName) {
        return Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(Serdes.String());
    }

    /**
     * Looks up a state store by name in the driver and views it as a Long-to-String
     * key-value store. The cast is unchecked; callers must only use it for stores they
     * created with Long/String serdes.
     */
    @SuppressWarnings("unchecked")
    private KeyValueStore<Long, String> longStringStore(final String storeName) {
        return (KeyValueStore<Long, String>) driver.allStateStores().get(storeName);
    }
}
