package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.class */
public class ProcessorTopologyTest {
    protected static final String INPUT_TOPIC = "input-topic";
    protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
    protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
    private ProcessorTopologyTestDriver driver;
    private StreamsConfig config;
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    private static long timestamp = 1000;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$CustomTimestampExtractor.class */
    public static class CustomTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> consumerRecord) {
            return ProcessorTopologyTest.timestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$ForwardingProcessor.class */
    public static class ForwardingProcessor extends AbstractProcessor<String, String> {
        protected ForwardingProcessor() {
        }

        public void process(String str, String str2) {
            context().forward(str, str2);
        }

        public void punctuate(long j) {
            context().forward(Long.toString(j), "punctuate");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$MultiplexByNameProcessor.class */
    public static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        public MultiplexByNameProcessor(int i) {
            this.numChildren = i;
        }

        public void process(String str, String str2) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(str, str2 + "(" + (i + 1) + ")", "sink" + i);
            }
        }

        public void punctuate(long j) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(Long.toString(j), "punctuate(" + (i + 1) + ")", "sink" + i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$MultiplexingProcessor.class */
    public static class MultiplexingProcessor extends AbstractProcessor<String, String> {
        private final int numChildren;

        public MultiplexingProcessor(int i) {
            this.numChildren = i;
        }

        public void process(String str, String str2) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(str, str2 + "(" + (i + 1) + ")", i);
            }
        }

        public void punctuate(long j) {
            for (int i = 0; i != this.numChildren; i++) {
                context().forward(Long.toString(j), "punctuate(" + (i + 1) + ")", i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$StatefulProcessor.class */
    public static class StatefulProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;
        private final String storeName;

        public StatefulProcessor(String str) {
            this.storeName = str;
        }

        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.store = processorContext.getStateStore(this.storeName);
        }

        public void process(String str, String str2) {
            this.store.put(str, str2);
        }

        public void punctuate(long j) {
            int i = 0;
            KeyValueIterator all = this.store.all();
            Throwable th = null;
            while (all.hasNext()) {
                try {
                    try {
                        all.next();
                        i++;
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (all != null) {
                        if (th != null) {
                            try {
                                all.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            all.close();
                        }
                    }
                    throw th2;
                }
            }
            if (all != null) {
                if (0 != 0) {
                    try {
                        all.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    all.close();
                }
            }
            context().forward(Long.toString(j), Integer.valueOf(i));
        }

        public void close() {
            this.store.close();
        }
    }

    @Before
    public void setup() {
        File tempDir = StateTestUtils.tempDir();
        Properties properties = new Properties();
        properties.setProperty("application.id", "processor-topology-test");
        properties.setProperty("bootstrap.servers", "localhost:9091");
        properties.setProperty("state.dir", tempDir.getAbsolutePath());
        properties.setProperty("key.serde", Serdes.String().getClass().getName());
        properties.setProperty("value.serde", Serdes.String().getClass().getName());
        properties.setProperty("timestamp.extractor", CustomTimestampExtractor.class.getName());
        this.config = new StreamsConfig(properties);
    }

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Test
    public void testTopologyMetadata() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source-1", new String[]{"topic-1"});
        topologyBuilder.addSource("source-2", new String[]{"topic-2", "topic-3"});
        topologyBuilder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source-1"});
        topologyBuilder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source-1", "source-2"});
        topologyBuilder.addSink("sink-1", "topic-3", new String[]{"processor-1"});
        topologyBuilder.addSink("sink-2", "topic-4", new String[]{"processor-1", "processor-2"});
        ProcessorTopology build = topologyBuilder.build("X", (Integer) null);
        Assert.assertEquals(6L, build.processors().size());
        Assert.assertEquals(2L, build.sources().size());
        Assert.assertEquals(3L, build.sourceTopics().size());
        Assert.assertNotNull(build.source("topic-1"));
        Assert.assertNotNull(build.source("topic-2"));
        Assert.assertNotNull(build.source("topic-3"));
        Assert.assertEquals(build.source("topic-2"), build.source("topic-3"));
    }

    @Test
    public void testDrivingSimpleTopology() {
        this.driver = new ProcessorTopologyTestDriver(this.config, createSimpleTopology(10), new String[0]);
        this.driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", 10);
        assertNoOutputRecord(OUTPUT_TOPIC_2);
        this.driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", 10);
        assertNoOutputRecord(OUTPUT_TOPIC_2);
        this.driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNoOutputRecord(OUTPUT_TOPIC_2);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", 10);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", 10);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5", 10);
    }

    @Test
    public void testDrivingMultiplexingTopology() {
        this.driver = new ProcessorTopologyTestDriver(this.config, createMultiplexingTopology(), new String[0]);
        this.driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
        this.driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
        this.driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
    }

    @Test
    public void testDrivingMultiplexByNameTopology() {
        this.driver = new ProcessorTopologyTestDriver(this.config, createMultiplexByNameTopology(), new String[0]);
        this.driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
        this.driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
        this.driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
    }

    @Test
    public void testDrivingStatefulTopology() {
        this.driver = new ProcessorTopologyTestDriver(this.config, createStatefulTopology("entries"), "entries");
        this.driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
        this.driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
        assertNoOutputRecord(OUTPUT_TOPIC_1);
        KeyValueStore keyValueStore = this.driver.getKeyValueStore("entries");
        Assert.assertEquals("value4", keyValueStore.get("key1"));
        Assert.assertEquals("value2", keyValueStore.get("key2"));
        Assert.assertEquals("value3", keyValueStore.get("key3"));
        Assert.assertNull(keyValueStore.get("key4"));
    }

    protected void assertNextOutputRecord(String str, String str2, String str3) {
        ProducerRecord readOutput = this.driver.readOutput(str, STRING_DESERIALIZER, STRING_DESERIALIZER);
        Assert.assertEquals(str, readOutput.topic());
        Assert.assertEquals(str2, readOutput.key());
        Assert.assertEquals(str3, readOutput.value());
        Assert.assertNull(readOutput.partition());
    }

    protected void assertNextOutputRecord(String str, String str2, String str3, Integer num) {
        ProducerRecord readOutput = this.driver.readOutput(str, STRING_DESERIALIZER, STRING_DESERIALIZER);
        Assert.assertEquals(str, readOutput.topic());
        Assert.assertEquals(str2, readOutput.key());
        Assert.assertEquals(str3, readOutput.value());
        Assert.assertEquals(num, readOutput.partition());
    }

    protected void assertNoOutputRecord(String str) {
        Assert.assertNull(this.driver.readOutput(str));
    }

    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer num) {
        return new StreamPartitioner<K, V>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.1
            public Integer partition(K k, V v, int i) {
                return num;
            }
        };
    }

    protected TopologyBuilder createSimpleTopology(int i) {
        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC}).addProcessor("processor", define(new ForwardingProcessor()), new String[]{"source"}).addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(i)), new String[]{"processor"});
    }

    protected TopologyBuilder createMultiplexingTopology() {
        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC}).addProcessor("processor", define(new MultiplexingProcessor(2)), new String[]{"source"}).addSink("sink1", OUTPUT_TOPIC_1, new String[]{"processor"}).addSink("sink2", OUTPUT_TOPIC_2, new String[]{"processor"});
    }

    protected TopologyBuilder createMultiplexByNameTopology() {
        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC}).addProcessor("processor", define(new MultiplexByNameProcessor(2)), new String[]{"source"}).addSink("sink0", OUTPUT_TOPIC_1, new String[]{"processor"}).addSink("sink1", OUTPUT_TOPIC_2, new String[]{"processor"});
    }

    protected TopologyBuilder createStatefulTopology(String str) {
        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC}).addProcessor("processor", define(new StatefulProcessor(str)), new String[]{"source"}).addStateStore(Stores.create(str).withStringKeys().withStringValues().inMemory().build(), new String[]{"processor"}).addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor"});
    }

    protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
        return new ProcessorSupplier<K, V>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.2
            public Processor<K, V> get() {
                return processor;
            }
        };
    }
}
