package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockMapper;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.class */
public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest {
    private static final String APP_ID = "stream-stream-join-integration-test";

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSelfJoin(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-selfJoin");
        properties.put("topology.optimization", "all");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, Collections.singletonList(new TestRecord(0L, "A-A", (Headers) null, 2L)), Arrays.asList(new TestRecord(0L, "B-A", (Headers) null, 3L), new TestRecord(0L, "A-B", (Headers) null, 3L), new TestRecord(0L, "B-B", (Headers) null, 3L)), null, Arrays.asList(new TestRecord(0L, "C-A", (Headers) null, 5L), new TestRecord(0L, "C-B", (Headers) null, 5L), new TestRecord(0L, "A-C", (Headers) null, 5L), new TestRecord(0L, "B-C", (Headers) null, 5L), new TestRecord(0L, "C-C", (Headers) null, 5L)), null, Arrays.asList(new TestRecord(0L, "D-A", (Headers) null, 7L), new TestRecord(0L, "D-B", (Headers) null, 7L), new TestRecord(0L, "D-C", (Headers) null, 7L), new TestRecord(0L, "A-D", (Headers) null, 7L), new TestRecord(0L, "B-D", (Headers) null, 7L), new TestRecord(0L, "C-D", (Headers) null, 7L), new TestRecord(0L, "D-D", (Headers) null, 7L)));
        stream.join(stream, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runSelfJoinTestWithDriver(asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInner(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-inner");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), null, null);
        stream.join(stream2, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInnerRepartitioned(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-inner-repartitioned");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), null, null);
        stream.map(MockMapper.noOpKeyValueMapper()).join(stream2.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeft(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-left");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), Arrays.asList(new TestRecord((Object) null, "E-null", (Headers) null, 16L)), null);
        stream.leftJoin(stream2, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeftRepartitioned(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-left-repartitioned");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), Arrays.asList(new TestRecord((Object) null, "E-null", (Headers) null, 16L)), null);
        stream.map(MockMapper.noOpKeyValueMapper()).leftJoin(stream2.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testOuter(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-outer");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), Arrays.asList(new TestRecord((Object) null, "E-null", (Headers) null, 16L)), Arrays.asList(new TestRecord((Object) null, "null-e", (Headers) null, 17L)));
        stream.outerJoin(stream2, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testOuterRepartitioned(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-outer");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b", (Headers) null, 6L), new TestRecord(0L, "B-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a", (Headers) null, 9L), new TestRecord(0L, "C-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c", (Headers) null, 10L), new TestRecord(0L, "B-c", (Headers) null, 10L), new TestRecord(0L, "C-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d", (Headers) null, 14L), new TestRecord(0L, "B-d", (Headers) null, 14L), new TestRecord(0L, "C-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a", (Headers) null, 15L), new TestRecord(0L, "D-b", (Headers) null, 15L), new TestRecord(0L, "D-c", (Headers) null, 15L), new TestRecord(0L, "D-d", (Headers) null, 15L)), Arrays.asList(new TestRecord((Object) null, "E-null", (Headers) null, 16L)), Arrays.asList(new TestRecord((Object) null, "null-e", (Headers) null, 17L)));
        stream.map(MockMapper.noOpKeyValueMapper()).outerJoin(stream2.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMultiInner(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KStream stream2 = streamsBuilder.stream("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-stream-join-integration-test-multi-inner");
        List<List<TestRecord<Long, String>>> asList = Arrays.asList(null, null, null, Collections.singletonList(new TestRecord(0L, "A-a-a", (Headers) null, 4L)), Collections.singletonList(new TestRecord(0L, "B-a-a", (Headers) null, 5L)), Arrays.asList(new TestRecord(0L, "A-b-a", (Headers) null, 6L), new TestRecord(0L, "B-b-a", (Headers) null, 6L), new TestRecord(0L, "A-a-b", (Headers) null, 6L), new TestRecord(0L, "B-a-b", (Headers) null, 6L), new TestRecord(0L, "A-b-b", (Headers) null, 6L), new TestRecord(0L, "B-b-b", (Headers) null, 6L)), null, null, Arrays.asList(new TestRecord(0L, "C-a-a", (Headers) null, 9L), new TestRecord(0L, "C-a-b", (Headers) null, 9L), new TestRecord(0L, "C-b-a", (Headers) null, 9L), new TestRecord(0L, "C-b-b", (Headers) null, 9L)), Arrays.asList(new TestRecord(0L, "A-c-a", (Headers) null, 10L), new TestRecord(0L, "A-c-b", (Headers) null, 10L), new TestRecord(0L, "B-c-a", (Headers) null, 10L), new TestRecord(0L, "B-c-b", (Headers) null, 10L), new TestRecord(0L, "C-c-a", (Headers) null, 10L), new TestRecord(0L, "C-c-b", (Headers) null, 10L), new TestRecord(0L, "A-a-c", (Headers) null, 10L), new TestRecord(0L, "B-a-c", (Headers) null, 10L), new TestRecord(0L, "A-b-c", (Headers) null, 10L), new TestRecord(0L, "B-b-c", (Headers) null, 10L), new TestRecord(0L, "C-a-c", (Headers) null, 10L), new TestRecord(0L, "C-b-c", (Headers) null, 10L), new TestRecord(0L, "A-c-c", (Headers) null, 10L), new TestRecord(0L, "B-c-c", (Headers) null, 10L), new TestRecord(0L, "C-c-c", (Headers) null, 10L)), null, null, null, Arrays.asList(new TestRecord(0L, "A-d-a", (Headers) null, 14L), new TestRecord(0L, "A-d-b", (Headers) null, 14L), new TestRecord(0L, "A-d-c", (Headers) null, 14L), new TestRecord(0L, "B-d-a", (Headers) null, 14L), new TestRecord(0L, "B-d-b", (Headers) null, 14L), new TestRecord(0L, "B-d-c", (Headers) null, 14L), new TestRecord(0L, "C-d-a", (Headers) null, 14L), new TestRecord(0L, "C-d-b", (Headers) null, 14L), new TestRecord(0L, "C-d-c", (Headers) null, 14L), new TestRecord(0L, "A-a-d", (Headers) null, 14L), new TestRecord(0L, "B-a-d", (Headers) null, 14L), new TestRecord(0L, "A-b-d", (Headers) null, 14L), new TestRecord(0L, "B-b-d", (Headers) null, 14L), new TestRecord(0L, "C-a-d", (Headers) null, 14L), new TestRecord(0L, "C-b-d", (Headers) null, 14L), new TestRecord(0L, "A-c-d", (Headers) null, 14L), new TestRecord(0L, "B-c-d", (Headers) null, 14L), new TestRecord(0L, "C-c-d", (Headers) null, 14L), new TestRecord(0L, "A-d-d", (Headers) null, 14L), new TestRecord(0L, "B-d-d", (Headers) null, 14L), new TestRecord(0L, "C-d-d", (Headers) null, 14L)), Arrays.asList(new TestRecord(0L, "D-a-a", (Headers) null, 15L), new TestRecord(0L, "D-a-b", (Headers) null, 15L), new TestRecord(0L, "D-a-c", (Headers) null, 15L), new TestRecord(0L, "D-a-d", (Headers) null, 15L), new TestRecord(0L, "D-b-a", (Headers) null, 15L), new TestRecord(0L, "D-b-b", (Headers) null, 15L), new TestRecord(0L, "D-b-c", (Headers) null, 15L), new TestRecord(0L, "D-b-d", (Headers) null, 15L), new TestRecord(0L, "D-c-a", (Headers) null, 15L), new TestRecord(0L, "D-c-b", (Headers) null, 15L), new TestRecord(0L, "D-c-c", (Headers) null, 15L), new TestRecord(0L, "D-c-d", (Headers) null, 15L), new TestRecord(0L, "D-d-a", (Headers) null, 15L), new TestRecord(0L, "D-d-b", (Headers) null, 15L), new TestRecord(0L, "D-d-c", (Headers) null, 15L), new TestRecord(0L, "D-d-d", (Headers) null, 15L)), null, null);
        stream.join(stream2, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).join(stream2, this.valueJoiner, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10L), Duration.ofHours(24L))).to("outputTopic");
        runTestWithDriver(this.inputWithoutOutOfOrderData, asList, properties, streamsBuilder.build(properties));
    }
}
