package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.class */
/**
 * Tests the left join between a stream and a global table, driven through a
 * {@link TopologyTestDriver}.
 *
 * <p>Stream record values have the form {@code <prefix><key>[,FKey<key>]}. The key mapper used
 * in the topology takes the token after the comma as the lookup key into the global table and
 * yields {@code null} when there is no such token. Joined values are rendered by
 * {@link MockValueJoiner#TOSTRING_JOINER}; the expectations below have the form
 * {@code <streamValue>+<tableValue>}.
 */
public class KStreamGlobalKTableLeftJoinTest {
    // Kept as a raw array on purpose: it is only ever passed as the (empty) varargs argument of
    // checkAndClearProcessResult, whose exact parameter type is not visible from this file.
    private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];

    private TestInputTopic<Integer, String> inputStreamTopic;
    private TestInputTopic<String, String> inputTableTopic;
    private MockApiProcessor<Integer, String, Void, Void> processor;
    private TopologyTestDriver driver;
    private StreamsBuilder builder;
    private final String streamTopic = "streamTopic";
    private final String globalTableTopic = "globalTableTopic";
    private final int[] expectedKeys = {0, 1, 2, 3};

    /** Builds the default topology (global table without an explicit store) before each test. */
    @Before
    public void setUp() {
        init(Optional.empty());
    }

    /**
     * Rebuilds the topology with the global table materialized in a persistent versioned store.
     *
     * @param j history retention of the versioned store, in milliseconds
     */
    private void initWithVersionedStore(long j) {
        init(Optional.of(j));
    }

    /**
     * Builds the stream/global-table left-join topology, starts a fresh test driver for it and
     * creates the two input topics.
     *
     * @param optional history retention in milliseconds for a persistent versioned store named
     *                 {@code "table"}; when empty, the global table is created without an
     *                 explicit {@link Materialized}
     */
    private void init(Optional<Long> optional) {
        // setUp() has normally already created a driver; release it before replacing it so a
        // re-initialisation does not leak the previous instance.
        closeDriver();

        this.builder = new StreamsBuilder();
        // The type arguments matter: with raw Consumed/supplier types the stream would be a raw
        // KStream and the key-mapper lambda parameters would be typed as Object.
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
        final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());

        this.builder.stream(this.streamTopic, streamConsumed)
            .leftJoin(
                optional.isPresent()
                    ? this.builder.globalTable(
                        this.globalTableTopic,
                        tableConsumed,
                        Materialized.as(Stores.persistentVersionedKeyValueStore("table", Duration.ofMillis(optional.get()))))
                    : this.builder.globalTable(this.globalTableTopic, tableConsumed),
                (key, value) -> {
                    // The stream value is comma delimited; the second token, when present, is
                    // the global-table key. No second token means "no lookup key".
                    final String[] tokens = value.split(",");
                    return tokens.length > 1 ? tokens[1] : null;
                },
                MockValueJoiner.TOSTRING_JOINER)
            .process(supplier);

        this.driver = new TopologyTestDriver(
            this.builder.build(),
            StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()));
        this.processor = supplier.theCapturedProcessor();
        // Both topics start at timestamp 0 and are configured with a 1 ms auto-advance, which is
        // where the increasing timestamps in the expectations come from.
        this.inputStreamTopic = this.driver.createInputTopic(
            this.streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
        this.inputTableTopic = this.driver.createInputTopic(
            this.globalTableTopic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
    }

    /** Closes the test driver after each test. */
    @After
    public void cleanup() {
        closeDriver();
    }

    /**
     * Closes the current driver, if any, and forgets it. The null check keeps a failure inside
     * {@link #setUp()} from being followed by a NullPointerException here, and clearing the field
     * ensures the same driver is never closed twice.
     */
    private void closeDriver() {
        if (this.driver != null) {
            this.driver.close();
            this.driver = null;
        }
    }

    /**
     * Pipes records for the first {@code messageCount} expected keys into the stream topic.
     *
     * @param messageCount      number of records to send (at most the number of expected keys)
     * @param valuePrefix       prefix of each value; the key is appended to it
     * @param includeForeignKey whether to append {@code ",FKey<key>"} to the value
     * @param includeNullKey    whether the first record is sent with a {@code null} key
     */
    private void pushToStream(int messageCount, String valuePrefix, boolean includeForeignKey, boolean includeNullKey) {
        for (int index = 0; index < messageCount; index++) {
            String value = valuePrefix + this.expectedKeys[index];
            if (includeForeignKey) {
                value = value + ",FKey" + this.expectedKeys[index];
            }
            Integer key = this.expectedKeys[index];
            if (includeNullKey && index == 0) {
                key = null;
            }
            this.inputStreamTopic.pipeInput(key, value);
        }
    }

    /**
     * Pipes {@code "FKey<key>" -> valuePrefix + key} into the global table topic for the first
     * {@code messageCount} expected keys.
     */
    private void pushToGlobalTable(int messageCount, String valuePrefix) {
        for (int index = 0; index < messageCount; index++) {
            this.inputTableTopic.pipeInput("FKey" + this.expectedKeys[index], valuePrefix + this.expectedKeys[index]);
        }
    }

    /**
     * Pipes a {@code null} value for {@code "FKey<key>"} into the global table topic for the
     * first {@code messageCount} expected keys.
     */
    private void pushNullValueToGlobalTable(int messageCount) {
        for (int index = 0; index < messageCount; index++) {
            this.inputTableTopic.pipeInput("FKey" + this.expectedKeys[index], (String) null);
        }
    }

    @Test
    public void shouldNotRequireCopartitioning() {
        Assert.assertEquals(
            "KStream-GlobalKTable joins do not need to be co-partitioned",
            0L,
            TopologyWrapper.getInternalTopologyBuilder(this.builder.build()).copartitionGroups().size());
    }

    @Test
    public void shouldNotJoinWithEmptyGlobalTableOnStreamUpdates() {
        // The table is empty, so every stream record is forwarded with a null table side.
        pushToStream(2, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+null", 0L),
            new KeyValueTimestamp<>(1, "X1,FKey1+null", 1L));
    }

    @Test
    public void shouldNotJoinOnGlobalTableUpdates() {
        pushToStream(2, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+null", 0L),
            new KeyValueTimestamp<>(1, "X1,FKey1+null", 1L));

        // Table updates alone must not produce any output.
        pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);

        pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2L),
            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3L),
            new KeyValueTimestamp<>(2, "X2,FKey2+null", 4L),
            new KeyValueTimestamp<>(3, "X3,FKey3+null", 5L));

        pushToGlobalTable(4, "YY");
        this.processor.checkAndClearProcessResult(EMPTY);

        pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6L),
            new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7L),
            new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8L),
            new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9L));

        pushToGlobalTable(4, "YYY");
        this.processor.checkAndClearProcessResult(EMPTY);
    }

    @Test
    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
        pushToGlobalTable(2, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);

        // Only the first two keys have a table entry; the others still produce left-join output.
        pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0L),
            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1L),
            new KeyValueTimestamp<>(2, "X2,FKey2+null", 2L),
            new KeyValueTimestamp<>(3, "X3,FKey3+null", 3L));
    }

    @Test
    public void shouldClearGlobalTableEntryOnNullValueUpdates() {
        pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);

        pushToStream(4, "X", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0L),
            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1L),
            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2L),
            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3L));

        // Null values for the first two table keys; subsequent lookups for them find nothing.
        pushNullValueToGlobalTable(2);
        this.processor.checkAndClearProcessResult(EMPTY);

        pushToStream(4, "XX", true, false);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "XX0,FKey0+null", 4L),
            new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5L),
            new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6L),
            new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7L));
    }

    @Test
    public void shouldNotJoinOnNullKeyMapperValues() {
        pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);

        // Values without the ",FKey<n>" suffix make the key mapper return null; no output.
        pushToStream(4, "XXX", false, false);
        this.processor.checkAndClearProcessResult(EMPTY);
    }

    @Test
    public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
        pushToGlobalTable(4, "Y");
        this.processor.checkAndClearProcessResult(EMPTY);

        // The first record has a null stream key but a usable mapped key, so it is still joined.
        pushToStream(4, "X", true, true);
        this.processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0L),
            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1L),
            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2L),
            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3L));
    }
}
