package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.class */
public class InternalStreamsBuilderTest {
    /** Application id passed to {@code StreamsTestUtils.getStreamsConfig(...)} wherever a test rewrites the topology. */
    private static final String APP_ID = "app-id";
    /** Builder under test, backed by its own {@link InternalTopologyBuilder}. */
    private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    /** Consumed configuration built from two explicitly null serdes; used by tests that need no special source settings. */
    private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with((Serde) null, (Serde) null));
    /** Store-name prefix; the tests in this class pass the same value as the literal {@code "prefix-"}. */
    private final String storePrefix = "prefix-";
    /** Materialization named {@code "test-store"}, shared by the table-based tests. */
    private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), this.builder, "prefix-");
    /** Streams configuration; the self-join tests set {@code "topology.optimization"} on it before building. */
    private final Properties props = StreamsTestUtils.getStreamsConfig();

    /**
     * Processor names are the given prefix followed by a zero-padded, per-builder counter that
     * advances with every generated name.
     */
    @Test
    public void testNewName() {
        // The shared builder hands out indices 0, 1, 2 in call order.
        Assertions.assertEquals("X-0000000000", builder.newProcessorName("X-"));
        Assertions.assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
        Assertions.assertEquals("Z-0000000002", builder.newProcessorName("Z-"));

        // A second, independent builder starts counting from zero again.
        InternalStreamsBuilder freshBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
        Assertions.assertEquals("X-0000000000", freshBuilder.newProcessorName("X-"));
        Assertions.assertEquals("Y-0000000001", freshBuilder.newProcessorName("Y-"));
        Assertions.assertEquals("Z-0000000002", freshBuilder.newProcessorName("Z-"));
    }

    /**
     * Store names are the given prefix, the {@code STATE-STORE-} marker and a zero-padded,
     * per-builder counter.
     */
    @Test
    public void testNewStoreName() {
        // The shared builder hands out indices 0, 1, 2 in call order.
        Assertions.assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
        Assertions.assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
        Assertions.assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));

        // A second, independent builder starts counting from zero again.
        InternalStreamsBuilder freshBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
        Assertions.assertEquals("X-STATE-STORE-0000000000", freshBuilder.newStoreName("X-"));
        Assertions.assertEquals("Y-STATE-STORE-0000000001", freshBuilder.newStoreName("Y-"));
        Assertions.assertEquals("Z-STATE-STORE-0000000002", freshBuilder.newStoreName("Z-"));
    }

    /**
     * A count store built on three merged streams must list all three input topics as the
     * store's source topics.
     */
    @Test
    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
        // Each stage is created in the same order as before: topic-1 (mapped + filtered),
        // topic-2 (filtered), first merge, topic-3, second merge.
        KStream<String, String> processedFirst = builder.stream(Collections.singleton("topic-1"), consumed)
            .mapValues(value -> value)
            .filter((key, value) -> true);
        KStream<String, String> processedSecond = builder.stream(Collections.singleton("topic-2"), consumed)
            .filter((key, value) -> true);
        KStream<String, String> firstTwo = processedFirst.merge(processedSecond);
        KStream<String, String> third = builder.stream(Collections.singleton("topic-3"), consumed);
        firstTwo.merge(third).groupByKey().count(Materialized.as("my-table"));

        builder.buildAndOptimizeTopology();

        Assertions.assertEquals(
            Arrays.asList("topic-1", "topic-2", "topic-3"),
            builder.internalTopologyBuilder.stateStoreNameToFullSourceTopicNames().get("my-table"));
    }

    /**
     * A source table materialized with serdes only (no store name) must add neither a state store
     * nor a changelog topic to the topology, and must not expose a queryable store name.
     */
    @Test
    public void shouldNotMaterializeSourceKTableIfNotRequired() {
        MaterializedInternal serdesOnly = new MaterializedInternal(Materialized.with((Serde) null, (Serde) null), builder, "prefix-");
        KTable sourceTable = builder.table(AssignmentTestUtils.TP_2_NAME, consumed, serdesOnly);
        builder.buildAndOptimizeTopology();

        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).buildTopology();

        Assertions.assertEquals(0, topology.stateStores().size());
        Assertions.assertEquals(0, topology.storeToChangelogTopic().size());
        Assertions.assertNull(sourceTable.queryableStoreName());
    }

    /** A global table materialized with serdes only (no store name) has no queryable store name. */
    @Test
    public void shouldBuildGlobalTableWithNonQueryableStoreName() {
        MaterializedInternal serdesOnly = new MaterializedInternal(Materialized.with((Serde) null, (Serde) null), builder, "prefix-");
        GlobalKTable globalTable = builder.globalTable(AssignmentTestUtils.TP_2_NAME, consumed, serdesOnly);
        Assertions.assertNull(globalTable.queryableStoreName());
    }

    /** A global table materialized under an explicit name reports that name as its queryable store name. */
    @Test
    public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
        MaterializedInternal namedStore = new MaterializedInternal(Materialized.as("globalTable"), builder, "prefix-");
        GlobalKTable globalTable = builder.globalTable(AssignmentTestUtils.TP_2_NAME, consumed, namedStore);
        Assertions.assertEquals("globalTable", globalTable.queryableStoreName());
    }

    /** A single global table yields a global-state topology containing exactly its one named store. */
    @Test
    public void shouldBuildSimpleGlobalTableTopology() {
        MaterializedInternal namedStore = new MaterializedInternal(Materialized.as("globalTable"), builder, "prefix-");
        builder.globalTable("table", consumed, namedStore);
        builder.buildAndOptimizeTopology();

        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology globalTopology = builder.internalTopologyBuilder.rewriteTopology(config).buildGlobalStateTopology();
        List stores = globalTopology.globalStateStores();

        Assertions.assertEquals(1, stores.size());
        StateStore onlyStore = (StateStore) stores.get(0);
        Assertions.assertEquals("globalTable", onlyStore.name());
    }

    /** Registering a global table backed by a persistent versioned store supplier must raise a {@link TopologyException}. */
    @Test
    public void shouldThrowOnVersionedStoreSupplierForGlobalTable() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ZERO)),
            builder,
            "prefix-");

        Assertions.assertThrows(
            TopologyException.class,
            () -> builder.globalTable("table", consumed, versionedStore));
    }

    /**
     * Builds the global-state topology from the current builder state and checks that it reads
     * from the topics {@code "table"} and {@code "table2"} and holds two global stores.
     */
    private void doBuildGlobalTopologyWithAllGlobalTables() {
        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology globalTopology = builder.internalTopologyBuilder.rewriteTopology(config).buildGlobalStateTopology();
        List stores = globalTopology.globalStateStores();

        Assertions.assertEquals(Set.of("table", "table2"), globalTopology.sourceTopics());
        Assertions.assertEquals(2, stores.size());
    }

    /** Two global tables registered on the builder must both appear in the global-state topology. */
    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
        MaterializedInternal firstStore = new MaterializedInternal(Materialized.as("global1"), builder, "prefix-");
        builder.globalTable("table", consumed, firstStore);

        MaterializedInternal secondStore = new MaterializedInternal(Materialized.as("global2"), builder, "prefix-");
        builder.globalTable("table2", consumed, secondStore);

        builder.buildAndOptimizeTopology();
        doBuildGlobalTopologyWithAllGlobalTables();
    }

    /**
     * Every sub-topology built from the node groups must expose both global stores
     * ({@code "globalTable"} and {@code "globalTable2"}).
     */
    @Test
    public void shouldAddGlobalTablesToEachGroup() {
        GlobalKTable firstGlobal = builder.globalTable("table", consumed, new MaterializedInternal(Materialized.as("globalTable"), builder, "prefix-"));
        GlobalKTable secondGlobal = builder.globalTable("table2", consumed, new MaterializedInternal(Materialized.as("globalTable2"), builder, "prefix-"));
        builder.table("not-global", consumed, new MaterializedInternal(Materialized.as("not-global"), builder, "prefix-"));

        // Join key selector: look the global table up by the stream record's value.
        KeyValueMapper valueAsKey = (key, value) -> value;
        builder.stream(Collections.singleton("t1"), consumed).leftJoin(firstGlobal, valueAsKey, MockValueJoiner.TOSTRING_JOINER);
        builder.stream(Collections.singleton("t2"), consumed).leftJoin(secondGlobal, valueAsKey, MockValueJoiner.TOSTRING_JOINER);

        // NOTE(review): unlike most sibling tests this one never calls buildAndOptimizeTopology();
        // confirm nodeGroups() is non-empty here, otherwise the loop body never runs.
        for (Object groupKey : builder.internalTopologyBuilder.nodeGroups().keySet()) {
            int groupId = ((Integer) groupKey).intValue();
            List stores = builder.internalTopologyBuilder.buildSubtopology(groupId).globalStateStores();

            Set<String> storeNames = new HashSet<>();
            for (Object store : stores) {
                storeNames.add(((StateStore) store).name());
            }

            Assertions.assertEquals(2, stores.size());
            Assertions.assertTrue(storeNames.contains("globalTable"));
            Assertions.assertTrue(storeNames.contains("globalTable2"));
        }
    }

    /**
     * The table store must be sourced from its own topic, and the count store from the
     * repartition topic derived from the preceding map processor.
     */
    @Test
    public void shouldMapStateStoresToCorrectSourceTopics() {
        KStream<String, String> events = builder.stream(Collections.singleton("events"), consumed);

        // Register the table BEFORE the map. Generated names are numbered in call order (see
        // testNewName), and the expected repartition topic below embeds index 0000000003 for the
        // map processor. The previous form evaluated stream.map(...) ahead of builder.table(...),
        // which gave the map a lower index than the assertion expects.
        MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> tableStore =
            new MaterializedInternal<>(Materialized.as("table-store"), builder, "prefix-");
        KTable<String, String> table = builder.table("table-topic", consumed, tableStore);

        KStream<String, String> mapped = events.map(MockMapper.selectValueKeyValueMapper());
        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count"));

        builder.buildAndOptimizeTopology();
        // Rewriting with the application id is what prefixes the repartition topic with "app-id-".
        builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));

        Assertions.assertEquals(
            Collections.singletonList("table-topic"),
            builder.internalTopologyBuilder.sourceTopicsForStore("table-store"));
        Assertions.assertEquals(
            Collections.singletonList("app-id-KSTREAM-MAP-0000000003-repartition"),
            builder.internalTopologyBuilder.sourceTopicsForStore("count"));
    }

    /** A stream consumed with {@code AutoOffsetReset.none()} reports the NONE reset strategy for its topic. */
    @Test
    public void shouldAddTopicToNoneAutoOffsetResetList() {
        ConsumedInternal noneReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.none()));
        builder.stream(Collections.singleton("topic-1"), noneReset);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.NONE));
    }

    /** A stream consumed with {@code AutoOffsetReset.earliest()} reports the EARLIEST reset strategy for its topic. */
    @Test
    public void shouldAddTopicToEarliestAutoOffsetResetList() {
        ConsumedInternal earliestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.earliest()));
        builder.stream(Collections.singleton("topic-1"), earliestReset);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.EARLIEST));
    }

    /** A stream consumed with {@code AutoOffsetReset.latest()} reports the LATEST reset strategy for its topic. */
    @Test
    public void shouldAddTopicToLatestAutoOffsetResetList() {
        ConsumedInternal latestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.latest()));
        builder.stream(Collections.singleton("topic-1"), latestReset);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.LATEST));
    }

    /** A stream consumed with a 42-second by-duration reset reports a BY_DURATION strategy carrying that duration. */
    @Test
    public void shouldAddTopicToDurationAutoOffsetResetList() {
        ConsumedInternal durationReset = new ConsumedInternal(
            Consumed.with(new AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42L)))));
        builder.stream(Collections.singleton("topic-1"), durationReset);
        builder.buildAndOptimizeTopology();

        InternalTopologyBuilder topologyBuilder = builder.internalTopologyBuilder;
        MatcherAssert.assertThat(
            topologyBuilder.offsetResetStrategy("topic-1").type(),
            CoreMatchers.equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));

        long seconds = ((Duration) topologyBuilder.offsetResetStrategy("topic-1").duration().get()).toSeconds();
        MatcherAssert.assertThat(Long.valueOf(seconds), CoreMatchers.equalTo(42L));
    }

    /** A table consumed with {@code AutoOffsetReset.none()} reports the NONE reset strategy for its topic. */
    @Test
    public void shouldAddTableToNoneAutoOffsetResetList() {
        ConsumedInternal noneReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.none()));
        builder.table("topic-1", noneReset, materialized);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.NONE));
    }

    /** A table consumed with {@code AutoOffsetReset.earliest()} reports the EARLIEST reset strategy for its topic. */
    @Test
    public void shouldAddTableToEarliestAutoOffsetResetList() {
        ConsumedInternal earliestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.earliest()));
        builder.table("topic-1", earliestReset, materialized);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.EARLIEST));
    }

    /** A table consumed with {@code AutoOffsetReset.latest()} reports the LATEST reset strategy for its topic. */
    @Test
    public void shouldAddTableToLatestAutoOffsetResetList() {
        ConsumedInternal latestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.latest()));
        builder.table("topic-1", latestReset, materialized);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.LATEST));
    }

    /** A table consumed with a 42-second by-duration reset reports a BY_DURATION strategy carrying that duration. */
    @Test
    public void shouldAddTableToDurationAutoOffsetResetList() {
        ConsumedInternal durationReset = new ConsumedInternal(
            Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L))));
        builder.table("topic-1", durationReset, materialized);
        builder.buildAndOptimizeTopology();

        InternalTopologyBuilder topologyBuilder = builder.internalTopologyBuilder;
        MatcherAssert.assertThat(
            topologyBuilder.offsetResetStrategy("topic-1").type(),
            CoreMatchers.equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));

        long seconds = ((Duration) topologyBuilder.offsetResetStrategy("topic-1").duration().get()).toSeconds();
        MatcherAssert.assertThat(Long.valueOf(seconds), CoreMatchers.equalTo(42L));
    }

    /** A table consumed without an explicit reset policy has no (null) reset strategy for its topic. */
    @Test
    public void shouldNotAddTableToOffsetResetLists() {
        builder.table("topic-1", consumed, materialized);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo((Object) null));
    }

    /** A pattern subscription without an explicit reset policy yields no (null) reset strategy for a matching topic. */
    @Test
    public void shouldNotAddRegexTopicsToOffsetResetLists() {
        Pattern singleDigitTopics = Pattern.compile("topic-\\d");
        builder.stream(singleDigitTopics, consumed);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-5");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo((Object) null));
    }

    /** A pattern subscription with {@code AutoOffsetReset.earliest()} reports EARLIEST for a topic matching the pattern. */
    @Test
    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
        Pattern numberedTopics = Pattern.compile("topic-\\d+");
        ConsumedInternal earliestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.earliest()));
        builder.stream(numberedTopics, earliestReset);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-500000");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.EARLIEST));
    }

    /** A pattern subscription with {@code AutoOffsetReset.latest()} reports LATEST for a topic matching the pattern. */
    @Test
    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
        Pattern numberedTopics = Pattern.compile("topic-\\d+");
        ConsumedInternal latestReset = new ConsumedInternal(Consumed.with(AutoOffsetReset.latest()));
        builder.stream(numberedTopics, latestReset);
        builder.buildAndOptimizeTopology();

        AutoOffsetResetStrategy strategy = builder.internalTopologyBuilder.offsetResetStrategy("topic-1000000");
        MatcherAssert.assertThat(strategy, CoreMatchers.equalTo(AutoOffsetResetStrategy.LATEST));
    }

    /** A pattern subscription with a 42-second by-duration reset reports BY_DURATION and that duration for a matching topic. */
    @Test
    public void shouldAddRegexTopicToDurationAutoOffsetResetList() {
        Pattern numberedTopics = Pattern.compile("topic-\\d+");
        ConsumedInternal durationReset = new ConsumedInternal(
            Consumed.with(AutoOffsetResetInternal.byDuration(Duration.ofSeconds(42L))));
        builder.stream(numberedTopics, durationReset);
        builder.buildAndOptimizeTopology();

        InternalTopologyBuilder topologyBuilder = builder.internalTopologyBuilder;
        MatcherAssert.assertThat(
            topologyBuilder.offsetResetStrategy("topic-1000000").type(),
            CoreMatchers.equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));

        long seconds = ((Duration) topologyBuilder.offsetResetStrategy("topic-1000000").duration().get()).toSeconds();
        MatcherAssert.assertThat(Long.valueOf(seconds), CoreMatchers.equalTo(42L));
    }

    /** A stream source created without a timestamp extractor exposes a null extractor on its source node. */
    @Test
    public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
        builder.stream(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX), consumed);
        builder.buildAndOptimizeTopology();

        InternalTopologyBuilder topologyBuilder = builder.internalTopologyBuilder;
        topologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
        ProcessorTopology topology = topologyBuilder.buildTopology();

        Assertions.assertNull(topology.source(AssignmentTestUtils.TOPIC_PREFIX).timestampExtractor());
    }

    /** A stream source created with a {@link MockTimestampExtractor} exposes an extractor of that type on its source node. */
    @Test
    public void shouldUseProvidedTimestampExtractor() {
        ConsumedInternal withExtractor = new ConsumedInternal(Consumed.with(new MockTimestampExtractor()));
        builder.stream(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX), withExtractor);
        builder.buildAndOptimizeTopology();

        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).buildTopology();

        MatcherAssert.assertThat(
            topology.source(AssignmentTestUtils.TOPIC_PREFIX).timestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /** A table source created without a timestamp extractor exposes a null extractor on its source node. */
    @Test
    public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
        builder.table(AssignmentTestUtils.TOPIC_PREFIX, consumed, materialized);
        builder.buildAndOptimizeTopology();

        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).buildTopology();

        Assertions.assertNull(topology.source(AssignmentTestUtils.TOPIC_PREFIX).timestampExtractor());
    }

    /** A table source created with a {@link MockTimestampExtractor} exposes an extractor of that type on its source node. */
    @Test
    public void ktableShouldUseProvidedTimestampExtractor() {
        ConsumedInternal withExtractor = new ConsumedInternal(Consumed.with(new MockTimestampExtractor()));
        builder.table(AssignmentTestUtils.TOPIC_PREFIX, withExtractor, materialized);
        builder.buildAndOptimizeTopology();

        StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).buildTopology();

        MatcherAssert.assertThat(
            topology.source(AssignmentTestUtils.TOPIC_PREFIX).timestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * With {@code topology.optimization=all}, joining a stream with itself must flag the join
     * node as a self-join, and {@code countJoinWindowNodes} must report one node.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinSingleStream() {
        props.put("topology.optimization", "all");
        KStream<String, String> source = builder.stream(Collections.singleton("t1"), consumed);
        source.join(source, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertTrue(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(1, joinWindowNodes.get());
    }

    /**
     * With {@code topology.optimization=all}, joining two streams created from the same topic
     * must flag the join as a self-join, and {@code countJoinWindowNodes} must report one node.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreams() {
        props.put("topology.optimization", "all");
        KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> right = builder.stream(Collections.singleton("t1"), consumed);
        left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertTrue(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(1, joinWindowNodes.get());
    }

    /**
     * With {@code topology.optimization=all}, joining a merged stream (topics t1 and t2) with
     * itself must flag the join as a self-join, and {@code countJoinWindowNodes} must report one node.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinMergeTwoStreams() {
        props.put("topology.optimization", "all");
        KStream<String, String> merged = builder.stream(Collections.singleton("t1"), consumed)
            .merge(builder.stream(Collections.singleton("t2"), consumed));
        merged.join(merged, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertTrue(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(1, joinWindowNodes.get());
    }

    /**
     * In a three-way join (t1 with t1, then with t3) only the first join node found is flagged
     * as a self-join, and {@code countJoinWindowNodes} must report three nodes.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoin3WayJoin() {
        props.put("topology.optimization", "all");
        KStream<String, String> first = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> second = builder.stream(Collections.singleton("t1"), consumed);
        first.join(second, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(builder.stream(Collections.singleton("t3"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        ArrayList joinNodes = new ArrayList();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet(), joinNodes);
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        Assertions.assertEquals(3, joinWindowNodes.get());
    }

    /**
     * Two independent same-topic joins (t1 with t1, t2 with t2) whose results are merged must
     * both be flagged as self-joins, and {@code countJoinWindowNodes} must report two nodes.
     */
    @Test
    public void shouldMarkAllStreamStreamJoinsAsSelfJoin() {
        props.put("topology.optimization", "all");
        builder.stream(Collections.singleton("t1"), consumed)
            .join(builder.stream(Collections.singleton("t1"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .merge(builder.stream(Collections.singleton("t2"), consumed)
                .join(builder.stream(Collections.singleton("t2"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L))));
        builder.buildAndOptimizeTopology(props);

        ArrayList joinNodes = new ArrayList();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet(), joinNodes);
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        Assertions.assertEquals(2, joinWindowNodes.get());
    }

    /**
     * When three streams over the same topic are joined in sequence, only the first join node
     * found is flagged as a self-join, and {@code countJoinWindowNodes} must report three nodes.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoinNwaySameSource() {
        props.put("topology.optimization", "all");
        KStream<String, String> first = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> second = builder.stream(Collections.singleton("t1"), consumed);
        first.join(second, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(builder.stream(Collections.singleton("t1"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        ArrayList joinNodes = new ArrayList();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet(), joinNodes);
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        Assertions.assertEquals(3, joinWindowNodes.get());
    }

    /**
     * When a same-topic join (t1 with t1) is followed by a join with another topic (t2), only the
     * first join node found is flagged as a self-join, and {@code countJoinWindowNodes} must
     * report three nodes.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoinNway() {
        props.put("topology.optimization", "all");
        KStream<String, String> first = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> second = builder.stream(Collections.singleton("t1"), consumed);
        first.join(second, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(builder.stream(Collections.singleton("t2"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        ArrayList joinNodes = new ArrayList();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet(), joinNodes);
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        Assertions.assertEquals(3, joinWindowNodes.get());
    }

    /**
     * A filter whose result is discarded (it is not part of the joined branch) must not prevent
     * the same-topic join from being flagged as a self-join; {@code countJoinWindowNodes} must
     * report one node.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreamsWithNoOpFilter() {
        props.put("topology.optimization", "all");
        KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> right = builder.stream(Collections.singleton("t1"), consumed);
        // The filtered stream is deliberately ignored: the join below uses the unfiltered stream.
        left.filter((key, value) -> value != null);
        left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertTrue(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(1, joinWindowNodes.get());
    }

    /**
     * Two separate joins, each between two streams over topic t1, must both be flagged as
     * self-joins, and {@code countJoinWindowNodes} must report two nodes.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoJoinsSameSource() {
        props.put("topology.optimization", "all");
        KStream<String, String> firstLeft = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> firstRight = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> secondLeft = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> secondRight = builder.stream(Collections.singleton("t1"), consumed);
        firstLeft.join(firstRight, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        secondLeft.join(secondRight, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        ArrayList joinNodes = new ArrayList();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet(), joinNodes);
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        Assertions.assertEquals(2, joinWindowNodes.get());
    }

    /**
     * When one side of a same-topic join passes through a filter first, the join must not be
     * flagged as a self-join, and {@code countJoinWindowNodes} must report two nodes.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinTwoStreamsWithFilter() {
        props.put("topology.optimization", "all");
        KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
        left.filter((key, value) -> value != null)
            .join(builder.stream(Collections.singleton("t1"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertFalse(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinWindowNodes.get());
    }

    /**
     * When one side of a same-topic join passes through {@code mapValues} first, the join must
     * not be flagged as a self-join, and {@code countJoinWindowNodes} must report two nodes.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinOneStreamWithMap() {
        props.put("topology.optimization", "all");
        builder.stream(Collections.singleton("t1"), consumed)
            .mapValues(value -> value)
            .join(builder.stream(Collections.singleton("t1"), consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertFalse(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinWindowNodes.get());
    }

    /**
     * A join between streams over different topics (t1 and t2) must not be flagged as a
     * self-join, and {@code countJoinWindowNodes} must report two nodes.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinMultipleSources() {
        props.put("topology.optimization", "all");
        KStream<String, String> left = builder.stream(Collections.singleton("t1"), consumed);
        KStream<String, String> right = builder.stream(Collections.singleton("t2"), consumed);
        left.join(right, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        builder.buildAndOptimizeTopology(props);

        StreamStreamJoinNode joinNode = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet());
        Assertions.assertNotNull(joinNode);
        Assertions.assertFalse(joinNode.getSelfJoin());

        AtomicInteger joinWindowNodes = new AtomicInteger();
        countJoinWindowNodes(joinWindowNodes, builder.root, new HashSet());
        // Expected value goes first so that a failure message reads correctly.
        Assertions.assertEquals(2, joinWindowNodes.get());
    }

    /**
     * With the optimization config set to "single.store.self.join", joining a stream with
     * itself is expected to be flagged as a self-join and to leave a single
     * {@code WindowedStreamProcessorNode} in the graph.
     */
    @Test
    public void shouldOptimizeJoinWhenInConfig() {
        this.props.put("topology.optimization", "single.store.self.join");
        KStream source = this.builder.stream(Collections.singleton("t1"), this.consumed);
        source.join(
            source,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        this.builder.buildAndOptimizeTopology(this.props);

        // getNodeByType is declared to return GraphNode, so the narrowing must be explicit.
        GraphNode joinNode = getNodeByType(this.builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        Assertions.assertTrue(((StreamStreamJoinNode<?, ?, ?, ?>) joinNode).getSelfJoin());

        AtomicInteger joinWindowCount = new AtomicInteger();
        countJoinWindowNodes(joinWindowCount, this.builder.root, new HashSet<>());
        // assertEquals takes (expected, actual); the other order yields misleading failure messages.
        Assertions.assertEquals(1, joinWindowCount.get());
    }

    /**
     * When the optimization config lists only "reuse.ktable.source.topics" and
     * "merge.repartition.topics", joining a stream with itself is expected not to be flagged
     * as a self-join, and two {@code WindowedStreamProcessorNode}s are expected in the graph.
     */
    @Test
    public void shouldNotOptimizeJoinWhenNotInConfig() {
        this.props.put(
            "topology.optimization",
            String.join(",", "reuse.ktable.source.topics", "merge.repartition.topics"));
        KStream source = this.builder.stream(Collections.singleton("t1"), this.consumed);
        source.join(
            source,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        this.builder.buildAndOptimizeTopology(this.props);

        // getNodeByType is declared to return GraphNode, so the narrowing must be explicit.
        GraphNode joinNode = getNodeByType(this.builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        Assertions.assertFalse(((StreamStreamJoinNode<?, ?, ?, ?>) joinNode).getSelfJoin());

        AtomicInteger joinWindowCount = new AtomicInteger();
        countJoinWindowNodes(joinWindowCount, this.builder.root, new HashSet<>());
        // assertEquals takes (expected, actual); the other order yields misleading failure messages.
        Assertions.assertEquals(2, joinWindowCount.get());
    }

    /**
     * A filter applied directly to a table materialized with a versioned store is expected
     * to produce a {@code TableFilterNode} that reports versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableFilter() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable.filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * A non-materialized {@code mapValues} between the versioned source table and the
     * filter is expected to leave the {@code TableFilterNode} with versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable
            .mapValues(value -> value != null ? value + value : null)
            .filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * When the intermediate {@code mapValues} is materialized under the plain name
     * "unversioned", the downstream {@code TableFilterNode} is expected not to report
     * versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable sourceTable = this.builder.table("t1", this.consumed, versionedStore);
        MaterializedInternal unversionedStore =
            new MaterializedInternal(Materialized.as("unversioned"), this.builder, "prefix-");
        sourceTable
            .mapValues(value -> value != null ? value + value : null, unversionedStore)
            .filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, false);
    }

    /**
     * When the intermediate {@code mapValues} is itself materialized with a versioned store
     * ("versioned2"), the downstream {@code TableFilterNode} is expected to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
        MaterializedInternal sourceStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable sourceTable = this.builder.table("t1", this.consumed, sourceStore);
        MaterializedInternal mappedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        sourceTable
            .mapValues(value -> value != null ? value + value : null, mappedStore)
            .filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * A {@code groupBy(...).count()} with no explicit materialization between the versioned
     * source table and the filter is expected to leave the {@code TableFilterNode} without
     * versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable
            .groupBy((key, value) -> new KeyValue(key, value))
            .count()
            .filter((key, count) -> count != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, false);
    }

    /**
     * When the intermediate {@code count()} is materialized with a versioned store
     * ("versioned2"), the downstream {@code TableFilterNode} is expected to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
        MaterializedInternal sourceStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable sourceTable = this.builder.table("t1", this.consumed, sourceStore);
        sourceTable
            .groupBy((key, value) -> new KeyValue(key, value))
            .count(new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"))
            .filter((key, count) -> count != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * A table-table join with no explicit materialization between two versioned tables and
     * the filter is expected to leave the {@code TableFilterNode} without versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        leftTable
            .join(rightTable, (leftValue, rightValue) -> leftValue + rightValue)
            .filter((key, joined) -> joined != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, false);
    }

    /**
     * When the table-table join result is materialized with a versioned store
     * ("versioned3"), the downstream {@code TableFilterNode} is expected to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
        MaterializedInternal leftStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        MaterializedInternal rightStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable leftTable = this.builder.table("t1", this.consumed, leftStore);
        KTable rightTable = this.builder.table("t2", this.consumed, rightStore);
        leftTable
            .join(
                rightTable,
                (leftValue, rightValue) -> leftValue + rightValue,
                new MaterializedInternal(
                    Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5L))),
                    this.builder,
                    "prefix-"))
            .filter((key, joined) -> joined != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * A foreign-key join with no explicit materialization between two versioned tables and
     * the filter is expected to leave the {@code TableFilterNode} without versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        leftTable
            .join(
                rightTable,
                leftValue -> leftValue,
                (leftValue, rightValue) -> leftValue + rightValue)
            .filter((key, joined) -> joined != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, false);
    }

    /**
     * When the foreign-key join result is materialized with a versioned store
     * ("versioned3"), the downstream {@code TableFilterNode} is expected to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
        MaterializedInternal leftStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        MaterializedInternal rightStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable leftTable = this.builder.table("t1", this.consumed, leftStore);
        KTable rightTable = this.builder.table("t2", this.consumed, rightStore);
        leftTable
            .join(
                rightTable,
                leftValue -> leftValue,
                (leftValue, rightValue) -> leftValue + rightValue,
                new MaterializedInternal(
                    Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5L))),
                    this.builder,
                    "prefix-"))
            .filter((key, joined) -> joined != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * Converting the versioned table to a stream and back via {@code toTable()} without
     * materialization is expected to leave the {@code TableFilterNode} without versioned
     * semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable
            .toStream()
            .toTable()
            .filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, false);
    }

    /**
     * When the {@code toTable(...)} step after {@code toStream()} is materialized with a
     * versioned store ("versioned2"), the {@code TableFilterNode} is expected to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
        MaterializedInternal sourceStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable sourceTable = this.builder.table("t1", this.consumed, sourceStore);
        sourceTable
            .toStream()
            .toTable(new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"))
            .filter((key, value) -> value != null);
        this.builder.buildAndOptimizeTopology();

        GraphNode filterNode = getNodeByType(this.builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode<?, ?>) filterNode, true);
    }

    /**
     * A {@code groupBy} applied directly to a versioned table is expected to produce a
     * {@code TableRepartitionMapNode} that reports versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable
            .groupBy((key, value) -> new KeyValue(key, value))
            .count();
        this.builder.buildAndOptimizeTopology();

        GraphNode repartitionNode = getNodeByType(this.builder.root, TableRepartitionMapNode.class, new HashSet<>());
        Assertions.assertNotNull(repartitionNode);
        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionNode, true);
    }

    /**
     * Non-materialized {@code filter} and {@code mapValues} steps between the versioned
     * table and the {@code groupBy} are expected to leave the
     * {@code TableRepartitionMapNode} with versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
        KTable versionedTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        versionedTable
            .filter((key, value) -> value != null)
            .mapValues(value -> value + value)
            .groupBy((key, value) -> new KeyValue(key, value))
            .count();
        this.builder.buildAndOptimizeTopology();

        GraphNode repartitionNode = getNodeByType(this.builder.root, TableRepartitionMapNode.class, new HashSet<>());
        Assertions.assertNotNull(repartitionNode);
        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionNode, true);
    }

    /**
     * Joining two versioned tables is expected to produce a {@code KTableKTableJoinNode}
     * whose "this" and "other" processors both report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoin() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        leftTable.join(rightTable, (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode joinNode = getNodeByType(this.builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) joinNode, true, true);
    }

    /**
     * Joining a versioned left table with a right table materialized as "unversioned" is
     * expected to give versioned semantics on the "this" processor only.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoinLeftOnly() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(Materialized.as("unversioned"), this.builder, "prefix-"));
        leftTable.join(rightTable, (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode joinNode = getNodeByType(this.builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) joinNode, true, false);
    }

    /**
     * Joining a left table materialized as "unversioned" with a versioned right table is
     * expected to give versioned semantics on the "other" processor only.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoinRightOnly() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(Materialized.as("unversioned"), this.builder, "prefix-"));
        KTable rightTable = this.builder.table("t2", this.consumed, versionedStore);
        leftTable.join(rightTable, (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode joinNode = getNodeByType(this.builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) joinNode, false, true);
    }

    /**
     * Joining a versioned table with itself is expected to give versioned semantics on
     * both processors of the {@code KTableKTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableSelfJoin() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable versionedTable = this.builder.table("t1", this.consumed, versionedStore);
        versionedTable.join(versionedTable, (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode joinNode = getNodeByType(this.builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode<?, ?, ?, ?>) joinNode, true, true);
    }

    /**
     * A foreign-key join of two versioned tables is expected to give versioned semantics
     * on both the {@code ForeignJoinSubscriptionSendNode} and the
     * {@code ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoin() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        leftTable.join(
            rightTable,
            leftValue -> leftValue,
            (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode sendNode = getNodeByType(this.builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(sendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) sendNode, true);

        GraphNode foreignJoinNode = getNodeByType(this.builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) foreignJoinNode, true);
    }

    /**
     * A foreign-key join of a versioned left table with a right table materialized as
     * "unversioned" is expected to give versioned semantics on the
     * {@code ForeignJoinSubscriptionSendNode} but not on the {@code ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoinLeftOnly() {
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
                this.builder,
                "prefix-"));
        KTable rightTable = this.builder.table(
            "t2",
            this.consumed,
            new MaterializedInternal(Materialized.as("unversioned"), this.builder, "prefix-"));
        leftTable.join(
            rightTable,
            leftValue -> leftValue,
            (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode sendNode = getNodeByType(this.builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(sendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) sendNode, true);

        GraphNode foreignJoinNode = getNodeByType(this.builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) foreignJoinNode, false);
    }

    /**
     * A foreign-key join of a left table materialized as "unversioned" with a versioned
     * right table is expected to give versioned semantics on the
     * {@code ForeignTableJoinNode} but not on the {@code ForeignJoinSubscriptionSendNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoinRightOnly() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable leftTable = this.builder.table(
            "t1",
            this.consumed,
            new MaterializedInternal(Materialized.as("unversioned"), this.builder, "prefix-"));
        KTable rightTable = this.builder.table("t2", this.consumed, versionedStore);
        leftTable.join(
            rightTable,
            leftValue -> leftValue,
            (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode sendNode = getNodeByType(this.builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(sendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) sendNode, false);

        GraphNode foreignJoinNode = getNodeByType(this.builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) foreignJoinNode, true);
    }

    /**
     * A foreign-key join of a versioned table with itself is expected to give versioned
     * semantics on both the {@code ForeignJoinSubscriptionSendNode} and the
     * {@code ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignSelfJoin() {
        MaterializedInternal versionedStore = new MaterializedInternal(
            Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))),
            this.builder,
            "prefix-");
        KTable versionedTable = this.builder.table("t1", this.consumed, versionedStore);
        versionedTable.join(
            versionedTable,
            leftValue -> leftValue,
            (leftValue, rightValue) -> leftValue + rightValue);
        this.builder.buildAndOptimizeTopology();

        GraphNode sendNode = getNodeByType(this.builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(sendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode<?, ?>) sendNode, true);

        GraphNode foreignJoinNode = getNodeByType(this.builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) foreignJoinNode, true);
    }

    /**
     * Asserts that the processor supplier of {@code tableFilterNode} is a
     * {@code KTableFilter} whose versioned-semantics flag equals {@code z}.
     *
     * @param tableFilterNode graph node whose processor supplier is inspected
     * @param z expected result of {@code isUseVersionedSemantics()}
     */
    private void verifyVersionedSemantics(TableFilterNode<?, ?> tableFilterNode, boolean z) {
        // Hold the supplier as Object so the instance check is a real runtime check rather than
        // one that is already implied by the variable's declared type.
        Object supplier = tableFilterNode.processorParameters().processorSupplier();
        // assertInstanceOf returns the value narrowed to the asserted type.
        KTableFilter<?, ?> filter = Assertions.assertInstanceOf(KTableFilter.class, supplier);
        Assertions.assertEquals(z, filter.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier of {@code tableRepartitionMapNode} is a
     * {@code KTableRepartitionMap} whose versioned-semantics flag equals {@code z}.
     *
     * @param tableRepartitionMapNode graph node whose processor supplier is inspected
     * @param z expected result of {@code isUseVersionedSemantics()}
     */
    private void verifyVersionedSemantics(TableRepartitionMapNode<?, ?> tableRepartitionMapNode, boolean z) {
        // Hold the supplier as Object so the instance check is a real runtime check rather than
        // one that is already implied by the variable's declared type.
        Object supplier = tableRepartitionMapNode.processorParameters().processorSupplier();
        // assertInstanceOf returns the value narrowed to the asserted type.
        KTableRepartitionMap<?, ?, ?, ?> repartitionMap =
            Assertions.assertInstanceOf(KTableRepartitionMap.class, supplier);
        Assertions.assertEquals(z, repartitionMap.isUseVersionedSemantics());
    }

    /**
     * Asserts that both processor suppliers of {@code kTableKTableJoinNode} are
     * {@code KTableKTableAbstractJoin} instances with the expected versioned-semantics flags.
     *
     * @param kTableKTableJoinNode join node whose two processor suppliers are inspected
     * @param z expected flag on the supplier from {@code thisProcessorParameters()}
     * @param z2 expected flag on the supplier from {@code otherProcessorParameters()}
     */
    private void verifyVersionedSemantics(KTableKTableJoinNode<?, ?, ?, ?> kTableKTableJoinNode, boolean z, boolean z2) {
        // Suppliers are held as Object so each instance check is a real runtime check;
        // assertInstanceOf returns the value narrowed to the asserted type.
        Object thisSupplier = kTableKTableJoinNode.thisProcessorParameters().processorSupplier();
        KTableKTableAbstractJoin<?, ?, ?, ?> thisJoin =
            Assertions.assertInstanceOf(KTableKTableAbstractJoin.class, thisSupplier);
        Assertions.assertEquals(z, thisJoin.isUseVersionedSemantics());

        Object otherSupplier = kTableKTableJoinNode.otherProcessorParameters().processorSupplier();
        KTableKTableAbstractJoin<?, ?, ?, ?> otherJoin =
            Assertions.assertInstanceOf(KTableKTableAbstractJoin.class, otherSupplier);
        Assertions.assertEquals(z2, otherJoin.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier of {@code foreignJoinSubscriptionSendNode} is a
     * {@code SubscriptionSendProcessorSupplier} whose versioned-semantics flag equals {@code z}.
     *
     * @param foreignJoinSubscriptionSendNode graph node whose processor supplier is inspected
     * @param z expected result of {@code isUseVersionedSemantics()}
     */
    private void verifyVersionedSemantics(ForeignJoinSubscriptionSendNode<?, ?> foreignJoinSubscriptionSendNode, boolean z) {
        // Hold the supplier as Object so the instance check is a real runtime check rather than
        // one that is already implied by the variable's declared type.
        Object supplier = foreignJoinSubscriptionSendNode.processorParameters().processorSupplier();
        // assertInstanceOf returns the value narrowed to the asserted type.
        SubscriptionSendProcessorSupplier<?, ?, ?> sendSupplier =
            Assertions.assertInstanceOf(SubscriptionSendProcessorSupplier.class, supplier);
        Assertions.assertEquals(z, sendSupplier.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier of {@code foreignTableJoinNode} is a
     * {@code ForeignTableJoinProcessorSupplier} whose versioned-semantics flag equals {@code z}.
     *
     * @param foreignTableJoinNode graph node whose processor supplier is inspected
     * @param z expected result of {@code isUseVersionedSemantics()}
     */
    private void verifyVersionedSemantics(ForeignTableJoinNode<?, ?> foreignTableJoinNode, boolean z) {
        // Hold the supplier as Object so the instance check is a real runtime check rather than
        // one that is already implied by the variable's declared type.
        Object supplier = foreignTableJoinNode.processorParameters().processorSupplier();
        // assertInstanceOf returns the value narrowed to the asserted type.
        ForeignTableJoinProcessorSupplier<?, ?, ?> joinSupplier =
            Assertions.assertInstanceOf(ForeignTableJoinProcessorSupplier.class, supplier);
        Assertions.assertEquals(z, joinSupplier.isUseVersionedSemantics());
    }

    /**
     * Depth-first search for the first node that is an instance of {@code cls}, starting
     * at {@code graphNode} itself.
     *
     * @param graphNode node to start from; returned directly when it matches
     * @param cls node type to look for
     * @param set children already explored; callers in this class pass a fresh, empty set.
     *            Children already present in it are skipped.
     * @return the first matching node in traversal order, or {@code null} when none is found
     */
    private GraphNode getNodeByType(GraphNode graphNode, Class<? extends GraphNode> cls, Set<GraphNode> set) {
        if (cls.isInstance(graphNode)) {
            return graphNode;
        }
        for (GraphNode child : graphNode.children()) {
            // Set.add returns false for an element that is already present. A child in the set
            // was already searched without producing a match (a match would have ended the
            // search), so a sub-graph reachable through several parents is walked only once.
            if (!set.add(child)) {
                continue;
            }
            GraphNode match = getNodeByType(child, cls, set);
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * Collects into {@code list} every node that is an instance of {@code cls}, starting at
     * {@code graphNode} itself and descending through children not yet in {@code set}.
     *
     * @param graphNode node to start from
     * @param cls node type to collect
     * @param set children already visited; updated as the traversal proceeds
     * @param list receives the matching nodes in traversal order
     */
    private void getNodesByType(GraphNode graphNode, Class<? extends GraphNode> cls, Set<GraphNode> set, List<GraphNode> list) {
        if (cls.isInstance(graphNode)) {
            list.add(graphNode);
        }
        for (GraphNode child : graphNode.children()) {
            // add() returns true only for a child seen for the first time.
            if (set.add(child)) {
                getNodesByType(child, cls, set, list);
            }
        }
    }

    /**
     * Increments {@code atomicInteger} once for every {@code WindowedStreamProcessorNode}
     * found at {@code graphNode} or below it, descending through children not yet in
     * {@code set}.
     *
     * @param atomicInteger running count, incremented in place
     * @param graphNode node to start from
     * @param set children already visited; updated as the traversal proceeds
     */
    private void countJoinWindowNodes(AtomicInteger atomicInteger, GraphNode graphNode, Set<GraphNode> set) {
        if (graphNode instanceof WindowedStreamProcessorNode) {
            atomicInteger.incrementAndGet();
        }
        for (GraphNode child : graphNode.children()) {
            // add() returns true only for a child seen for the first time.
            if (set.add(child)) {
                countJoinWindowNodes(atomicInteger, child, set);
            }
        }
    }
}
