package org.apache.kafka.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/test/ProcessorTopologyTestDriver.class */
public class ProcessorTopologyTestDriver {
    private static final String APPLICATION_ID = "test-driver-application";
    private static final int PARTITION_ID = 0;
    private static final TaskId TASK_ID = new TaskId(PARTITION_ID, PARTITION_ID);
    private final ProcessorTopology topology;
    private final MockProducer<byte[], byte[]> producer;
    private final Map<String, TopicPartition> partitionsByTopic = new HashMap();
    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap();
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap();
    private final Set<String> internalTopics = new HashSet();
    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap();
    private StreamTask task;
    private GlobalStateUpdateTask globalStateTask;

    public ProcessorTopologyTestDriver(StreamsConfig streamsConfig, TopologyBuilder topologyBuilder) {
        this.topology = topologyBuilder.setApplicationId(APPLICATION_ID).build((Integer) null);
        ProcessorTopology buildGlobalStateTopology = topologyBuilder.buildGlobalStateTopology();
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
        this.producer = new MockProducer<byte[], byte[]>(true, byteArraySerializer, byteArraySerializer) { // from class: org.apache.kafka.test.ProcessorTopologyTestDriver.1
            public List<PartitionInfo> partitionsFor(String str) {
                return Collections.singletonList(new PartitionInfo(str, ProcessorTopologyTestDriver.PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null));
            }
        };
        Iterator it = topologyBuilder.topicGroups().values().iterator();
        while (it.hasNext()) {
            this.internalTopics.addAll(((TopologyBuilder.TopicsInfo) it.next()).repartitionSourceTopics.keySet());
        }
        for (String str : this.topology.sourceTopics()) {
            TopicPartition topicPartition = new TopicPartition(str, PARTITION_ID);
            this.partitionsByTopic.put(str, topicPartition);
            this.offsetsByTopicPartition.put(topicPartition, new AtomicLong());
        }
        mockConsumer.assign(this.offsetsByTopicPartition.keySet());
        StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(new Metrics());
        ThreadCache threadCache = new ThreadCache("mock", 1048576L, mockStreamsMetrics);
        if (buildGlobalStateTopology != null) {
            MockConsumer<byte[], byte[]> createGlobalConsumer = createGlobalConsumer();
            for (String str2 : buildGlobalStateTopology.sourceTopics()) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(new PartitionInfo(str2, 1, (Node) null, (Node[]) null, (Node[]) null));
                createGlobalConsumer.updatePartitions(str2, arrayList);
                TopicPartition topicPartition2 = new TopicPartition(str2, 1);
                createGlobalConsumer.updateEndOffsets(Collections.singletonMap(topicPartition2, 0L));
                this.globalPartitionsByTopic.put(str2, topicPartition2);
                this.offsetsByTopicPartition.put(topicPartition2, new AtomicLong());
            }
            GlobalStateManagerImpl globalStateManagerImpl = new GlobalStateManagerImpl(buildGlobalStateTopology, createGlobalConsumer, stateDirectory);
            this.globalStateTask = new GlobalStateUpdateTask(buildGlobalStateTopology, new GlobalProcessorContextImpl(streamsConfig, globalStateManagerImpl, mockStreamsMetrics, threadCache), globalStateManagerImpl);
            this.globalStateTask.initialize();
        }
        if (this.partitionsByTopic.isEmpty()) {
            return;
        }
        this.task = new StreamTask(TASK_ID, APPLICATION_ID, this.partitionsByTopic.values(), this.topology, mockConsumer, new StoreChangelogReader(createRestoreConsumer(this.topology.storeToChangelogTopic())), streamsConfig, mockStreamsMetrics, stateDirectory, threadCache, new MockTime(), this.producer);
        this.task.initializeStateStores();
        this.task.initializeTopology();
    }

    private void process(String str, byte[] bArr, byte[] bArr2, long j) {
        TopicPartition topicPartition = this.partitionsByTopic.get(str);
        if (topicPartition == null) {
            TopicPartition topicPartition2 = this.globalPartitionsByTopic.get(str);
            if (topicPartition2 == null) {
                throw new IllegalArgumentException("Unexpected topic: " + str);
            }
            this.globalStateTask.update(new ConsumerRecord(topicPartition2.topic(), topicPartition2.partition(), this.offsetsByTopicPartition.get(topicPartition2).incrementAndGet(), j, TimestampType.CREATE_TIME, 0L, PARTITION_ID, PARTITION_ID, bArr, bArr2));
            this.globalStateTask.flushState();
            return;
        }
        long incrementAndGet = this.offsetsByTopicPartition.get(topicPartition).incrementAndGet();
        this.task.addRecords(topicPartition, records(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), incrementAndGet, j, TimestampType.CREATE_TIME, 0L, PARTITION_ID, PARTITION_ID, bArr, bArr2)));
        this.producer.clear();
        this.task.process();
        this.task.context().setRecordContext(new ProcessorRecordContext(j, incrementAndGet, topicPartition.partition(), str));
        this.task.commit();
        for (ProducerRecord<byte[], byte[]> producerRecord : this.producer.history()) {
            Queue<ProducerRecord<byte[], byte[]>> queue = this.outputRecordsByTopic.get(producerRecord.topic());
            if (queue == null) {
                queue = new LinkedList();
                this.outputRecordsByTopic.put(producerRecord.topic(), queue);
            }
            queue.add(producerRecord);
            if (this.internalTopics.contains(producerRecord.topic()) || this.topology.sourceTopics().contains(producerRecord.topic())) {
                process(producerRecord.topic(), (byte[]) producerRecord.key(), (byte[]) producerRecord.value(), producerRecord.timestamp().longValue());
            }
        }
    }

    public void process(String str, byte[] bArr, byte[] bArr2) {
        process(str, bArr, bArr2, 0L);
    }

    public <K, V> void process(String str, K k, V v, Serializer<K> serializer, Serializer<V> serializer2) {
        process(str, serializer.serialize(str, k), serializer2.serialize(str, v));
    }

    public ProducerRecord<byte[], byte[]> readOutput(String str) {
        Queue<ProducerRecord<byte[], byte[]>> queue = this.outputRecordsByTopic.get(str);
        if (queue == null) {
            return null;
        }
        return queue.poll();
    }

    public <K, V> ProducerRecord<K, V> readOutput(String str, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        ProducerRecord<byte[], byte[]> readOutput = readOutput(str);
        if (readOutput == null) {
            return null;
        }
        return new ProducerRecord<>(readOutput.topic(), readOutput.partition(), readOutput.timestamp(), deserializer.deserialize(readOutput.topic(), (byte[]) readOutput.key()), deserializer2.deserialize(readOutput.topic(), (byte[]) readOutput.value()));
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> consumerRecord) {
        return Collections.singleton(consumerRecord);
    }

    private StateStore getStateStore(String str) {
        return this.task.context().getStateMgr().getStore(str);
    }

    public <K, V> KeyValueStore<K, V> getKeyValueStore(String str) {
        if (getStateStore(str) instanceof KeyValueStore) {
            return getStateStore(str);
        }
        return null;
    }

    public void close() {
        if (this.task != null) {
            this.task.close(true, false);
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close();
            } catch (IOException e) {
            }
        }
    }

    private MockConsumer<byte[], byte[]> createRestoreConsumer(Map<String, String> map) {
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { // from class: org.apache.kafka.test.ProcessorTopologyTestDriver.2
            public synchronized void seekToEnd(Collection<TopicPartition> collection) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> collection) {
            }

            public synchronized long position(TopicPartition topicPartition) {
                return 0L;
            }
        };
        Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            String value = it.next().getValue();
            ArrayList arrayList = new ArrayList();
            arrayList.add(new PartitionInfo(value, PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null));
            mockConsumer.updatePartitions(value, arrayList);
            mockConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(value, PARTITION_ID), 0L));
        }
        return mockConsumer;
    }

    private MockConsumer<byte[], byte[]> createGlobalConsumer() {
        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { // from class: org.apache.kafka.test.ProcessorTopologyTestDriver.3
            public synchronized void seekToEnd(Collection<TopicPartition> collection) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> collection) {
            }

            public synchronized long position(TopicPartition topicPartition) {
                return 0L;
            }
        };
    }
}
