package org.apache.kafka.streams;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.KeyValueStoreFacade;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.internals.WindowStoreFacade;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.streams.test.TestRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver.class */
public class TopologyTestDriver implements Closeable {
    // Wall clock handed to the state directory, metrics registry and stream task; the constructor assigns a MockTime.
    private final Time mockWallClockTime;
    // Builder from which processorTopology and globalTopology are built in the constructor.
    private final InternalTopologyBuilder internalTopologyBuilder;
    // NOTE(review): not referenced by name in the visible code; partition 0 appears as a literal instead
    // (possibly inlined by the decompiler - unconfirmed).
    private static final int PARTITION_ID = 0;
    // Single stream task of the driver; null when the topology has no source topics.
    final StreamTask task;
    // Global state task and its manager; both are null when there is no global topology.
    private final GlobalStateUpdateTask globalStateTask;
    private final GlobalStateManager globalStateManager;
    // State directory created in the constructor and cleaned in close().
    private final StateDirectory stateDirectory;
    // Metrics registry; exposed read-only through metrics().
    private final Metrics metrics;
    // Topology driven by the task, and the optional global-state topology (may be null).
    final ProcessorTopology processorTopology;
    final ProcessorTopology globalTopology;
    // Mock producer whose history is drained by captureOutputsAndReEnqueueInternalResults().
    private final MockProducer<byte[], byte[]> producer;
    // Repartition source topic names collected from the builder's topic groups; only written in the visible code.
    private final Set<String> internalTopics;
    // Source topic name -> partition 0 of that topic; keys are also tried as regex patterns on lookup.
    private final Map<String, TopicPartition> partitionsByInputTopic;
    // Global source topic name -> partition 0 of that topic.
    private final Map<String, TopicPartition> globalPartitionsByInputTopic;
    // Per-partition counter used to assign offsets to records piped into the task or the global task.
    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition;
    // Captured output records per topic; only filled while no output callback is registered.
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic;
    // True when the configured processing guarantee equals StreamsConfig.EXACTLY_ONCE.
    private final boolean eosEnabled;
    // Optional listener set via setOutputListener(); when non-null, outputs go to it instead of the queues.
    private Consumer<ProducerRecord<byte[], byte[]>> outputCallback;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TopologyTestDriver.class);
    // Task id used for the single stream task and for its task-level metrics sensor.
    private static final TaskId TASK_ID = new TaskId(0, 0);

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$MockTime.class */
    /**
     * Manually advanced {@link Time} implementation. The clock only moves when
     * {@link #sleep(long)} is called; it never follows the system clock.
     */
    static class MockTime implements Time {
        /** Current mock time in milliseconds. */
        private final AtomicLong timeMs;
        /** Current mock time in nanoseconds, advanced in lock-step with {@link #timeMs}. */
        private final AtomicLong highResTimeNs;

        /**
         * @param j initial time in milliseconds; the nanosecond clock starts at the same instant
         */
        MockTime(long j) {
            final long startNs = j * 1000 * 1000;
            timeMs = new AtomicLong(j);
            highResTimeNs = new AtomicLong(startNs);
        }

        @Override
        public long milliseconds() {
            return timeMs.get();
        }

        @Override
        public long nanoseconds() {
            return highResTimeNs.get();
        }

        @Override
        public long hiResClockMs() {
            final long nowNs = nanoseconds();
            return TimeUnit.NANOSECONDS.toMillis(nowNs);
        }

        /**
         * Advances both clocks by the given number of milliseconds without blocking.
         *
         * @throws IllegalArgumentException if {@code j} is negative
         */
        @Override
        public void sleep(long j) {
            if (j >= 0) {
                timeMs.addAndGet(j);
                highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(j));
                return;
            }
            throw new IllegalArgumentException("Sleep ms cannot be negative.");
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        @Override
        public void waitObject(Object obj, Supplier<Boolean> supplier, long j) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Creates a driver whose mock wall clock starts at the current system time
     * (delegates with a {@code null} initial instant).
     *
     * @param topology   the topology to test
     * @param properties the configuration for the topology
     */
    public TopologyTestDriver(final Topology topology, final Properties properties) {
        this(topology, properties, (Instant) null);
    }

    /**
     * Creates a driver whose mock wall clock starts at the given epoch-millisecond value.
     *
     * @param topology   the topology to test
     * @param properties the configuration for the topology
     * @param j          initial mock wall-clock time in milliseconds
     * @deprecated an overload taking an {@link Instant} exists in this class.
     */
    @Deprecated
    public TopologyTestDriver(final Topology topology, final Properties properties, final long j) {
        this(topology.internalTopologyBuilder, properties, j);
    }

    /**
     * Creates a driver with an explicit initial mock wall-clock time.
     *
     * @param topology   the topology to test
     * @param properties the configuration for the topology
     * @param instant    initial mock wall-clock time; when {@code null}, the current system time is used
     */
    public TopologyTestDriver(final Topology topology, final Properties properties, final Instant instant) {
        this(topology.internalTopologyBuilder, properties, instant != null ? instant.toEpochMilli() : System.currentTimeMillis());
    }

    /**
     * Shared constructor: builds the topologies, the mock clients, the metrics registry and,
     * where applicable, the global state task and the single stream task.
     *
     * @param internalTopologyBuilder builder of the topology under test; it is rewritten with the
     *                                effective config before the topologies are built
     * @param properties              configuration wrapped in a {@link QuietStreamsConfig}
     * @param initialWallClockTimeMs  starting value of the mock wall clock in milliseconds
     */
    private TopologyTestDriver(final InternalTopologyBuilder internalTopologyBuilder, final Properties properties, final long initialWallClockTimeMs) {
        this.internalTopics = new HashSet<>();
        this.partitionsByInputTopic = new HashMap<>();
        this.globalPartitionsByInputTopic = new HashMap<>();
        this.offsetsByTopicOrPatternPartition = new HashMap<>();
        this.outputRecordsByTopic = new HashMap<>();
        log.warn("TopologyTestDriver: {}", properties);
        final QuietStreamsConfig streamsConfig = new QuietStreamsConfig(properties);
        logIfTaskIdleEnabled(streamsConfig);
        log.warn("TopologyTestDriver: {}", streamsConfig);

        this.mockWallClockTime = new MockTime(initialWallClockTimeMs);
        this.internalTopologyBuilder = internalTopologyBuilder;
        this.internalTopologyBuilder.rewriteTopology(streamsConfig);
        this.processorTopology = this.internalTopologyBuilder.build((Integer) null);
        this.globalTopology = this.internalTopologyBuilder.buildGlobalStateTopology();
        // This flag is passed to the StateDirectory; it is true when either topology has a persistent store.
        final boolean hasPersistentStore = this.processorTopology.hasPersistentLocalStore()
            || (this.globalTopology != null && this.globalTopology.hasPersistentGlobalStore());

        final ByteArraySerializer bytesSerializer = new ByteArraySerializer();
        // The mock producer reports exactly one partition (0) for every topic.
        this.producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
            }
        };
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockWallClockTime, hasPersistentStore);

        final MetricConfig metricConfig = new MetricConfig()
            .samples(streamsConfig.getInt("metrics.num.samples"))
            .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level")))
            .timeWindow(streamsConfig.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, this.mockWallClockTime);
        final String threadId = Thread.currentThread().getName();
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
            this.metrics,
            "test-client",
            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
        // A negative configured cache size is clamped to zero.
        final ThreadCache cache = new ThreadCache(
            new LogContext("topology-test-driver "),
            Math.max(0L, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
            streamsMetrics);

        // Restore callbacks are intentionally no-ops.
        final StateRestoreListener stateRestoreListener = new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
            }

            @Override
            public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
            }

            @Override
            public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
            }
        };

        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : this.internalTopologyBuilder.topicGroups().values()) {
            this.internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
        }

        // Every source topic is mapped to its partition 0 with an offset counter starting at 0.
        for (final String sourceTopic : this.processorTopology.sourceTopics()) {
            final TopicPartition partition = new TopicPartition(sourceTopic, 0);
            this.partitionsByInputTopic.put(sourceTopic, partition);
            this.offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
        }
        consumer.assign(this.partitionsByInputTopic.values());
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        for (final TopicPartition partition : this.partitionsByInputTopic.values()) {
            beginningOffsets.put(partition, 0L);
        }
        consumer.updateBeginningOffsets(beginningOffsets);

        if (this.globalTopology != null) {
            final MockConsumer<byte[], byte[]> globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
            for (final String globalTopic : this.globalTopology.sourceTopics()) {
                final TopicPartition partition = new TopicPartition(globalTopic, 0);
                this.globalPartitionsByInputTopic.put(globalTopic, partition);
                this.offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
                globalConsumer.updatePartitions(globalTopic, Collections.singletonList(new PartitionInfo(globalTopic, 0, null, null, null)));
                globalConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
                globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
            }
            this.globalStateManager = new GlobalStateManagerImpl(
                new LogContext("mock "),
                this.globalTopology,
                globalConsumer,
                this.stateDirectory,
                stateRestoreListener,
                streamsConfig);
            final GlobalProcessorContextImpl globalProcessorContext =
                new GlobalProcessorContextImpl(streamsConfig, this.globalStateManager, streamsMetrics, cache);
            this.globalStateManager.setGlobalProcessorContext(globalProcessorContext);
            this.globalStateTask = new GlobalStateUpdateTask(
                this.globalTopology,
                globalProcessorContext,
                this.globalStateManager,
                new LogAndContinueExceptionHandler(),
                new LogContext());
            this.globalStateTask.initialize();
            globalProcessorContext.setRecordContext(
                new ProcessorRecordContext(0L, -1L, -1, AbstractProcessorContext.NONEXIST_TOPIC, new RecordHeaders()));
        } else {
            this.globalStateManager = null;
            this.globalStateTask = null;
        }

        if (this.partitionsByInputTopic.isEmpty()) {
            // No source topics: there is nothing for a stream task to consume.
            this.task = null;
        } else {
            this.task = new StreamTask(
                TASK_ID,
                new HashSet<>(this.partitionsByInputTopic.values()),
                this.processorTopology,
                consumer,
                new StoreChangelogReader(
                    createRestoreConsumer(this.processorTopology.storeToChangelogTopic()),
                    Duration.ZERO,
                    stateRestoreListener,
                    new LogContext("topology-test-driver ")),
                streamsConfig,
                streamsMetrics,
                this.stateDirectory,
                cache,
                this.mockWallClockTime,
                () -> this.producer);
            this.task.initializeStateStores();
            this.task.initializeTopology();
            ((InternalProcessorContext) this.task.context()).setRecordContext(
                new ProcessorRecordContext(0L, -1L, -1, AbstractProcessorContext.NONEXIST_TOPIC, new RecordHeaders()));
        }
        this.eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
    }

    /**
     * Logs an informational hint when {@link StreamsConfig#MAX_TASK_IDLE_MS_CONFIG} is set to a
     * positive value.
     *
     * @param streamsConfig the effective configuration of the driver
     */
    private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
        final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
        if (taskIdleTime > 0) {
            log.info("Detected {} config in use with TopologyTestDriver (set to {}ms). This means you might need to use TopologyTestDriver#advanceWallClockTime() or enqueue records on all partitions to allow Streams to make progress. TopologyTestDriver will log a message each time it cannot process enqueued records due to {}.", StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, taskIdleTime, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
        }
    }

    /**
     * Returns the metrics of the driver's registry.
     *
     * @return an unmodifiable view of the registered metrics
     */
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Pipes one raw record into the topology, using the record's topic, timestamp, key, value
     * and headers.
     *
     * @param consumerRecord the record to process
     * @throws IllegalArgumentException if the record's topic is unknown to the driver
     */
    @Deprecated
    public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
        final String topic = consumerRecord.topic();
        final long timestamp = consumerRecord.timestamp();
        final byte[] key = consumerRecord.key();
        final byte[] value = consumerRecord.value();
        final Headers headers = consumerRecord.headers();
        pipeRecord(topic, timestamp, key, value, headers);
    }

    /**
     * Pipes an already serialized record with empty headers into the topology.
     *
     * @param str   the input topic name
     * @param j     the record timestamp
     * @param bArr  the serialized key (may be {@code null})
     * @param bArr2 the serialized value (may be {@code null})
     * @throws IllegalArgumentException if the topic is unknown to the driver
     */
    public void pipeRawRecord(final String str, final long j, final byte[] bArr, final byte[] bArr2) {
        final Headers emptyHeaders = new RecordHeaders();
        pipeRecord(str, j, bArr, bArr2, emptyHeaders);
    }

    /**
     * Routes a serialized record to the stream task and/or the global state task, depending on
     * which of them consumes the topic.
     *
     * @throws IllegalArgumentException if neither the task nor the global task knows the topic
     */
    private void pipeRecord(final String topicName, final long timestamp, final byte[] key, final byte[] value, final Headers headers) {
        final TopicPartition taskPartition = getInputTopicOrPatternPartition(topicName);
        final TopicPartition globalPartition = this.globalPartitionsByInputTopic.get(topicName);
        final boolean feedsTask = taskPartition != null;
        final boolean feedsGlobalTask = globalPartition != null;

        if (!feedsTask && !feedsGlobalTask) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        if (feedsTask) {
            enqueueTaskRecord(topicName, taskPartition, timestamp, key, value, headers);
            completeAllProcessableWork();
        }
        if (feedsGlobalTask) {
            processGlobalRecord(globalPartition, timestamp, key, value, headers);
        }
    }

    /**
     * Wraps the serialized key/value in a {@link ConsumerRecord} carrying the next offset of the
     * given partition and adds it to the stream task.
     */
    private void enqueueTaskRecord(final String inputTopic, final TopicPartition partition, final long timestamp, final byte[] key, final byte[] value, final Headers headers) {
        final int partitionId = partition.partition();
        // incrementAndGet() - 1 yields the counter's value before the increment.
        final long offset = this.offsetsByTopicOrPatternPartition.get(partition).incrementAndGet() - 1;
        // A null key or value is recorded with serialized size -1.
        final int keySize = key == null ? -1 : key.length;
        final int valueSize = value == null ? -1 : value.length;
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            inputTopic,
            partitionId,
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            -1L,
            keySize,
            valueSize,
            key,
            value,
            headers);
        this.task.addRecords(partition, Collections.singleton(record));
    }

    /**
     * Captures pending producer output, then keeps processing, punctuating (stream time) and
     * committing for as long as the task has queued records and reports itself processable.
     * Does nothing beyond the initial capture when there is no stream task.
     */
    private void completeAllProcessableWork() {
        captureOutputsAndReEnqueueInternalResults();
        if (this.task == null) {
            return;
        }
        while (this.task.hasRecordsQueued() && this.task.isProcessable(this.mockWallClockTime.milliseconds())) {
            // The processable check is repeated on purpose to keep the exact call sequence.
            final boolean processableNow = this.task.isProcessable(this.mockWallClockTime.milliseconds());
            if (processableNow) {
                this.task.process();
            }
            this.task.maybePunctuateStreamTime();
            this.task.commit();
            captureOutputsAndReEnqueueInternalResults();
        }
        if (this.task.hasRecordsQueued()) {
            log.info("Due to the {} configuration, there are currently some records that cannot be processed. Advancing wall-clock time or enqueuing records on the empty topics will allow Streams to process more.", StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
        }
    }

    /**
     * Feeds one record to the global state task, using the next offset of the given global
     * partition, and flushes the global state afterwards.
     */
    private void processGlobalRecord(final TopicPartition globalPartition, final long timestamp, final byte[] key, final byte[] value, final Headers headers) {
        final String topic = globalPartition.topic();
        final int partitionId = globalPartition.partition();
        // incrementAndGet() - 1 yields the counter's value before the increment.
        final long offset = this.offsetsByTopicOrPatternPartition.get(globalPartition).incrementAndGet() - 1;
        final int keySize = key == null ? -1 : key.length;
        final int valueSize = value == null ? -1 : value.length;
        this.globalStateTask.update(new ConsumerRecord<>(
            topic,
            partitionId,
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            -1L,
            keySize,
            valueSize,
            key,
            value,
            headers));
        this.globalStateTask.flushState();
    }

    /**
     * Rejects an input topic that is only matched by interpreting a string-typed source topic
     * name as a regular expression (an exact name match is allowed).
     *
     * @throws TopologyException if a different source topic name, read as a regex, matches the input topic
     */
    private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
        for (final String sourceTopicName : this.internalTopologyBuilder.sourceTopicNames()) {
            if (sourceTopicName.equals(inputRecordTopic)) {
                continue;
            }
            if (Pattern.matches(sourceTopicName, inputRecordTopic)) {
                throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + " cannot contain regex pattern for input record topic: " + inputRecordTopic + " and hence cannot process the message.");
            }
        }
    }

    /**
     * Resolves the task partition for a topic: first by exact name, then by treating each
     * registered input-topic key as a regular expression.
     *
     * @return the matching partition, or {@code null} if the topic is not an input of the task
     */
    private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
        if (!this.internalTopologyBuilder.sourceTopicNames().isEmpty()) {
            validateSourceTopicNameRegexPattern(topicName);
        }

        final TopicPartition exactMatch = this.partitionsByInputTopic.get(topicName);
        if (exactMatch != null) {
            return exactMatch;
        }
        for (final Map.Entry<String, TopicPartition> candidate : this.partitionsByInputTopic.entrySet()) {
            final boolean patternMatches = Pattern.compile(candidate.getKey()).matcher(topicName).matches();
            if (patternMatches) {
                return candidate.getValue();
            }
        }
        return null;
    }

    /**
     * Registers the callback that receives produced records. While a callback is set, captured
     * output is delivered to it instead of being queued per topic.
     *
     * @param consumer the callback, or {@code null} to go back to queueing output
     */
    public void setOutputListener(final Consumer<ProducerRecord<byte[], byte[]>> consumer) {
        outputCallback = consumer;
    }

    /**
     * Hands a produced record to the registered output callback; a no-op when none is set.
     *
     * @param producerRecord the record to publish
     */
    public void publishOutputMessage(final ProducerRecord<byte[], byte[]> producerRecord) {
        if (this.outputCallback == null) {
            return;
        }
        this.outputCallback.accept(producerRecord);
    }

    /**
     * Drains the mock producer's history. Each record is either queued per topic or passed to
     * the output callback, and is additionally fed back into the stream task and/or the global
     * task when its topic is one of their inputs.
     */
    private void captureOutputsAndReEnqueueInternalResults() {
        // Take the history before clearing the producer.
        final List<ProducerRecord<byte[], byte[]>> produced = this.producer.history();
        this.producer.clear();
        if (this.eosEnabled && !this.producer.closed()) {
            this.producer.initTransactions();
            this.producer.beginTransaction();
        }

        for (final ProducerRecord<byte[], byte[]> record : produced) {
            if (this.outputCallback != null) {
                publishOutputMessage(record);
            } else {
                this.outputRecordsByTopic
                    .computeIfAbsent(record.topic(), topic -> new LinkedList<>())
                    .add(record);
            }

            final String outputTopic = record.topic();
            final TopicPartition taskPartition = getInputTopicOrPatternPartition(outputTopic);
            final TopicPartition globalPartition = this.globalPartitionsByInputTopic.get(outputTopic);
            if (taskPartition != null) {
                enqueueTaskRecord(outputTopic, taskPartition, record.timestamp().longValue(), record.key(), record.value(), record.headers());
            }
            if (globalPartition != null) {
                processGlobalRecord(globalPartition, record.timestamp().longValue(), record.key(), record.value(), record.headers());
            }
        }
    }

    /**
     * Pipes the given raw records into the topology one by one, in list order.
     *
     * @param list the records to process
     */
    @Deprecated
    public void pipeInput(final List<ConsumerRecord<byte[], byte[]>> list) {
        for (final ConsumerRecord<byte[], byte[]> record : list) {
            pipeInput(record);
        }
    }

    /**
     * Advances the mock wall clock by the given number of milliseconds.
     *
     * @param j the amount of time to advance, in milliseconds
     * @deprecated an overload taking a {@link Duration} exists in this class.
     */
    @Deprecated
    public void advanceWallClockTime(final long j) {
        final Duration advance = Duration.ofMillis(j);
        advanceWallClockTime(advance);
    }

    /**
     * Advances the mock wall clock, triggers system-time punctuation and a commit on the stream
     * task (if there is one), and then completes all work that became processable.
     *
     * @param duration the amount of time to advance; must not be {@code null}
     * @throws NullPointerException if {@code duration} is {@code null}
     */
    public void advanceWallClockTime(final Duration duration) {
        Objects.requireNonNull(duration, "advance cannot be null");
        final long advanceMs = duration.toMillis();
        this.mockWallClockTime.sleep(advanceMs);
        if (this.task != null) {
            this.task.maybePunctuateSystemTime();
            this.task.commit();
        }
        completeAllProcessableWork();
    }

    /**
     * Removes and returns the next captured raw output record of a topic.
     *
     * @param str the output topic name
     * @return the next record, or {@code null} if nothing was captured for the topic or its queue is empty
     */
    @Deprecated
    public ProducerRecord<byte[], byte[]> readOutput(final String str) {
        final Queue<ProducerRecord<byte[], byte[]>> captured = this.outputRecordsByTopic.get(str);
        return captured == null ? null : captured.poll();
    }

    /**
     * Removes the next captured output record of a topic and deserializes its key and value.
     *
     * @param str           the output topic name
     * @param deserializer  the key deserializer
     * @param deserializer2 the value deserializer
     * @return the deserialized record, or {@code null} if no record is available
     */
    @Deprecated
    public <K, V> ProducerRecord<K, V> readOutput(final String str, final Deserializer<K> deserializer, final Deserializer<V> deserializer2) {
        final ProducerRecord<byte[], byte[]> raw = readOutput(str);
        if (raw == null) {
            return null;
        }
        final K key = deserializer.deserialize(raw.topic(), raw.key());
        final V value = deserializer2.deserialize(raw.topic(), raw.value());
        return new ProducerRecord<>(raw.topic(), raw.partition(), raw.timestamp(), key, value, raw.headers());
    }

    /**
     * Looks up the captured-output queue of a topic.
     *
     * @return the queue, or {@code null} when the topic is a known sink topic without captured output
     * @throws IllegalArgumentException if there is no queue and the topic is not a sink topic
     */
    private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
        final Queue<ProducerRecord<byte[], byte[]>> captured = this.outputRecordsByTopic.get(topicName);
        if (captured == null && !this.processorTopology.sinkTopics().contains(topicName)) {
            throw new IllegalArgumentException("Unknown topic: " + topicName + " available sinkTopics: " + this.processorTopology.sinkTopics());
        }
        return captured;
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver, constructed with the current system
     * time and {@link Duration#ZERO}.
     *
     * @param str         the input topic name
     * @param serializer  the key serializer
     * @param serializer2 the value serializer
     * @return the new input topic
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String str, final Serializer<K> serializer, final Serializer<V> serializer2) {
        final Instant startTimestamp = Instant.now();
        return new TestInputTopic<>(this, str, serializer, serializer2, startTimestamp, Duration.ZERO);
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver, passing the given instant and
     * duration through to its constructor.
     *
     * @param str         the input topic name
     * @param serializer  the key serializer
     * @param serializer2 the value serializer
     * @param instant     instant forwarded to the input topic
     * @param duration    duration forwarded to the input topic
     * @return the new input topic
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String str,
                                                              final Serializer<K> serializer,
                                                              final Serializer<V> serializer2,
                                                              final Instant instant,
                                                              final Duration duration) {
        return new TestInputTopic<>(this, str, serializer, serializer2, instant, duration);
    }

    /**
     * Creates a {@link TestOutputTopic} bound to this driver.
     *
     * @param str           the output topic name
     * @param deserializer  the key deserializer
     * @param deserializer2 the value deserializer
     * @return the new output topic
     */
    public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String str,
                                                                final Deserializer<K> deserializer,
                                                                final Deserializer<V> deserializer2) {
        return new TestOutputTopic<>(this, str, deserializer, deserializer2);
    }

    /**
     * Removes and returns the next captured raw record of a topic.
     *
     * @param str the output topic name
     * @return the next record, or {@code null} if the topic has no queue yet or the queue is empty
     * @throws IllegalArgumentException if the topic is unknown
     */
    ProducerRecord<byte[], byte[]> readRecord(final String str) {
        final Queue<ProducerRecord<byte[], byte[]>> captured = getRecordsQueue(str);
        return captured == null ? null : captured.poll();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the next captured record of a topic and converts it into a deserialized
     * {@link TestRecord}.
     *
     * @param str           the output topic name
     * @param deserializer  the key deserializer
     * @param deserializer2 the value deserializer
     * @return the deserialized record
     * @throws NoSuchElementException   if the topic has no queue yet or its queue is empty
     * @throws IllegalArgumentException if the topic is unknown
     */
    public <K, V> TestRecord<K, V> readRecord(final String str, final Deserializer<K> deserializer, final Deserializer<V> deserializer2) {
        final Queue<ProducerRecord<byte[], byte[]>> captured = getRecordsQueue(str);
        if (captured == null) {
            throw new NoSuchElementException("Uninitialized topic: " + str);
        }
        final ProducerRecord<byte[], byte[]> raw = captured.poll();
        if (raw == null) {
            throw new NoSuchElementException("Empty topic: " + str);
        }
        final K key = deserializer.deserialize(raw.topic(), raw.key());
        final V value = deserializer2.deserialize(raw.topic(), raw.value());
        return new TestRecord<>(key, value, raw.headers(), raw.timestamp());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializes a {@link TestRecord} and pipes it into the topology.
     *
     * @param str         the input topic name
     * @param testRecord  the record to send
     * @param serializer  the key serializer
     * @param serializer2 the value serializer
     * @param instant     timestamp override; when {@code null}, the record's own timestamp is used
     * @throws IllegalStateException if neither an override nor a record timestamp is available
     */
    public <K, V> void pipeRecord(final String str, final TestRecord<K, V> testRecord, final Serializer<K> serializer, final Serializer<V> serializer2, final Instant instant) {
        final byte[] keyBytes = serializer.serialize(str, testRecord.headers(), testRecord.key());
        final byte[] valueBytes = serializer2.serialize(str, testRecord.headers(), testRecord.value());

        final long timestamp;
        if (instant == null) {
            if (testRecord.timestamp() == null) {
                throw new IllegalStateException("Provided `TestRecord` does not have a timestamp and no timestamp overwrite was provided via `time` parameter.");
            }
            timestamp = testRecord.timestamp().longValue();
        } else {
            timestamp = instant.toEpochMilli();
        }
        pipeRecord(str, timestamp, keyBytes, valueBytes, testRecord.headers());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the number of captured output records currently queued for a topic.
     *
     * @param str the output topic name
     * @return the queue size, or {@code 0} when the topic is a known sink topic without a queue yet
     * @throws IllegalArgumentException if the topic is unknown (raised by the queue lookup)
     */
    public final long getQueueSize(final String str) {
        // Keep the looked-up queue in a local so the null check and size() use the same reference.
        final Queue<?> recordsQueue = getRecordsQueue(str);
        if (recordsQueue == null) {
            return 0L;
        }
        return recordsQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a topic currently has no captured output records.
     *
     * @param str the output topic name
     * @return {@code true} if the queue size of the topic is zero
     */
    public final boolean isEmpty(final String str) {
        final long queued = getQueueSize(str);
        return queued == 0;
    }

    /**
     * Collects all state stores named by the topology builder, without the built-in store check.
     *
     * @return a new map from store name to store; a value is {@code null} when the store is found
     *         neither in the task nor in the global state manager
     */
    public Map<String, StateStore> getAllStateStores() {
        final Map<String, StateStore> storesByName = new HashMap<>();
        this.internalTopologyBuilder.allStateStoreName()
            .forEach(storeName -> storesByName.put(storeName, getStateStore(storeName, false)));
        return storesByName;
    }

    /**
     * Looks up a state store by name, rejecting built-in store types that have dedicated accessors.
     *
     * @param str the store name
     * @return the store, or {@code null} if no store with that name exists
     * @throws IllegalArgumentException if the store is a built-in key-value, window or session store
     */
    public StateStore getStateStore(final String str) throws IllegalArgumentException {
        final boolean rejectBuiltInStores = true;
        return getStateStore(str, rejectBuiltInStores);
    }

    /**
     * Looks up a store first in the stream task's state manager, then in the global state manager.
     *
     * @param name                    the store name
     * @param throwForBuiltInStores   whether to reject built-in store types via {@code throwIfBuiltInStore}
     * @return the store, or {@code null} if neither source has it
     */
    private StateStore getStateStore(final String name, final boolean throwForBuiltInStores) {
        if (this.task != null) {
            final StateStore taskStore = ((ProcessorContextImpl) this.task.context()).getStateMgr().getStore(name);
            if (taskStore != null) {
                if (throwForBuiltInStores) {
                    throwIfBuiltInStore(taskStore);
                }
                return taskStore;
            }
        }

        if (this.globalStateManager != null) {
            final StateStore globalStore = this.globalStateManager.getGlobalStore(name);
            if (globalStore != null) {
                if (throwForBuiltInStores) {
                    throwIfBuiltInStore(globalStore);
                }
                return globalStore;
            }
        }
        return null;
    }

    /**
     * Rejects stores of built-in types, pointing the caller at the dedicated accessor.
     * Timestamped types are tested before their plain counterparts; the first match decides the message.
     *
     * @throws IllegalArgumentException if the store is a built-in key-value, window or session store
     */
    private void throwIfBuiltInStore(final StateStore stateStore) {
        final String hint;
        if (stateStore instanceof TimestampedKeyValueStore) {
            hint = " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`";
        } else if (stateStore instanceof ReadOnlyKeyValueStore) {
            hint = " is a key-value store and should be accessed via `getKeyValueStore()`";
        } else if (stateStore instanceof TimestampedWindowStore) {
            hint = " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`";
        } else if (stateStore instanceof ReadOnlyWindowStore) {
            hint = " is a window store and should be accessed via `getWindowStore()`";
        } else if (stateStore instanceof ReadOnlySessionStore) {
            hint = " is a session store and should be accessed via `getSessionStore()`";
        } else {
            // Not a built-in store type: nothing to reject.
            return;
        }
        throw new IllegalArgumentException("Store " + stateStore.name() + hint);
    }

    /**
     * Returns the key-value store with the given name. A timestamped key-value store is wrapped
     * in a {@link KeyValueStoreFacade}.
     *
     * @param str the store name
     * @return the store, or {@code null} if there is no such store or it is not a key-value store
     */
    @SuppressWarnings("unchecked")
    public <K, V> KeyValueStore<K, V> getKeyValueStore(final String str) {
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedKeyValueStore) {
            log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
            return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
        }
        return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
    }

    /**
     * Returns the timestamped key-value store with the given name.
     *
     * @param str the store name
     * @return the store, or {@code null} if there is no such store or it is not a timestamped key-value store
     */
    @SuppressWarnings("unchecked")
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String str) {
        final StateStore store = getStateStore(str, false);
        return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
    }

    /**
     * Returns the window store with the given name. A timestamped window store is wrapped in a
     * {@link WindowStoreFacade}.
     *
     * @param str the store name
     * @return the store, or {@code null} if there is no such store or it is not a window store
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowStore<K, V> getWindowStore(final String str) {
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedWindowStore) {
            log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
            return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
        }
        return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
    }

    /**
     * Returns the timestamped window store with the given name.
     *
     * @param str the store name
     * @return the store, or {@code null} if there is no such store or it is not a timestamped window store
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String str) {
        final StateStore store = getStateStore(str, false);
        return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
    }

    /**
     * Returns the session store with the given name.
     *
     * @param str the store name
     * @return the store, or {@code null} if there is no such store or it is not a session store
     */
    @SuppressWarnings("unchecked")
    public <K, V> SessionStore<K, V> getSessionStore(final String str) {
        final StateStore store = getStateStore(str, false);
        return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
    }

    /**
     * Closes the stream task and the global state task, drains any remaining processable work,
     * closes the producer (unless exactly-once is enabled) and cleans the state directory.
     * A failure to close the global state task is logged and does not abort the remaining cleanup.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.task != null) {
            this.task.close(true, false);
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close();
            } catch (final IOException e) {
                // Best effort: report the failure instead of dropping it silently, then continue cleaning up.
                log.warn("Failed to close the global state task during TopologyTestDriver#close(); continuing with cleanup.", e);
            }
        }
        completeAllProcessableWork();
        if (this.task != null && this.task.hasRecordsQueued()) {
            log.warn("Found some records that cannot be processed due to the {} configuration during TopologyTestDriver#close().", StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
        }
        if (!this.eosEnabled) {
            this.producer.close();
        }
        this.stateDirectory.clean();
    }

    /**
     * Creates the restore consumer handed to the changelog reader. Seeking is a no-op, the
     * position is always 0, and every changelog topic is registered with a single partition 0
     * whose end offset is 0.
     *
     * @param storeToChangelogTopic map whose values are the changelog topic names
     * @return the configured mock consumer
     */
    private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
        final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
            @Override
            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {
            }

            @Override
            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {
            }

            @Override
            public synchronized long position(final TopicPartition partition) {
                return 0L;
            }
        };

        for (final String changelogTopic : storeToChangelogTopic.values()) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            partitionInfos.add(new PartitionInfo(changelogTopic, 0, null, null, null));
            restoreConsumer.updatePartitions(changelogTopic, partitionInfos);
            final TopicPartition changelogPartition = new TopicPartition(changelogTopic, 0);
            restoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 0L));
        }
        return restoreConsumer;
    }
}
