package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.HashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableAggregateTest.class */
public class KTableAggregateTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(this.stringSerde, this.stringSerde);
    private final Serialized<String, String> stringSerialzied = Serialized.with(this.stringSerde, this.stringSerde);
    private File stateDir = null;

    @Rule
    public EmbeddedKafkaCluster cluster = null;

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    @Before
    public void setUp() {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
    }

    @Test
    public void testAggBasic() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.table("topic1", this.consumed).groupBy(MockMapper.noOpKeyValueMapper(), this.stringSerialzied).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized").toStream().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir, Serdes.String(), Serdes.String());
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "A", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "C", "5");
        this.driver.flushState();
        this.driver.process("topic1", "D", "6");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        this.driver.process("topic1", "C", "8");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"A:0+1", "B:0+2", "A:0+1-1+3", "B:0+2-2+4", "C:0+5", "D:0+6", "B:0+2-2+4-4+7", "C:0+5-5+8"}), mockProcessorSupplier.processed);
    }

    @Test
    public void testAggCoalesced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.table("topic1", this.consumed).groupBy(MockMapper.noOpKeyValueMapper(), this.stringSerialzied).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized").toStream().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.process("topic1", "A", "3");
        this.driver.process("topic1", "A", "4");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"A:0+4"}), mockProcessorSupplier.processed);
    }

    @Test
    public void testAggRepartition() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.table("topic1", this.consumed).groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.1
            public KeyValue<String, String> apply(String str, String str2) {
                boolean z = -1;
                switch (str.hashCode()) {
                    case 2407815:
                        if (str.equals("NULL")) {
                            z = true;
                            break;
                        }
                        break;
                    case 3392903:
                        if (str.equals("null")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        return KeyValue.pair((Object) null, str2);
                    case true:
                        return null;
                    default:
                        return KeyValue.pair(str2, str2);
                }
            }
        }, this.stringSerialzied).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized").toStream().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "null", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "NULL", "5");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"1:0+1", "1:0+1-1", "1:0+1-1+1", "2:0+2", "2:0+2-2", "4:0+4", "4:0+4-4", "7:0+7"}), mockProcessorSupplier.processed);
    }

    private void testCountHelper(StreamsBuilder streamsBuilder, String str, MockProcessorSupplier<String, Long> mockProcessorSupplier) {
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process(str, "A", "green");
        this.driver.flushState();
        this.driver.process(str, "B", "green");
        this.driver.flushState();
        this.driver.process(str, "A", "blue");
        this.driver.flushState();
        this.driver.process(str, "C", "yellow");
        this.driver.flushState();
        this.driver.process(str, "D", "green");
        this.driver.flushState();
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"green:1", "green:2", "green:1", "blue:1", "yellow:1", "green:2"}), mockProcessorSupplier.processed);
    }

    @Test
    public void testCount() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<String, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        streamsBuilder.table("count-test-input", this.consumed).groupBy(MockMapper.selectValueKeyValueMapper(), this.stringSerialzied).count("count").toStream().process(mockProcessorSupplier, new String[0]);
        testCountHelper(streamsBuilder, "count-test-input", mockProcessorSupplier);
    }

    @Test
    public void testCountWithInternalStore() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier<String, Long> mockProcessorSupplier = new MockProcessorSupplier<>();
        streamsBuilder.table("count-test-input", this.consumed).groupBy(MockMapper.selectValueKeyValueMapper(), this.stringSerialzied).count().toStream().process(mockProcessorSupplier, new String[0]);
        testCountHelper(streamsBuilder, "count-test-input", mockProcessorSupplier);
    }

    @Test
    public void testCountCoalesced() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.table("count-test-input", this.consumed).groupBy(MockMapper.selectValueKeyValueMapper(), this.stringSerialzied).count("count").toStream().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process("count-test-input", "A", "green");
        this.driver.process("count-test-input", "B", "green");
        this.driver.process("count-test-input", "A", "blue");
        this.driver.process("count-test-input", "C", "yellow");
        this.driver.process("count-test-input", "D", "green");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"blue:1", "yellow:1", "green:2"}), mockProcessorSupplier.processed);
    }

    @Test
    public void testRemoveOldBeforeAddNew() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.table("count-test-input", this.consumed).groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.5
            public KeyValue<String, String> apply(String str, String str2) {
                return KeyValue.pair(String.valueOf(str.charAt(0)), String.valueOf(str.charAt(1)));
            }
        }, this.stringSerialzied).aggregate(new Initializer<String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.2
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public String m28apply() {
                return "";
            }
        }, new Aggregator<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.3
            public String apply(String str, String str2, String str3) {
                return str3 + str2;
            }
        }, new Aggregator<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.4
            public String apply(String str, String str2, String str3) {
                return str3.replaceAll(str2, "");
            }
        }, Serdes.String(), "someStore").toStream().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder, this.stateDir);
        this.driver.process("count-test-input", "11", "A");
        this.driver.flushState();
        this.driver.process("count-test-input", "12", "B");
        this.driver.flushState();
        this.driver.process("count-test-input", "11", null);
        this.driver.flushState();
        this.driver.process("count-test-input", "12", "C");
        this.driver.flushState();
        Assert.assertEquals(Utils.mkList(new String[]{"1:1", "1:12", "1:2", "1:2"}), mockProcessorSupplier.processed);
    }

    @Test
    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        final HashMap hashMap = new HashMap();
        KTable table = streamsBuilder.table("tableOne", this.consumed);
        KTable reduce = streamsBuilder.table("tableTwo", Consumed.with(Serdes.Long(), Serdes.String())).groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.8
            public KeyValue<String, Long> apply(Long l, String str) {
                return new KeyValue<>(str, l);
            }
        }, Serialized.with(Serdes.String(), Serdes.Long())).reduce(new Reducer<Long>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.6
            public Long apply(Long l, Long l2) {
                return Long.valueOf(l.longValue() + l2.longValue());
            }
        }, new Reducer<Long>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.7
            public Long apply(Long l, Long l2) {
                return Long.valueOf(l.longValue() - l2.longValue());
            }
        }, "reducer-store");
        reduce.toStream().foreach(new ForeachAction<String, Long>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.9
            public void apply(String str, Long l) {
                hashMap.put(str, l);
            }
        });
        table.leftJoin(reduce, new ValueJoiner<String, Long, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.11
            public String apply(String str, Long l) {
                return str + ":" + l;
            }
        }).mapValues(new ValueMapper<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KTableAggregateTest.10
            public String apply(String str) {
                return str;
            }
        });
        this.driver.setUp(streamsBuilder, this.stateDir, 111L);
        this.driver.process("TestDriver-reducer-store-repartition", "1", new Change(1L, (Object) null));
        this.driver.process("tableOne", "2", "2");
        this.driver.process("TestDriver-reducer-store-repartition", "2", new Change(2L, (Object) null));
        this.driver.process("TestDriver-reducer-store-repartition", "2", new Change(2L, (Object) null));
        Assert.assertEquals(2L, hashMap.get("2"));
        this.driver.process("tableOne", "1", "5");
        Assert.assertEquals(4L, hashMap.get("2"));
    }
}
