package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.class */
public class KStreamImplValueJoinerWithKeyTest {
    private KStream<String, Integer> leftStream;
    private KStream<String, Integer> rightStream;
    private KTable<String, Integer> ktable;
    private GlobalKTable<String, Integer> globalKTable;
    private StreamsBuilder builder;
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private final String leftTopic = "left";
    private final String rightTopic = "right";
    private final String ktableTopic = "ktableTopic";
    private final String globalTopic = "globalTopic";
    private final String outputTopic = "joined-result";
    private final ValueJoinerWithKey<String, Integer, Integer, String> valueJoinerWithKey = (str, num, num2) -> {
        return str + ":" + (num.intValue() + (num2 == null ? 0 : num2.intValue()));
    };
    private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100), Duration.ofHours(24));
    private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
    private final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
    private final KeyValueMapper<String, Integer, String> keyValueMapper = (str, num) -> {
        return str;
    };

    @BeforeEach
    public void setup() {
        this.builder = new StreamsBuilder();
        this.leftStream = this.builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        this.rightStream = this.builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        this.ktable = this.builder.table("ktableTopic", Consumed.with(Serdes.String(), Serdes.Integer()));
        this.globalKTable = this.builder.globalTable("globalTopic", Consumed.with(Serdes.String(), Serdes.Integer()));
    }

    @Test
    public void shouldIncludeKeyInStreamSteamJoinResults() {
        this.leftStream.join(this.rightStream, this.valueJoinerWithKey, this.joinWindows, this.streamJoined).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), false, "right");
    }

    @Test
    public void shouldIncludeKeyInStreamLeftJoinResults() {
        this.leftStream.leftJoin(this.rightStream, this.valueJoinerWithKey, this.joinWindows, this.streamJoined).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), false, "right");
    }

    @Test
    public void shouldIncludeKeyInStreamOuterJoinResults() {
        this.leftStream.outerJoin(this.rightStream, this.valueJoinerWithKey, this.joinWindows, this.streamJoined).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), false, "right");
    }

    @Test
    public void shouldIncludeKeyInStreamTableJoinResults() {
        this.leftStream.join(this.ktable, this.valueJoinerWithKey, this.joined).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), true, "ktableTopic");
    }

    @Test
    public void shouldIncludeKeyInStreamTableLeftJoinResults() {
        this.leftStream.leftJoin(this.ktable, this.valueJoinerWithKey, this.joined).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), true, "ktableTopic");
    }

    @Test
    public void shouldIncludeKeyInStreamGlobalTableJoinResults() {
        this.leftStream.join(this.globalKTable, this.keyValueMapper, this.valueJoinerWithKey).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), true, "globalTopic");
    }

    @Test
    public void shouldIncludeKeyInStreamGlobalTableLeftJoinResults() {
        this.leftStream.leftJoin(this.globalKTable, this.keyValueMapper, this.valueJoinerWithKey).to("joined-result", Produced.with(Serdes.String(), Serdes.String()));
        runJoinTopology(this.builder, Collections.singletonList(KeyValue.pair("A", "A:5")), true, "globalTopic");
    }

    private void runJoinTopology(StreamsBuilder streamsBuilder, List<KeyValue<String, String>> list, boolean z, String str) {
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new StringSerializer(), new IntegerSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("left", new StringSerializer(), new IntegerSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("joined-result", new StringDeserializer(), new StringDeserializer());
                if (z) {
                    createInputTopic.pipeInput("A", 2);
                    createInputTopic2.pipeInput("A", 3);
                } else {
                    createInputTopic2.pipeInput("A", 3);
                    createInputTopic.pipeInput("A", 2);
                }
                Assertions.assertEquals(list, createOutputTopic.readKeyValuesToList());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }
}
