package org.apache.kafka.streams.processor;

import java.util.LinkedList;
import java.util.Objects;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/ReadOnlyStoreTest.class */
public class ReadOnlyStoreTest {
    @Test
    public void shouldConnectProcessorAndWriteDataToReadOnlyStore() {
        Topology topology = new Topology();
        topology.addReadOnlyStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("readOnlyStore"), new Serdes.IntegerSerde(), new Serdes.StringSerde()), "readOnlySource", new IntegerDeserializer(), new StringDeserializer(), "storeTopic", "readOnlyProcessor", () -> {
            return new Processor<Integer, String, Void, Void>() { // from class: org.apache.kafka.streams.processor.ReadOnlyStoreTest.1
                KeyValueStore<Integer, String> store;

                public void init(ProcessorContext<Void, Void> processorContext) {
                    this.store = processorContext.getStateStore("readOnlyStore");
                }

                public void process(Record<Integer, String> record) {
                    this.store.put((Integer) record.key(), (String) record.value());
                }
            };
        });
        topology.addSource("source", new IntegerDeserializer(), new StringDeserializer(), new String[]{"inputTopic"});
        topology.addProcessor("processor", () -> {
            return new Processor<Integer, String, Integer, String>() { // from class: org.apache.kafka.streams.processor.ReadOnlyStoreTest.2
                ProcessorContext<Integer, String> context;
                KeyValueStore<Integer, String> store;

                public void init(ProcessorContext<Integer, String> processorContext) {
                    this.context = processorContext;
                    this.store = processorContext.getStateStore("readOnlyStore");
                }

                public void process(Record<Integer, String> record) {
                    this.context.forward(record.withValue(((String) record.value()) + " -- " + ((String) this.store.get((Integer) record.key()))));
                }
            };
        }, new String[]{"source"});
        topology.connectProcessorAndStateStores("processor", new String[]{"readOnlyStore"});
        topology.addSink("sink", "outputTopic", new IntegerSerializer(), new StringSerializer(), new String[]{"processor"});
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("storeTopic", new IntegerSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("inputTopic", new IntegerSerializer(), new StringSerializer());
            TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("outputTopic", new IntegerDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput(1, "foo");
            createInputTopic.pipeInput(2, "bar");
            createInputTopic2.pipeInput(1, "bar");
            createInputTopic2.pipeInput(2, "foo");
            KeyValueIterator all = topologyTestDriver.getKeyValueStore("readOnlyStore").all();
            try {
                LinkedList linkedList = new LinkedList();
                Objects.requireNonNull(linkedList);
                all.forEachRemaining((v1) -> {
                    r1.add(v1);
                });
                LinkedList linkedList2 = new LinkedList();
                linkedList2.add(KeyValue.pair(1, "foo"));
                linkedList2.add(KeyValue.pair(2, "bar"));
                MatcherAssert.assertThat(linkedList, Matchers.equalTo(linkedList2));
                if (all != null) {
                    all.close();
                }
                LinkedList linkedList3 = new LinkedList();
                linkedList3.add(KeyValue.pair(1, "bar -- foo"));
                linkedList3.add(KeyValue.pair(2, "foo -- bar"));
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), Matchers.equalTo(linkedList3));
                topologyTestDriver.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
