package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.class */
/**
 * Scenario tests for KTable-KTable foreign-key joins that are chained with a
 * regular (same-key) KTable join.
 *
 * <p>Each {@code shouldWorkWith...} test builds the same topology shape and
 * varies only where serdes are supplied (config defaults, {@link Consumed},
 * {@link Materialized} or {@link Produced}). The topology is then executed
 * with a {@link TopologyTestDriver} by
 * {@link #validateTopologyCanProcessData(StreamsBuilder)}.
 */
public class KTableKTableForeignKeyJoinScenarioTest {
    private static final String LEFT_TABLE = "left_table";
    private static final String RIGHT_TABLE = "right_table";
    private static final String OUTPUT = "output-topic";

    @Rule
    public TestName testName = new TestName();

    /**
     * Builds the topology without specifying any serde explicitly, so every
     * operator has to fall back to the defaults configured by
     * {@link #validateTopologyCanProcessData(StreamsBuilder)}.
     */
    @Test
    public void shouldWorkWithDefaultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, String> aTable = builder.table("A");
        final KTable<Integer, String> bTable = builder.table("B");

        // Foreign-key join: the key into B is the integer prefix of A's value ("999-alpha" -> 999).
        final KTable<Integer, String> fkJoinResult = aTable.join(
            bTable,
            value -> Integer.parseInt(value.split("-")[0]),
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        // Same-key join of A with the foreign-key join result.
        final KTable<Integer, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /**
     * Same topology as {@link #shouldWorkWithDefaultSerdes()}, but table "A"
     * is read with explicit {@link Consumed} serdes.
     */
    @Test
    public void shouldWorkWithDefaultAndConsumedSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, String> aTable = builder.table("A", Consumed.with(Serdes.Integer(), Serdes.String()));
        final KTable<Integer, String> bTable = builder.table("B");

        final KTable<Integer, String> fkJoinResult = aTable.join(
            bTable,
            value -> Integer.parseInt(value.split("-")[0]),
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<Integer, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /**
     * Same topology as {@link #shouldWorkWithDefaultSerdes()}, but the
     * foreign-key join result is materialized with explicit key and value serdes.
     */
    @Test
    public void shouldWorkWithDefaultAndJoinResultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, String> aTable = builder.table("A");
        final KTable<Integer, String> bTable = builder.table("B");

        // The explicit type witness is required: in a chained call the compiler cannot
        // infer the Materialized type parameters from the join's target type.
        final KTable<Integer, String> fkJoinResult = aTable.join(
            bTable,
            value -> Integer.parseInt(value.split("-")[0]),
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("asdf")
                .withKeySerde(Serdes.Integer())
                .withValueSerde(Serdes.String())
        );

        final KTable<Integer, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /**
     * Same topology as {@link #shouldWorkWithDefaultSerdes()}, but the final
     * same-key join is materialized with explicit serdes.
     */
    @Test
    public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, String> aTable = builder.table("A");
        final KTable<Integer, String> bTable = builder.table("B");

        final KTable<Integer, String> fkJoinResult = aTable.join(
            bTable,
            value -> Integer.parseInt(value.split("-")[0]),
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<Integer, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
            Materialized.with(Serdes.Integer(), Serdes.String())
        );

        finalJoinResult.toStream().to("output");

        validateTopologyCanProcessData(builder);
    }

    /**
     * Same topology as {@link #shouldWorkWithDefaultSerdes()}, but the output
     * topic is written with explicit {@link Produced} serdes.
     */
    @Test
    public void shouldWorkWithDefaultAndProducedSerdes() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, String> aTable = builder.table("A");
        final KTable<Integer, String> bTable = builder.table("B");

        final KTable<Integer, String> fkJoinResult = aTable.join(
            bTable,
            value -> Integer.parseInt(value.split("-")[0]),
            (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
            Materialized.as("asdf")
        );

        final KTable<Integer, String> finalJoinResult = aTable.join(
            fkJoinResult,
            (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
        );

        finalJoinResult.toStream().to("output", Produced.with(Serdes.Integer(), Serdes.String()));

        validateTopologyCanProcessData(builder);
    }

    /**
     * Runs a single foreign-key join with every serde wrapped by a
     * {@link UniqueTopicSerdeScope} and asserts that the set returned by
     * {@code registeredTopics()} is exactly the expected list of
     * source, changelog, subscription and output topic names.
     */
    @Test
    public void shouldUseExpectedTopicsWithSerde() {
        final Properties streamsConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "ktable-ktable-joinOnForeignKey"),
            Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath())
        ));

        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<Integer, String> leftTable = builder.table(
            LEFT_TABLE,
            Consumed.with(
                serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
            )
        );
        final KTable<Integer, String> rightTable = builder.table(
            RIGHT_TABLE,
            Consumed.with(
                serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
            )
        );

        // The foreign key is the integer after the '|' in the left value ("lhsValue1|1" -> 1).
        // The key serde is deliberately left null; only the value serde is decorated.
        final KTable<Integer, String> joined = leftTable.join(
            rightTable,
            value -> Integer.parseInt(value.split("\\|")[1]),
            (leftValue, rightValue) -> "(" + leftValue + "," + rightValue + ")",
            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
        );
        joined.toStream().to(OUTPUT);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(streamsConfig), streamsConfig)) {
            final TestInputTopic<Integer, String> leftInput =
                driver.createInputTopic(LEFT_TABLE, new IntegerSerializer(), new StringSerializer());
            final TestInputTopic<Integer, String> rightInput =
                driver.createInputTopic(RIGHT_TABLE, new IntegerSerializer(), new StringSerializer());
            leftInput.pipeInput(2, "lhsValue1|1");
            rightInput.pipeInput(1, "rhsValue1");
        }

        // Checked after the driver is closed, so a failing assertion cannot trigger a second close().
        MatcherAssert.assertThat(
            serdeScope.registeredTopics(),
            Matchers.is(Utils.mkSet(
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
                "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
                "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
                "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
                "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
                "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
                "output-topic--key",
                "output-topic--value"
            ))
        );
    }

    /**
     * Executes the topology held by {@code streamsBuilder} with Integer/String
     * default serdes, pipes one record into "A" and one into "B", and asserts
     * that "output" ends up holding exactly {@code 1 -> "(999-alpha,(999-alpha,beta))"}.
     *
     * @param streamsBuilder builder whose topology reads topics "A" and "B" and writes to "output"
     */
    private void validateTopologyCanProcessData(StreamsBuilder streamsBuilder) {
        final Properties config = new Properties();
        // NOTE(review): the returned name is not used in this method; the call is kept
        // because its side effects, if any, are not visible from this file.
        IntegrationTestUtils.safeUniqueTestName(this.testName);
        config.setProperty("default.key.serde", Serdes.IntegerSerde.class.getName());
        config.setProperty("default.value.serde", Serdes.StringSerde.class.getName());
        config.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());

        try (final TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), config)) {
            final TestInputTopic<Integer, String> aTopic =
                driver.createInputTopic("A", new IntegerSerializer(), new StringSerializer());
            final TestInputTopic<Integer, String> bTopic =
                driver.createInputTopic("B", new IntegerSerializer(), new StringSerializer());
            final TestOutputTopic<Integer, String> output =
                driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());

            aTopic.pipeInput(1, "999-alpha");
            bTopic.pipeInput(999, "beta");

            final Map<Integer, String> results = output.readKeyValuesToMap();
            MatcherAssert.assertThat(results, Matchers.is(Collections.singletonMap(1, "(999-alpha,(999-alpha,beta))")));
        }
    }
}
