package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.io.File;
import java.io.IOException;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.class */
public class ForeignTableJoinProcessorSupplierTests {
    private static final Supplier<String> PK_SERDE_TOPIC_SUPPLIER = () -> {
        return "pk-topic";
    };
    private static final CombinedKeySchema<String, String> COMBINED_KEY_SCHEMA = new CombinedKeySchema<>(() -> {
        return "fk-topic";
    }, Serdes.String(), PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
    private File stateDir;
    private MockInternalProcessorContext<String, SubscriptionResponseWrapper<String>> context = null;
    private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = null;
    private Processor<String, Change<String>, String, SubscriptionResponseWrapper<String>> processor = null;
    private final String pk1 = "pk1";
    private final String pk2 = "pk2";
    private final String fk1 = "fk1";
    private final long[] hash = {1, 2};

    @BeforeEach
    public void setUp() {
        this.stateDir = TestUtils.tempDirectory();
        this.context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String()), new TaskId(0, 0), this.stateDir);
        StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
        this.processor = new ForeignTableJoinProcessorSupplier(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder()), COMBINED_KEY_SCHEMA).get();
        this.stateStore = storeBuilder.build();
        this.context.addStateStore(this.stateStore);
        this.stateStore.init(this.context, this.stateStore);
        this.processor.init(this.context);
    }

    @AfterEach
    public void cleanUp() throws IOException {
        if (this.stateStore != null) {
            this.stateStore.close();
        }
        Utils.delete(this.stateDir);
    }

    @Test
    public void shouldPropagateRightRecordForEachMatchingPrimaryKey() {
        putInStore("fk1", "pk1");
        putInStore("fk1", "pk2");
        this.processor.process(new Record("fk1", new Change("new_value", (Object) null), 0L));
        MatcherAssert.assertThat(Integer.valueOf(this.context.forwarded().size()), Matchers.is(2));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) this.context.forwarded().get(0)).record(), Matchers.is(new Record("pk1", new SubscriptionResponseWrapper(this.hash, "new_value", (Integer) null), 0L)));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) this.context.forwarded().get(1)).record(), Matchers.is(new Record("pk2", new SubscriptionResponseWrapper(this.hash, "new_value", (Integer) null), 0L)));
    }

    @Test
    public void shouldPropagateNothingIfNoMatchingPrimaryKey() {
        putInStore("fk1", "pk1");
        this.processor.process(new Record("fk2", new Change("new_value", (Object) null), 0L));
        MatcherAssert.assertThat(this.context.forwarded(), Matchers.empty());
    }

    @Test
    public void shouldPropagateTombstoneRightRecordForEachMatchingPrimaryKey() {
        putInStore("fk1", "pk1");
        putInStore("fk1", "pk2");
        this.processor.process(new Record("fk1", new Change((Object) null, "new_value"), 0L));
        MatcherAssert.assertThat(Integer.valueOf(this.context.forwarded().size()), Matchers.is(2));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) this.context.forwarded().get(0)).record(), Matchers.is(new Record("pk1", new SubscriptionResponseWrapper(this.hash, (Object) null, (Integer) null), 0L)));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) this.context.forwarded().get(1)).record(), Matchers.is(new Record("pk2", new SubscriptionResponseWrapper(this.hash, (Object) null, (Integer) null), 0L)));
    }

    @Test
    public void shouldNotMatchForeignKeysHavingThisFKAsPrefix() {
        putInStore("fk1", "pk1");
        putInStore("fk", "pk2");
        this.processor.process(new Record("fk", new Change("new_value", (Object) null), 0L));
        MatcherAssert.assertThat(Integer.valueOf(this.context.forwarded().size()), Matchers.is(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) this.context.forwarded().get(0)).record(), Matchers.is(new Record("pk2", new SubscriptionResponseWrapper(this.hash, "new_value", (Integer) null), 0L)));
    }

    @Test
    public void shouldIgnoreRecordWithNullKey() {
        putInStore("fk1", "pk1");
        this.processor.process(new Record((Object) null, new Change("new_value", (Object) null), 0L));
        MatcherAssert.assertThat(this.context.forwarded(), Matchers.empty());
        Assertions.assertEquals(Double.valueOf(1.0d), ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(this.context));
        Assertions.assertNotEquals(Double.valueOf(0.0d), ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(this.context));
    }

    private void putInStore(String str, String str2) {
        ValueAndTimestamp make = ValueAndTimestamp.make(new SubscriptionWrapper(this.hash, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, str2, (byte) 0, (Integer) null), 0L);
        this.stateStore.put(COMBINED_KEY_SCHEMA.toBytes(str, str2), make);
    }

    private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {
        return Stores.timestampedKeyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore("Store"), new Serdes.BytesSerde(), new SubscriptionWrapperSerde(PK_SERDE_TOPIC_SUPPLIER, Serdes.String()));
    }
}
