package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.lang.reflect.Field;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableImplTest.class */
public class KTableImplTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(this.stringSerde, this.stringSerde);

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    private File stateDir = null;
    private StreamsBuilder builder;
    private KTable<String, String> table;

    @Before
    public void setUp() {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
        this.builder = new StreamsBuilder();
        this.table = this.builder.table("test");
    }

    @Test
    public void testKTable() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = streamsBuilder.table("topic1", this.consumed);
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        table.toStream().process(mockProcessorSupplier, new String[0]);
        KTable mapValues = table.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.1
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
        mapValues.toStream().process(mockProcessorSupplier2, new String[0]);
        KTable filter = mapValues.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.2
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        MockProcessorSupplier mockProcessorSupplier3 = new MockProcessorSupplier();
        filter.toStream().process(mockProcessorSupplier3, new String[0]);
        KTable through = table.through(this.stringSerde, this.stringSerde, "topic2", "storeName2");
        MockProcessorSupplier mockProcessorSupplier4 = new MockProcessorSupplier();
        through.toStream().process(mockProcessorSupplier4, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process("topic1", "A", "01");
        this.driver.flushState();
        this.driver.process("topic1", "B", "02");
        this.driver.flushState();
        this.driver.process("topic1", "C", "03");
        this.driver.flushState();
        this.driver.process("topic1", "D", "04");
        this.driver.flushState();
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"A:01", "B:02", "C:03", "D:04"}), mockProcessorSupplier.processed);
        Assert.assertEquals(Utils.mkList(new String[]{"A:1", "B:2", "C:3", "D:4"}), mockProcessorSupplier2.processed);
        Assert.assertEquals(Utils.mkList(new String[]{"A:null", "B:2", "C:null", "D:4"}), mockProcessorSupplier3.processed);
        Assert.assertEquals(Utils.mkList(new String[]{"A:01", "B:02", "C:03", "D:04"}), mockProcessorSupplier4.processed);
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed);
        KTableImpl mapValues = table.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.3
            public Integer apply(String str) {
                return new Integer(str);
            }
        });
        KTableImpl filter = mapValues.filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.4
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        KTableImpl through = table.through(this.stringSerde, this.stringSerde, "topic2", "storeName2");
        KTableValueGetterSupplier valueGetterSupplier = table.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier2 = mapValues.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier3 = filter.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier4 = through.valueGetterSupplier();
        this.driver.setUp(streamsBuilder, this.stateDir, (Serde<?>) null, (Serde<?>) null);
        Assert.assertEquals(2L, this.driver.allStateStores().size());
        KTableValueGetter kTableValueGetter = valueGetterSupplier.get();
        kTableValueGetter.init(this.driver.context());
        KTableValueGetter kTableValueGetter2 = valueGetterSupplier2.get();
        kTableValueGetter2.init(this.driver.context());
        KTableValueGetter kTableValueGetter3 = valueGetterSupplier3.get();
        kTableValueGetter3.init(this.driver.context());
        KTableValueGetter kTableValueGetter4 = valueGetterSupplier4.get();
        kTableValueGetter4.init(this.driver.context());
        this.driver.process("topic1", "A", "01");
        this.driver.process("topic1", "B", "01");
        this.driver.process("topic1", "C", "01");
        this.driver.flushState();
        Assert.assertEquals("01", kTableValueGetter.get("A"));
        Assert.assertEquals("01", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertNull(kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("01", kTableValueGetter4.get("A"));
        Assert.assertEquals("01", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process("topic1", "A", "02");
        this.driver.process("topic1", "B", "02");
        this.driver.flushState();
        Assert.assertEquals("02", kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("02", kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process("topic1", "A", "03");
        this.driver.flushState();
        Assert.assertEquals("03", kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertEquals(new Integer(3), kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertEquals("03", kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        Assert.assertNull(kTableValueGetter.get("A"));
        Assert.assertEquals("02", kTableValueGetter.get("B"));
        Assert.assertEquals("01", kTableValueGetter.get("C"));
        Assert.assertNull(kTableValueGetter2.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter2.get("B"));
        Assert.assertEquals(new Integer(1), kTableValueGetter2.get("C"));
        Assert.assertNull(kTableValueGetter3.get("A"));
        Assert.assertEquals(new Integer(2), kTableValueGetter3.get("B"));
        Assert.assertNull(kTableValueGetter3.get("C"));
        Assert.assertNull(kTableValueGetter4.get("A"));
        Assert.assertEquals("02", kTableValueGetter4.get("B"));
        Assert.assertEquals("01", kTableValueGetter4.get("C"));
    }

    @Test
    public void testStateStoreLazyEval() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed);
        streamsBuilder.table("topic2", this.consumed);
        table.mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.5
            public Integer apply(String str) {
                return new Integer(str);
            }
        }).filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.6
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        });
        this.driver.setUp(streamsBuilder, this.stateDir, (Serde<?>) null, (Serde<?>) null);
        this.driver.setTime(0L);
        Assert.assertEquals(2L, this.driver.allStateStores().size());
    }

    @Test
    public void testStateStore() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic2", this.consumed).join(streamsBuilder.table("topic1", this.consumed).mapValues(new ValueMapper<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.7
            public Integer apply(String str) {
                return new Integer(str);
            }
        }).filter(new Predicate<String, Integer>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.8
            public boolean test(String str, Integer num) {
                return num.intValue() % 2 == 0;
            }
        }), new ValueJoiner<String, Integer, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.9
            public String apply(String str, Integer num) {
                return str + num;
            }
        });
        this.driver.setUp(streamsBuilder, this.stateDir, (Serde<?>) null, (Serde<?>) null);
        this.driver.setTime(0L);
        Assert.assertEquals(2L, this.driver.allStateStores().size());
    }

    @Test
    public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed, Materialized.as("storeName1").withKeySerde(this.stringSerde).withValueSerde(this.stringSerde));
        table.groupBy(MockMapper.noOpKeyValueMapper()).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
        table.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
        this.driver.setUp(streamsBuilder, this.stateDir, this.stringSerde, this.stringSerde);
        this.driver.setTime(0L);
        Assert.assertEquals(3L, this.driver.allStateStores().size());
        Assert.assertTrue(this.driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
        Assert.assertTrue(this.driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
        Assert.assertTrue(this.driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
        Assert.assertTrue(this.driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
        Field declaredField = this.driver.processor("KSTREAM-SINK-0000000003").getClass().getDeclaredField("valSerializer");
        Field declaredField2 = this.driver.processor("KSTREAM-SOURCE-0000000004").getClass().getDeclaredField("valDeserializer");
        declaredField.setAccessible(true);
        declaredField2.setAccessible(true);
        Assert.assertNotNull(((ChangedSerializer) declaredField.get(this.driver.processor("KSTREAM-SINK-0000000003"))).inner());
        Assert.assertNotNull(((ChangedDeserializer) declaredField2.get(this.driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
        Assert.assertNotNull(((ChangedSerializer) declaredField.get(this.driver.processor("KSTREAM-SINK-0000000007"))).inner());
        Assert.assertNotNull(((ChangedDeserializer) declaredField2.get(this.driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnToStream() {
        this.table.toStream((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        this.table.to((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        this.table.filter((Predicate) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        this.table.filterNot((Predicate) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        this.table.mapValues((ValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValueWithKey() {
        this.table.mapValues((ValueMapperWithKey) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullFilePathOnWriteAsText() {
        this.table.writeAsText((String) null);
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
        this.table.writeAsText("\t  \t");
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        this.table.foreach((ForeachAction) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldAllowNullTopicInThrough() {
        this.table.through((String) null, "store");
    }

    @Test
    public void shouldAllowNullStoreInThrough() {
        this.table.through("topic", (String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        this.table.groupBy((KeyValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnJoin() {
        this.table.join((KTable) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test
    public void shouldAllowNullStoreInJoin() {
        this.table.join(this.table, MockValueJoiner.TOSTRING_JOINER, (Serde) null, (String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInJoin() {
        this.table.join(this.table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInLeftJoin() {
        this.table.leftJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullStoreSupplierInOuterJoin() {
        this.table.outerJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerJoin() {
        this.table.join(this.table, (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnOuterJoin() {
        this.table.outerJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnOuterJoin() {
        this.table.outerJoin(this.table, (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoin() {
        this.table.leftJoin(this.table, (ValueJoiner) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnLeftJoin() {
        this.table.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
        this.table.filter(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.10
            public boolean test(String str, String str2) {
                return false;
            }
        }, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
        this.table.filterNot(new Predicate<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImplTest.11
            public boolean test(String str, String str2) {
                return false;
            }
        }, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
        this.table.join(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
        this.table.leftJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
        this.table.leftJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
    }
}
