package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.apache.kafka.test.MockInternalProcessorContext;

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;

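/*
 * Unit tests for the processors obtained from SubscriptionSendProcessorSupplier. Four processors are exercised:
 * left-join and inner-join variants, each built once from a value-only foreign-key extractor and once from a
 * (primary key, value) extractor.
 *
 * Decompiled source; loaded from:
 * input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.class
 */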
public class SubscriptionSendProcessorSupplierTest {
    // NOTE: SubscriptionSendProcessorSupplier is instantiated as a raw type below, so no target type reaches the
    // ForeignKeyExtractor factory calls. The method reference and the explicitly typed lambda parameters carry the
    // LeftValue/String types themselves; an implicitly typed lambda would have its parameter inferred as Object
    // and getForeignKey() would not resolve.

    /** Processor used by the {@code leftJoin*} tests: value-only foreign-key extractor, trailing flag {@code true}. */
    private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> leftJoinProcessor =
        new SubscriptionSendProcessorSupplier(
            ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
            () -> "subscription-topic-fk",
            () -> "value-serde-topic",
            Serdes.String(),
            new LeftValueSerializer(),
            true
        ).get();

    /** Processor used by the {@code innerJoin*} tests: value-only foreign-key extractor, trailing flag {@code false}. */
    private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> innerJoinProcessor =
        new SubscriptionSendProcessorSupplier(
            ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
            () -> "subscription-topic-fk",
            () -> "value-serde-topic",
            Serdes.String(),
            new LeftValueSerializer(),
            false
        ).get();

    /** Primary key used by every test record. */
    private final String pk = "pk";
    /** First foreign-key value used by the tests. */
    private final String fk1 = "fk1";
    /** Second foreign-key value, used when a test changes the foreign key. */
    private final String fk2 = "fk2";

    /**
     * Processor used by the {@code biFunctionLeftJoin*} tests: the foreign key is the primary key concatenated
     * with the value's foreign key, or {@code null} when the value has no foreign key; trailing flag {@code true}.
     */
    private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> biFunctionLeftJoinProcessor =
        new SubscriptionSendProcessorSupplier(
            ForeignKeyExtractor.fromBiFunction((String primaryKey, LeftValue leftValue) ->
                leftValue.getForeignKey() == null ? null : primaryKey + leftValue.getForeignKey()),
            () -> "subscription-topic-fk",
            () -> "value-serde-topic",
            Serdes.String(),
            new LeftValueSerializer(),
            true
        ).get();

    /**
     * Processor used by the {@code biFunctionInnerJoin*} tests: same (primary key, value) extractor as the
     * bi-function left-join processor, trailing flag {@code false}.
     */
    private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> biFunctionInnerJoinProcessor =
        new SubscriptionSendProcessorSupplier(
            ForeignKeyExtractor.fromBiFunction((String primaryKey, LeftValue leftValue) ->
                leftValue.getForeignKey() == null ? null : primaryKey + leftValue.getForeignKey()),
            () -> "subscription-topic-fk",
            () -> "value-serde-topic",
            Serdes.String(),
            new LeftValueSerializer(),
            false
        ).get();

    /**
     * Minimal immutable left-side value used by the tests; it carries nothing but a (possibly {@code null})
     * foreign key.
     */
    public static final class LeftValue {
        private final String foreignKey;

        /**
         * @param foreignKey the foreign key this value refers to; may be {@code null}
         */
        public LeftValue(String foreignKey) {
            this.foreignKey = foreignKey;
        }

        /**
         * @return the foreign key given at construction, possibly {@code null}
         */
        public String getForeignKey() {
            return foreignKey;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest$LeftValueSerializer.class */
    public static class LeftValueSerializer implements Serializer<LeftValue> {
        private LeftValueSerializer() {
        }

        public byte[] serialize(String str, LeftValue leftValue) {
            if (leftValue == null) {
                return null;
            }
            return leftValue.foreignKey == null ? "null".getBytes() : new StringSerializer().serialize(str, leftValue.getForeignKey());
        }
    }

    /**
     * Left join: a new value with foreign key {@code fk1} and no old value must forward exactly one record,
     * keyed by {@code fk1} and carrying {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void leftJoinShouldPropagateNewPrimaryKeyWithNonNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        leftJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Left join: a new value whose foreign key is {@code null} (no old value) must still forward exactly one
     * record, with a {@code null} key and {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void leftJoinShouldPropagateNewPrimaryKeyWithNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        leftJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Left join: changing the foreign key from {@code fk1} to {@code fk2} must forward two records. Only the
     * second one is inspected here: it is keyed by {@code fk2} with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void leftJoinShouldPropagateChangeOfFKFromNonNullToNonNullValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk2);
        final LeftValue oldValue = new LeftValue(fk1);
        leftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(2));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record secondForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(1)).record();
        MatcherAssert.assertThat(secondForwarded, Matchers.is(new Record(fk2, expectedSubscription, 0L)));
    }

    /**
     * Left join: when old and new value are the same {@code fk1} value, exactly one record keyed by {@code fk1}
     * with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} must be forwarded.
     */
    @Test
    public void leftJoinShouldPropagateNewRecordOfUnchangedFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue unchangedValue = new LeftValue(fk1);
        leftJoinProcessor.process(new Record(pk, new Change(unchangedValue, unchangedValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(unchangedValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Left join: changing the foreign key from {@code fk1} to {@code null} must forward at least one record; the
     * first is keyed by the old key {@code fk1}, carries the hash of the new value and
     * {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void leftJoinShouldPropagateChangeOfFKFromNonNullToNullValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        final LeftValue oldValue = new LeftValue(fk1);
        leftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.greaterThan(0));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Left join: changing the foreign key from {@code null} to {@code fk1} must forward exactly one record,
     * keyed by {@code fk1} with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void leftJoinShouldPropagateChangeFromNullFKToNonNullFKValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        final LeftValue oldValue = new LeftValue(null);
        leftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Left join: when old and new value are the same value with a {@code null} foreign key, exactly one record
     * with a {@code null} key and {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} must be forwarded.
     */
    @Test
    public void leftJoinShouldPropagateChangeFromNullFKToNullFKValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue nullFkValue = new LeftValue(null);
        leftJoinProcessor.process(new Record(pk, new Change(nullFkValue, nullFkValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(nullFkValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Left join: deleting a primary key whose old value pointed at {@code fk1} must forward at least one record;
     * the first is keyed by {@code fk1}, has a {@code null} hash and {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void leftJoinShouldPropagateDeletionOfAPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(fk1);
        leftJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.greaterThan(0));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Left join: deleting a primary key whose old value had a {@code null} foreign key must forward exactly one
     * record with a {@code null} key, a {@code null} hash and {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void leftJoinShouldPropagateDeletionOfAPrimaryKeyThatHadNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(null);
        leftJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Left join: a change whose new and old values are both {@code null} must forward nothing.
     */
    @Test
    public void leftJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        leftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final Change emptyChange = new Change((Object) null, (Object) null);
        leftJoinProcessor.process(new Record(pk, emptyChange, 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
    }

    /**
     * Inner join: a new value with foreign key {@code fk1} and no old value must forward exactly one record,
     * keyed by {@code fk1} and carrying {@code PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE}.
     */
    @Test
    public void innerJoinShouldPropagateNewPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        innerJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Inner join: a new value with a {@code null} foreign key (no old value) must forward nothing; the
     * dropped-records total metric must read {@code 1.0} and the dropped-records rate must differ from {@code 0.0}.
     */
    @Test
    public void innerJoinShouldNotPropagateNewPrimaryKeyWithNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        innerJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
        final Double expectedDroppedTotal = Double.valueOf(1.0d);
        final Double unexpectedDroppedRate = Double.valueOf(0.0d);
        Assertions.assertEquals(expectedDroppedTotal, ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assertions.assertNotEquals(unexpectedDroppedRate, ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    /**
     * Inner join: changing the foreign key from {@code fk1} to {@code fk2} must forward two records, both
     * carrying the hash of the new value: first {@code DELETE_KEY_NO_PROPAGATE} keyed by {@code fk1}, then
     * {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} keyed by {@code fk2}.
     */
    @Test
    public void innerJoinShouldDeleteOldAndPropagateNewFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk2);
        final LeftValue oldValue = new LeftValue(fk1);
        innerJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(2));

        final SubscriptionWrapper expectedDelete = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(fk1, expectedDelete, 0L)));

        final SubscriptionWrapper expectedPropagate = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record secondForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(1)).record();
        MatcherAssert.assertThat(secondForwarded, Matchers.is(new Record(fk2, expectedPropagate, 0L)));
    }

    /**
     * Inner join: when old and new value are the same value with a {@code null} foreign key, nothing must be
     * forwarded; the dropped-records total metric must read {@code 1.0} and the rate must differ from {@code 0.0}.
     */
    @Test
    public void innerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue nullFkValue = new LeftValue(null);
        innerJoinProcessor.process(new Record(pk, new Change(nullFkValue, nullFkValue), 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
        final Double expectedDroppedTotal = Double.valueOf(1.0d);
        final Double unexpectedDroppedRate = Double.valueOf(0.0d);
        Assertions.assertEquals(expectedDroppedTotal, ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assertions.assertNotEquals(unexpectedDroppedRate, ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    /**
     * Inner join: deleting a primary key whose old value pointed at {@code fk1} must forward exactly one record,
     * keyed by {@code fk1}, with a {@code null} hash and {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void innerJoinShouldPropagateDeletionOfPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(fk1);
        innerJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(fk1, expectedSubscription, 0L)));
    }

    /**
     * Inner join: a change whose new and old values are both {@code null} must forward nothing.
     */
    @Test
    public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        innerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final Change emptyChange = new Change((Object) null, (Object) null);
        innerJoinProcessor.process(new Record(pk, emptyChange, 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
    }

    /**
     * Bi-function left join: a new value with foreign key {@code fk1} and no old value must forward exactly one
     * record, keyed by the combined key {@code "pkfk1"} with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNonNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: a new value whose foreign key is {@code null} (no old value) must forward exactly
     * one record with a {@code null} key and {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: changing the foreign key from {@code fk1} to {@code fk2} must forward two records.
     * Only the second one is inspected here: it is keyed by {@code "pkfk2"} with
     * {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNonNullValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk2);
        final LeftValue oldValue = new LeftValue(fk1);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(2));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record secondForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(1)).record();
        MatcherAssert.assertThat(secondForwarded, Matchers.is(new Record(pk + fk2, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: when old and new value are the same {@code fk1} value, exactly one record keyed by
     * {@code "pkfk1"} with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} must be forwarded.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateNewRecordOfUnchangedFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue unchangedValue = new LeftValue(fk1);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(unchangedValue, unchangedValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(unchangedValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: changing the foreign key from {@code fk1} to {@code null} must forward at least one
     * record; the first is keyed by the old combined key {@code "pkfk1"}, carries the hash of the new value and
     * {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNullValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        final LeftValue oldValue = new LeftValue(fk1);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.greaterThan(0));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: changing the foreign key from {@code null} to {@code fk1} must forward exactly one
     * record, keyed by {@code "pkfk1"} with {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateChangeFromNullFKToNonNullFKValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        final LeftValue oldValue = new LeftValue(null);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: when old and new value are the same value with a {@code null} foreign key, exactly
     * one record with a {@code null} key and {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} must be forwarded.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateChangeFromNullFKToNullFKValue() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue nullFkValue = new LeftValue(null);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change(nullFkValue, nullFkValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(nullFkValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: deleting a primary key whose old value pointed at {@code fk1} must forward at least
     * one record; the first is keyed by {@code "pkfk1"}, has a {@code null} hash and
     * {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(fk1);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.greaterThan(0));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: deleting a primary key whose old value had a {@code null} foreign key must forward
     * exactly one record with a {@code null} key, a {@code null} hash and
     * {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKeyThatHadNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(null);
        biFunctionLeftJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record((Object) null, expectedSubscription, 0L)));
    }

    /**
     * Bi-function left join: a change whose new and old values are both {@code null} must forward nothing.
     */
    @Test
    public void biFunctionLeftJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionLeftJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final Change emptyChange = new Change((Object) null, (Object) null);
        biFunctionLeftJoinProcessor.process(new Record(pk, emptyChange, 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
    }

    /**
     * Bi-function inner join: a new value with foreign key {@code fk1} and no old value must forward exactly one
     * record, keyed by {@code "pkfk1"} and carrying {@code PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE}.
     */
    @Test
    public void biFunctionInnerJoinShouldPropagateNewPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk1);
        biFunctionInnerJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function inner join: a new value with a {@code null} foreign key (no old value) must forward nothing;
     * the dropped-records total metric must read {@code 1.0} and the rate must differ from {@code 0.0}.
     */
    @Test
    public void biFunctionInnerJoinShouldNotPropagateNewPrimaryKeyWithNullFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(null);
        biFunctionInnerJoinProcessor.process(new Record(pk, new Change(newValue, (Object) null), 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
        final Double expectedDroppedTotal = Double.valueOf(1.0d);
        final Double unexpectedDroppedRate = Double.valueOf(0.0d);
        Assertions.assertEquals(expectedDroppedTotal, ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assertions.assertNotEquals(unexpectedDroppedRate, ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    /**
     * Bi-function inner join: changing the foreign key from {@code fk1} to {@code fk2} must forward two records,
     * both carrying the hash of the new value: first {@code DELETE_KEY_NO_PROPAGATE} keyed by {@code "pkfk1"},
     * then {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} keyed by {@code "pkfk2"}.
     */
    @Test
    public void biFunctionInnerJoinShouldDeleteOldAndPropagateNewFK() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue newValue = new LeftValue(fk2);
        final LeftValue oldValue = new LeftValue(fk1);
        biFunctionInnerJoinProcessor.process(new Record(pk, new Change(newValue, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(2));

        final SubscriptionWrapper expectedDelete = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, pk, 0);
        final Record firstForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(firstForwarded, Matchers.is(new Record(pk + fk1, expectedDelete, 0L)));

        final SubscriptionWrapper expectedPropagate = new SubscriptionWrapper(
            hash(newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0);
        final Record secondForwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(1)).record();
        MatcherAssert.assertThat(secondForwarded, Matchers.is(new Record(pk + fk2, expectedPropagate, 0L)));
    }

    /**
     * Bi-function inner join: when old and new value are the same value with a {@code null} foreign key, nothing
     * must be forwarded; the dropped-records total metric must read {@code 1.0} and the rate must differ from
     * {@code 0.0}.
     */
    @Test
    public void biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        // The same instance serves as both the new and the old value.
        final LeftValue nullFkValue = new LeftValue(null);
        biFunctionInnerJoinProcessor.process(new Record(pk, new Change(nullFkValue, nullFkValue), 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
        final Double expectedDroppedTotal = Double.valueOf(1.0d);
        final Double unexpectedDroppedRate = Double.valueOf(0.0d);
        Assertions.assertEquals(expectedDroppedTotal, ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assertions.assertNotEquals(unexpectedDroppedRate, ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    /**
     * Bi-function inner join: deleting a primary key whose old value pointed at {@code fk1} must forward exactly
     * one record, keyed by {@code "pkfk1"}, with a {@code null} hash and {@code DELETE_KEY_AND_PROPAGATE}.
     */
    @Test
    public void biFunctionInnerJoinShouldPropagateDeletionOfPrimaryKey() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final LeftValue oldValue = new LeftValue(fk1);
        biFunctionInnerJoinProcessor.process(new Record(pk, new Change((Object) null, oldValue), 0L));

        MatcherAssert.assertThat(context.forwarded().size(), Matchers.is(1));
        final SubscriptionWrapper expectedSubscription = new SubscriptionWrapper(
            (long[]) null, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, pk, 0);
        final Record forwarded = ((MockProcessorContext.CapturedForward) context.forwarded().get(0)).record();
        MatcherAssert.assertThat(forwarded, Matchers.is(new Record(pk + fk1, expectedSubscription, 0L)));
    }

    /**
     * Bi-function inner join: a change whose new and old values are both {@code null} must forward nothing.
     */
    @Test
    public void biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        biFunctionInnerJoinProcessor.init(context);
        context.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);

        final Change emptyChange = new Change((Object) null, (Object) null);
        biFunctionInnerJoinProcessor.process(new Record(pk, emptyChange, 0L));

        MatcherAssert.assertThat(context.forwarded(), Matchers.empty());
    }

    /**
     * Builds the hash the tests expect inside a forwarded {@code SubscriptionWrapper}: the value is serialized
     * with a fresh {@link LeftValueSerializer} for topic {@code "value-serde-topic"} and the resulting bytes are
     * passed to {@code Murmur3.hash128}.
     *
     * @param leftValue the value to hash
     * @return the result of {@code Murmur3.hash128} over the serialized value
     */
    private static long[] hash(LeftValue leftValue) {
        final LeftValueSerializer serializer = new LeftValueSerializer();
        final byte[] serialized = serializer.serialize("value-serde-topic", leftValue);
        return Murmur3.hash128(serialized);
    }
}
