package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableFilterTest.class */
public class KTableFilterTest {
    private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.Integer());
    private final Predicate<String, Integer> predicate = (str, num) -> {
        return num.intValue() % 2 == 0;
    };

    @BeforeEach
    public void setUp() {
        this.props.setProperty("statestore.cache.max.bytes", "0");
    }

    private void doTestKTable(StreamsBuilder streamsBuilder, KTable<String, Integer> kTable, KTable<String, Integer> kTable2, String str) {
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        kTable.toStream().process(mockApiProcessorSupplier, new String[0]);
        kTable2.toStream().process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", 1, 10L);
            createInputTopic.pipeInput("B", 2, 5L);
            createInputTopic.pipeInput("C", 3, 8L);
            createInputTopic.pipeInput("D", 4, 14L);
            createInputTopic.pipeInput("A", (Object) null, 18L);
            createInputTopic.pipeInput("B", (Object) null, 15L);
            topologyTestDriver.close();
            List capturedProcessors = mockApiProcessorSupplier.capturedProcessors(2);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", null, 10L), new KeyValueTimestamp<>("B", 2, 5L), new KeyValueTimestamp<>("C", null, 8L), new KeyValueTimestamp<>("D", 4, 14L), new KeyValueTimestamp<>("A", null, 18L), new KeyValueTimestamp<>("B", null, 15L));
            ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", 1, 10L), new KeyValueTimestamp<>("B", null, 5L), new KeyValueTimestamp<>("C", 3, 8L), new KeyValueTimestamp<>("D", null, 14L), new KeyValueTimestamp<>("A", null, 18L), new KeyValueTimestamp<>("B", null, 15L));
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldPassThroughWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTable<String, Integer> filter = table.filter(this.predicate);
        KTable<String, Integer> filterNot = table.filterNot(this.predicate);
        Assertions.assertNull(table.queryableStoreName());
        Assertions.assertNull(filter.queryableStoreName());
        Assertions.assertNull(filterNot.queryableStoreName());
        doTestKTable(streamsBuilder, filter, filterNot, AssignmentTestUtils.TP_1_NAME);
    }

    @Test
    public void shouldPassThroughOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTable<String, Integer> filter = table.filter(this.predicate, Materialized.as("store2"));
        KTable<String, Integer> filterNot = table.filterNot(this.predicate);
        Assertions.assertNull(table.queryableStoreName());
        Assertions.assertEquals("store2", filter.queryableStoreName());
        Assertions.assertNull(filterNot.queryableStoreName());
        doTestKTable(streamsBuilder, filter, filterNot, AssignmentTestUtils.TP_1_NAME);
    }

    private void doTestValueGetter(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        Topology build = streamsBuilder.build();
        KTableValueGetterSupplier valueGetterSupplier = kTableImpl.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier2 = kTableImpl2.valueGetterSupplier();
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(build);
        internalTopologyBuilder.connectProcessorAndStateStores(kTableImpl.name, valueGetterSupplier.storeNames());
        internalTopologyBuilder.connectProcessorAndStateStores(kTableImpl2.name, valueGetterSupplier2.storeNames());
        TopologyTestDriverWrapper topologyTestDriverWrapper = new TopologyTestDriverWrapper(build, this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriverWrapper.createInputTopic(str, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            KTableValueGetter kTableValueGetter = valueGetterSupplier.get();
            KTableValueGetter kTableValueGetter2 = valueGetterSupplier2.get();
            kTableValueGetter.init(topologyTestDriverWrapper.setCurrentNodeForProcessorContext(kTableImpl.name));
            kTableValueGetter2.init(topologyTestDriverWrapper.setCurrentNodeForProcessorContext(kTableImpl2.name));
            createInputTopic.pipeInput("A", 1, 5L);
            createInputTopic.pipeInput("B", 1, 10L);
            createInputTopic.pipeInput("C", 1, 15L);
            Assertions.assertNull(kTableValueGetter.get("A"));
            Assertions.assertNull(kTableValueGetter.get("B"));
            Assertions.assertNull(kTableValueGetter.get("C"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 5L), kTableValueGetter2.get("A"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 10L), kTableValueGetter2.get("B"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            createInputTopic.pipeInput("A", 2, 10L);
            createInputTopic.pipeInput("B", 2, 5L);
            Assertions.assertEquals(ValueAndTimestamp.make(2, 10L), kTableValueGetter.get("A"));
            Assertions.assertEquals(ValueAndTimestamp.make(2, 5L), kTableValueGetter.get("B"));
            Assertions.assertNull(kTableValueGetter.get("C"));
            Assertions.assertNull(kTableValueGetter2.get("A"));
            Assertions.assertNull(kTableValueGetter2.get("B"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            createInputTopic.pipeInput("A", 3, 15L);
            Assertions.assertNull(kTableValueGetter.get("A"));
            Assertions.assertEquals(ValueAndTimestamp.make(2, 5L), kTableValueGetter.get("B"));
            Assertions.assertNull(kTableValueGetter.get("C"));
            Assertions.assertEquals(ValueAndTimestamp.make(3, 15L), kTableValueGetter2.get("A"));
            Assertions.assertNull(kTableValueGetter2.get("B"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            createInputTopic.pipeInput("A", (Object) null, 10L);
            createInputTopic.pipeInput("B", (Object) null, 20L);
            Assertions.assertNull(kTableValueGetter.get("A"));
            Assertions.assertNull(kTableValueGetter.get("B"));
            Assertions.assertNull(kTableValueGetter.get("C"));
            Assertions.assertNull(kTableValueGetter2.get("A"));
            Assertions.assertNull(kTableValueGetter2.get("B"));
            Assertions.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            topologyTestDriverWrapper.close();
        } catch (Throwable th) {
            try {
                topologyTestDriverWrapper.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldGetValuesOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) table.filter(this.predicate, Materialized.as("store2"));
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) table.filterNot(this.predicate, Materialized.as("store3"));
        KTableImpl filterNot = table.filterNot(this.predicate);
        Assertions.assertNull(table.queryableStoreName());
        Assertions.assertEquals("store2", kTableImpl.queryableStoreName());
        Assertions.assertEquals("store3", kTableImpl2.queryableStoreName());
        Assertions.assertNull(filterNot.queryableStoreName());
        doTestValueGetter(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME);
    }

    private void doTestNotSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.build().addProcessor("proc1", mockApiProcessorSupplier, new String[]{kTableImpl.name});
        streamsBuilder.build().addProcessor("proc2", mockApiProcessorSupplier, new String[]{kTableImpl2.name});
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", 1, 5L);
            createInputTopic.pipeInput("B", 1, 10L);
            createInputTopic.pipeInput("C", 1, 15L);
            List capturedProcessors = mockApiProcessorSupplier.capturedProcessors(2);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(1, (Object) null), 5L), new KeyValueTimestamp<>("B", new Change(1, (Object) null), 10L), new KeyValueTimestamp<>("C", new Change(1, (Object) null), 15L));
            ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, (Object) null), 5L), new KeyValueTimestamp<>("B", new Change((Object) null, (Object) null), 10L), new KeyValueTimestamp<>("C", new Change((Object) null, (Object) null), 15L));
            createInputTopic.pipeInput("A", 2, 15L);
            createInputTopic.pipeInput("B", 2, 8L);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(2, (Object) null), 15L), new KeyValueTimestamp<>("B", new Change(2, (Object) null), 8L));
            ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(2, (Object) null), 15L), new KeyValueTimestamp<>("B", new Change(2, (Object) null), 8L));
            createInputTopic.pipeInput("A", 3, 20L);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(3, (Object) null), 20L));
            ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, (Object) null), 20L));
            createInputTopic.pipeInput("A", (Object) null, 10L);
            createInputTopic.pipeInput("B", (Object) null, 20L);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, (Object) null), 10L), new KeyValueTimestamp<>("B", new Change((Object) null, (Object) null), 20L));
            ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, (Object) null), 10L), new KeyValueTimestamp<>("B", new Change((Object) null, (Object) null), 20L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldNotSendOldValuesWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        doTestNotSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate), AssignmentTestUtils.TP_1_NAME);
    }

    @Test
    public void shouldNotSendOldValuesOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        doTestNotSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate, Materialized.as("store2")), AssignmentTestUtils.TP_1_NAME);
    }

    @Test
    public void shouldNotEnableSendingOldValuesIfNotAlreadyMaterializedAndNotForcedToMaterialize() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) kTableImpl.filter(this.predicate);
        kTableImpl2.enableSendingOldValues(false);
        doTestNotSendingOldValue(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME);
    }

    private void doTestSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        Topology build = streamsBuilder.build();
        build.addProcessor("proc1", mockApiProcessorSupplier, new String[]{kTableImpl.name});
        build.addProcessor("proc2", mockApiProcessorSupplier, new String[]{kTableImpl2.name});
        boolean sendingOldValueEnabled = kTableImpl.sendingOldValueEnabled();
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", 1, 5L);
            createInputTopic.pipeInput("B", 1, 10L);
            createInputTopic.pipeInput("C", 1, 15L);
            List capturedProcessors = mockApiProcessorSupplier.capturedProcessors(2);
            MockApiProcessor mockApiProcessor = (MockApiProcessor) capturedProcessors.get(0);
            MockApiProcessor mockApiProcessor2 = (MockApiProcessor) capturedProcessors.get(1);
            mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(1, (Object) null), 5L), new KeyValueTimestamp<>("B", new Change(1, (Object) null), 10L), new KeyValueTimestamp<>("C", new Change(1, (Object) null), 15L));
            mockApiProcessor2.checkEmptyAndClearProcessResult();
            createInputTopic.pipeInput("A", 2, 15L);
            createInputTopic.pipeInput("B", 2, 8L);
            KeyValueTimestamp<?, ?>[] keyValueTimestampArr = new KeyValueTimestamp[2];
            keyValueTimestampArr[0] = new KeyValueTimestamp<>("A", new Change(2, sendingOldValueEnabled ? 1 : null), 15L);
            keyValueTimestampArr[1] = new KeyValueTimestamp<>("B", new Change(2, sendingOldValueEnabled ? 1 : null), 8L);
            mockApiProcessor.checkAndClearProcessResult(keyValueTimestampArr);
            mockApiProcessor2.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change(2, (Object) null), 15L), new KeyValueTimestamp<>("B", new Change(2, (Object) null), 8L));
            createInputTopic.pipeInput("A", 3, 20L);
            KeyValueTimestamp<?, ?>[] keyValueTimestampArr2 = new KeyValueTimestamp[1];
            keyValueTimestampArr2[0] = new KeyValueTimestamp<>("A", new Change(3, sendingOldValueEnabled ? 2 : null), 20L);
            mockApiProcessor.checkAndClearProcessResult(keyValueTimestampArr2);
            mockApiProcessor2.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, 2), 20L));
            createInputTopic.pipeInput("A", (Object) null, 10L);
            createInputTopic.pipeInput("B", (Object) null, 20L);
            KeyValueTimestamp<?, ?>[] keyValueTimestampArr3 = new KeyValueTimestamp[2];
            keyValueTimestampArr3[0] = new KeyValueTimestamp<>("A", new Change((Object) null, sendingOldValueEnabled ? 3 : null), 10L);
            keyValueTimestampArr3[1] = new KeyValueTimestamp<>("B", new Change((Object) null, sendingOldValueEnabled ? 2 : null), 20L);
            mockApiProcessor.checkAndClearProcessResult(keyValueTimestampArr3);
            mockApiProcessor2.checkAndClearProcessResult(new KeyValueTimestamp<>("B", new Change((Object) null, 2), 20L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldEnableSendOldValuesWhenNotMaterializedAlreadyButForcedToMaterialize() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) kTableImpl.filter(this.predicate);
        kTableImpl2.enableSendingOldValues(true);
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl.sendingOldValueEnabled()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl2.sendingOldValueEnabled()), Matchers.is(true));
        doTestSendingOldValue(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME);
    }

    @Test
    public void shouldEnableSendOldValuesWhenMaterializedAlreadyAndForcedToMaterialize() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed);
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) kTableImpl.filter(this.predicate, Materialized.as("store2"));
        kTableImpl2.enableSendingOldValues(true);
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl.sendingOldValueEnabled()), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl2.sendingOldValueEnabled()), Matchers.is(true));
        doTestSendingOldValue(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME);
    }

    @Test
    public void shouldSendOldValuesWhenEnabledOnUpStreamMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, this.consumed, Materialized.as("store2"));
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) kTableImpl.filter(this.predicate);
        kTableImpl2.enableSendingOldValues(false);
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl.sendingOldValueEnabled()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(kTableImpl2.sendingOldValueEnabled()), Matchers.is(true));
        doTestSendingOldValue(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME);
    }

    private void doTestSkipNullOnMaterialization(StreamsBuilder streamsBuilder, KTableImpl<String, String, String> kTableImpl, KTableImpl<String, String, String> kTableImpl2, String str, boolean z) {
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        Topology build = streamsBuilder.build();
        build.addProcessor("proc1", mockApiProcessorSupplier, new String[]{kTableImpl.name});
        build.addProcessor("proc2", mockApiProcessorSupplier, new String[]{kTableImpl2.name});
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            createInputTopic.pipeInput("A", "reject", 5L);
            createInputTopic.pipeInput("B", "reject", 10L);
            createInputTopic.pipeInput("C", "reject", 20L);
            topologyTestDriver.close();
            List capturedProcessors = mockApiProcessorSupplier.capturedProcessors(2);
            ((MockApiProcessor) capturedProcessors.get(0)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change("reject", (Object) null), 5L), new KeyValueTimestamp<>("B", new Change("reject", (Object) null), 10L), new KeyValueTimestamp<>("C", new Change("reject", (Object) null), 20L));
            if (z) {
                ((MockApiProcessor) capturedProcessors.get(1)).checkEmptyAndClearProcessResult();
            } else {
                ((MockApiProcessor) capturedProcessors.get(1)).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change((Object) null, (Object) null), 5L), new KeyValueTimestamp<>("B", new Change((Object) null, (Object) null), 10L), new KeyValueTimestamp<>("C", new Change((Object) null, (Object) null), 20L));
            }
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldSkipNullToRepartitionWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KTableImpl<String, String, String> kTableImpl2 = (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        });
        kTableImpl2.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME, true);
    }

    @Test
    public void shouldSkipNullToRepartitionOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KTableImpl<String, String, String> kTableImpl2 = (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        }, Materialized.as("store2"));
        kTableImpl2.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME, true);
    }

    @Test
    public void shouldNotSkipNullIfVersionedUpstream() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()), Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L))));
        KTableImpl<String, String, String> kTableImpl2 = (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        });
        kTableImpl2.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME, false);
    }

    @Test
    public void shouldSkipNullIfVersionedDownstream() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Materialized as = Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L)));
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("store"));
        KTableImpl<String, String, String> kTableImpl2 = (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        }, as);
        kTableImpl2.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, kTableImpl2, AssignmentTestUtils.TP_1_NAME, true);
    }

    @Test
    public void testTypeVariance() {
        Predicate predicate = (number, obj) -> {
            return false;
        };
        new StreamsBuilder().table("empty").filter(predicate).filterNot(predicate).toStream().to("nirvana");
    }
}
