package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask.class */
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator {
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(AbstractProcessorContext.NONEXIST_TOPIC, -1, -1, null, null);
    static final byte LATEST_MAGIC_BYTE = 1;
    private final Time time;
    private final long maxTaskIdleMs;
    private final int maxBufferedSize;
    private final StreamsMetricsImpl streamsMetrics;
    private final PartitionGroup partitionGroup;
    private final RecordCollector recordCollector;
    private final PartitionGroup.RecordInfo recordInfo;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final ProducerSupplier producerSupplier;
    private final Sensor closeTaskSensor;
    private final Sensor processLatencySensor;
    private final Sensor punctuateLatencySensor;
    private final Sensor commitSensor;
    private final Sensor enforcedProcessingSensor;
    private final Sensor recordLatenessSensor;
    private long idleStartTime;
    private Producer<byte[], byte[]> producer;
    private boolean commitRequested;
    private final String threadId;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask$ProducerSupplier.class */
    public interface ProducerSupplier {
        Producer<byte[], byte[]> get();
    }

    public StreamTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ThreadCache threadCache, Time time, ProducerSupplier producerSupplier) {
        this(taskId, set, processorTopology, consumer, changelogReader, streamsConfig, streamsMetricsImpl, stateDirectory, threadCache, time, producerSupplier, null);
    }

    public StreamTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ThreadCache threadCache, Time time, ProducerSupplier producerSupplier, RecordCollector recordCollector) {
        super(taskId, set, processorTopology, consumer, changelogReader, false, stateDirectory, streamsConfig);
        this.commitRequested = false;
        this.time = time;
        this.producerSupplier = producerSupplier;
        this.producer = producerSupplier.get();
        this.streamsMetrics = streamsMetricsImpl;
        this.threadId = Thread.currentThread().getName();
        String taskId2 = taskId.toString();
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(this.threadId, streamsMetricsImpl);
        if (streamsMetricsImpl.version() == StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            Sensor commitOverTasksSensor = ThreadMetrics.commitOverTasksSensor(this.threadId, streamsMetricsImpl);
            this.commitSensor = TaskMetrics.commitSensor(this.threadId, taskId2, streamsMetricsImpl, commitOverTasksSensor);
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(this.threadId, taskId2, streamsMetricsImpl, commitOverTasksSensor);
        } else {
            this.commitSensor = TaskMetrics.commitSensor(this.threadId, taskId2, streamsMetricsImpl, new Sensor[0]);
            this.enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(this.threadId, taskId2, streamsMetricsImpl, new Sensor[0]);
        }
        this.processLatencySensor = TaskMetrics.processLatencySensor(this.threadId, taskId2, streamsMetricsImpl);
        this.punctuateLatencySensor = TaskMetrics.punctuateSensor(this.threadId, taskId2, streamsMetricsImpl);
        this.recordLatenessSensor = TaskMetrics.recordLatenessSensor(this.threadId, taskId2, streamsMetricsImpl);
        TaskMetrics.droppedRecordsSensor(this.threadId, taskId2, streamsMetricsImpl);
        ProductionExceptionHandler defaultProductionExceptionHandler = streamsConfig.defaultProductionExceptionHandler();
        if (recordCollector == null) {
            this.recordCollector = new RecordCollectorImpl(taskId2, this.logContext, defaultProductionExceptionHandler, TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(this.threadId, taskId2, streamsMetricsImpl));
        } else {
            this.recordCollector = recordCollector;
        }
        this.recordCollector.init(this.producer);
        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxTaskIdleMs = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG).longValue();
        this.maxBufferedSize = streamsConfig.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG).intValue();
        this.consumedOffsets = new HashMap();
        HashMap hashMap = new HashMap();
        this.processorContext = new ProcessorContextImpl(taskId, this, streamsConfig, this.recordCollector, this.stateMgr, streamsMetricsImpl, threadCache);
        TimestampExtractor defaultTimestampExtractor = streamsConfig.defaultTimestampExtractor();
        DeserializationExceptionHandler defaultDeserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
        for (TopicPartition topicPartition : set) {
            SourceNode source = processorTopology.source(topicPartition.topic());
            hashMap.put(topicPartition, new RecordQueue(topicPartition, source, source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor, defaultDeserializationExceptionHandler, this.processorContext, this.logContext));
        }
        this.recordInfo = new PartitionGroup.RecordInfo();
        this.partitionGroup = new PartitionGroup(hashMap, this.recordLatenessSensor);
        this.stateMgr.registerGlobalStateStores(processorTopology.globalStateStores());
        if (this.eosEnabled) {
            initializeTransactions();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeMetadata() {
        try {
            Map<TopicPartition, OffsetAndMetadata> map = (Map) this.consumer.committed(this.partitions).entrySet().stream().filter(entry -> {
                return entry.getValue() != null;
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
            initializeCommittedOffsets(map);
            initializeTaskTime(map);
        } catch (AuthorizationException e) {
            throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, this.partitions), e);
        } catch (WakeupException e2) {
            throw e2;
        } catch (KafkaException e3) {
            throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, this.partitions), e3);
        }
    }

    private void initializeCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        HashSet hashSet = new HashSet(this.topology.storeToChangelogTopic().values());
        Map<TopicPartition, Long> map2 = (Map) map.entrySet().stream().filter(entry -> {
            return hashSet.contains(((TopicPartition) entry.getKey()).topic());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return Long.valueOf(((OffsetAndMetadata) entry2.getValue()).offset());
        }));
        Iterator<TopicPartition> it = this.partitions.iterator();
        while (it.hasNext()) {
            map2.putIfAbsent(it.next(), 0L);
        }
        this.stateMgr.putOffsetLimits(map2);
    }

    private void initializeTaskTime(Map<TopicPartition, OffsetAndMetadata> map) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndMetadata value = entry.getValue();
            if (value != null) {
                long decodeTimestamp = decodeTimestamp(value.metadata());
                this.partitionGroup.setPartitionTime(key, decodeTimestamp);
                this.log.debug("A committed timestamp was detected: setting the partition time of partition {} to {} in stream task {}", key, Long.valueOf(decodeTimestamp), this.id);
            } else {
                this.log.debug("No committed timestamp was found in metadata for partition {}", key);
            }
        }
        HashSet hashSet = new HashSet(this.partitions);
        hashSet.removeAll(map.keySet());
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            this.log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", (TopicPartition) it.next());
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean initializeStateStores() {
        this.log.debug("Initializing state stores");
        registerStateStores();
        return changelogPartitions().isEmpty();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeTopology() {
        initTopology();
        if (this.eosEnabled) {
            try {
                this.producer.beginTransaction();
            } catch (ProducerFencedException | UnknownProducerIdException e) {
                throw new TaskMigratedException(this, e);
            }
        }
        this.processorContext.initialize();
        this.taskInitialized = true;
        this.idleStartTime = -1L;
        this.stateMgr.ensureStoresRegistered();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resume() {
        this.log.debug("Resuming");
        if (this.eosEnabled) {
            if (this.producer != null) {
                throw new IllegalStateException("Task producer should be null.");
            }
            this.producer = this.producerSupplier.get();
            initializeTransactions();
            this.recordCollector.init(this.producer);
            try {
                this.stateMgr.clearCheckpoints();
            } catch (IOException e) {
                throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", this.logPrefix), e);
            }
        }
        initializeMetadata();
    }

    public boolean isProcessable(long j) {
        if (this.partitionGroup.allPartitionsBuffered()) {
            this.idleStartTime = -1L;
            return true;
        }
        if (this.partitionGroup.numBuffered() <= 0) {
            this.idleStartTime = -1L;
            return false;
        }
        if (this.idleStartTime == -1) {
            this.idleStartTime = j;
        }
        if (j - this.idleStartTime < this.maxTaskIdleMs) {
            return false;
        }
        this.enforcedProcessingSensor.record();
        return true;
    }

    public boolean process() {
        StampedRecord nextRecord = this.partitionGroup.nextRecord(this.recordInfo);
        try {
            if (nextRecord == null) {
                return false;
            }
            try {
                ProcessorNode node = this.recordInfo.node();
                TopicPartition partition = this.recordInfo.partition();
                this.log.trace("Start processing one record [{}]", nextRecord);
                updateProcessorContext(nextRecord, node);
                StreamsMetricsImpl.maybeMeasureLatency(() -> {
                    node.process(nextRecord.key(), nextRecord.value());
                }, this.time, this.processLatencySensor);
                this.log.trace("Completed processing one record [{}]", nextRecord);
                this.consumedOffsets.put(partition, Long.valueOf(nextRecord.offset()));
                this.commitNeeded = true;
                if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                    this.consumer.resume(Collections.singleton(partition));
                }
                return true;
            } catch (RecoverableClientException e) {
                throw new TaskMigratedException(this, e);
            } catch (KafkaException e2) {
                throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", id(), this.processorContext.currentNode().name(), nextRecord.topic(), Integer.valueOf(nextRecord.partition()), Long.valueOf(nextRecord.offset()), getStacktraceString(e2)), e2);
            }
        } finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r7v1 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r8v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 7, insn: 0x00aa: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:53:0x00aa */
    /* JADX WARN: Not initialized variable reg: 8, insn: 0x00ae: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:55:0x00ae */
    /* JADX WARN: Type inference failed for: r7v1, types: [java.io.StringWriter] */
    /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable] */
    private String getStacktraceString(KafkaException kafkaException) {
        String str = null;
        try {
            try {
                StringWriter stringWriter = new StringWriter();
                Throwable th = null;
                PrintWriter printWriter = new PrintWriter(stringWriter);
                Throwable th2 = null;
                try {
                    try {
                        kafkaException.printStackTrace(printWriter);
                        str = stringWriter.toString();
                        if (printWriter != null) {
                            if (0 != 0) {
                                try {
                                    printWriter.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                printWriter.close();
                            }
                        }
                        if (stringWriter != null) {
                            if (0 != 0) {
                                try {
                                    stringWriter.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                stringWriter.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (printWriter != null) {
                        if (th2 != null) {
                            try {
                                printWriter.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            printWriter.close();
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (IOException e) {
            this.log.error("Encountered error extracting stacktrace from this exception", (Throwable) e);
        }
        return str;
    }

    @Override // org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator
    public void punctuate(ProcessorNode processorNode, long j, PunctuationType punctuationType, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }
        updateProcessorContext(new StampedRecord(DUMMY_RECORD, j), processorNode);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", processorNode.name(), Long.valueOf(j), punctuationType);
        }
        try {
            try {
                try {
                    StreamsMetricsImpl.maybeMeasureLatency(() -> {
                        processorNode.punctuate(j, punctuator);
                    }, this.time, this.punctuateLatencySensor);
                    this.processorContext.setCurrentNode(null);
                } catch (RecoverableClientException e) {
                    throw new TaskMigratedException(this, e);
                }
            } catch (KafkaException e2) {
                throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, processorNode.name()), e2);
            }
        } catch (Throwable th) {
            this.processorContext.setCurrentNode(null);
            throw th;
        }
    }

    private void updateProcessorContext(StampedRecord stampedRecord, ProcessorNode processorNode) {
        this.processorContext.setRecordContext(new ProcessorRecordContext(stampedRecord.timestamp, stampedRecord.offset(), stampedRecord.partition(), stampedRecord.topic(), stampedRecord.headers()));
        this.processorContext.setCurrentNode(processorNode);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void commit() {
        commit(true, extractPartitionTimes());
    }

    void commit(boolean z, Map<TopicPartition, Long> map) {
        long nanoseconds = this.time.nanoseconds();
        this.log.debug("Committing");
        flushState();
        if (!this.eosEnabled) {
            this.stateMgr.checkpoint(activeTaskCheckpointableOffsets());
        }
        HashMap hashMap = new HashMap(this.consumedOffsets.size());
        Iterator<Map.Entry<TopicPartition, Long>> it = this.consumedOffsets.entrySet().iterator();
        while (it.hasNext()) {
            TopicPartition key = it.next().getKey();
            Long headRecordOffset = this.partitionGroup.headRecordOffset(key);
            if (headRecordOffset == null) {
                try {
                    headRecordOffset = Long.valueOf(this.consumer.position(key));
                } catch (TimeoutException e) {
                    throw new IllegalStateException(e);
                } catch (KafkaException e2) {
                    throw new StreamsException(e2);
                }
            }
            hashMap.put(key, new OffsetAndMetadata(headRecordOffset.longValue(), encodeTimestamp(map.get(key).longValue())));
        }
        try {
            if (this.eosEnabled) {
                this.producer.sendOffsetsToTransaction(hashMap, this.applicationId);
                this.producer.commitTransaction();
                if (z) {
                    this.producer.beginTransaction();
                }
            } else {
                this.consumer.commitSync(hashMap);
            }
        } catch (CommitFailedException | ProducerFencedException | UnknownProducerIdException e3) {
            throw new TaskMigratedException(this, e3);
        } catch (RebalanceInProgressException e4) {
            this.log.info("Committing failed with a non-fatal error: {}, we can ignore this since commit may succeed still", e4.toString());
        }
        this.commitNeeded = false;
        this.commitRequested = false;
        this.commitSensor.record(this.time.nanoseconds() - nanoseconds);
    }

    private Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
        HashMap hashMap = new HashMap(this.recordCollector.offsets());
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            hashMap.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return hashMap;
    }

    @Override // org.apache.kafka.streams.processor.internals.AbstractTask
    protected void flushState() {
        this.log.trace("Flushing state and producer");
        super.flushState();
        try {
            this.recordCollector.flush();
        } catch (RecoverableClientException e) {
            throw new TaskMigratedException(this, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<TopicPartition, Long> purgableOffsets() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition key = entry.getKey();
            if (this.topology.isRepartitionTopic(key.topic())) {
                hashMap.put(key, Long.valueOf(entry.getValue().longValue() + 1));
            }
        }
        return hashMap;
    }

    private void initTopology() {
        this.log.trace("Initializing processor nodes of the topology");
        for (ProcessorNode processorNode : this.topology.processors()) {
            this.processorContext.setCurrentNode(processorNode);
            try {
                processorNode.init(this.processorContext);
            } finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    public void suspend() {
        this.log.debug("Suspending");
        suspend(true);
    }

    void suspend(boolean z) {
        Map<TopicPartition, Long> extractPartitionTimes = extractPartitionTimes();
        try {
            closeTopology();
        } catch (RuntimeException e) {
            if (z) {
                throw e;
            }
        }
        if (!z) {
            try {
                this.stateMgr.flush();
            } catch (ProcessorStateException e2) {
            }
            if (this.eosEnabled) {
                closeRecordCollector();
                return;
            }
            return;
        }
        TaskMigratedException taskMigratedException = null;
        try {
            commit(false, extractPartitionTimes);
            this.partitionGroup.clear();
            if (this.eosEnabled) {
                this.stateMgr.checkpoint(activeTaskCheckpointableOffsets());
                taskMigratedException = closeRecordCollector();
            }
            if (taskMigratedException != null) {
                throw taskMigratedException;
            }
        } catch (Throwable th) {
            this.partitionGroup.clear();
            if (this.eosEnabled) {
                this.stateMgr.checkpoint(activeTaskCheckpointableOffsets());
                closeRecordCollector();
            }
            throw th;
        }
    }

    private TaskMigratedException closeRecordCollector() {
        TaskMigratedException taskMigratedException = null;
        try {
            this.recordCollector.close();
        } catch (RecoverableClientException e) {
            taskMigratedException = new TaskMigratedException(this, e);
        } catch (Throwable th) {
            this.log.error("Failed to close producer due to the following error:", th);
        } finally {
            this.producer = null;
        }
        return taskMigratedException;
    }

    private void closeTopology() {
        this.log.trace("Closing processor topology");
        RuntimeException runtimeException = null;
        if (this.taskInitialized) {
            for (ProcessorNode processorNode : this.topology.processors()) {
                this.processorContext.setCurrentNode(processorNode);
                try {
                    processorNode.close();
                    this.processorContext.setCurrentNode(null);
                } catch (RuntimeException e) {
                    runtimeException = e;
                    this.processorContext.setCurrentNode(null);
                } catch (Throwable th) {
                    this.processorContext.setCurrentNode(null);
                    throw th;
                }
            }
        }
        if (runtimeException != null) {
            throw runtimeException;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeSuspended(boolean z, RuntimeException runtimeException) {
        try {
            closeStateManager(z);
        } catch (RuntimeException e) {
            if (runtimeException == null) {
                runtimeException = e;
            }
            this.log.error("Could not close state manager due to the following error:", (Throwable) e);
        }
        this.partitionGroup.close();
        this.streamsMetrics.removeAllTaskLevelSensors(this.threadId, this.id.toString());
        this.closeTaskSensor.record();
        if (runtimeException != null) {
            throw runtimeException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void close(boolean z, boolean z2) {
        this.log.debug("Closing");
        RuntimeException runtimeException = null;
        try {
            suspend(z);
        } catch (RuntimeException e) {
            z = false;
            runtimeException = e;
            this.log.error("Could not close task due to the following error:", (Throwable) e);
        }
        closeSuspended(z, runtimeException);
        this.taskClosed = true;
    }

    public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        int addRawRecords = this.partitionGroup.addRawRecords(topicPartition, iterable);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Added records into the buffered queue of partition {}, new queue size is {}", topicPartition, Integer.valueOf(addRawRecords));
        }
        if (addRawRecords > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(topicPartition));
        }
    }

    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        switch (punctuationType) {
            case STREAM_TIME:
                return schedule(0L, j, punctuationType, punctuator);
            case WALL_CLOCK_TIME:
                return schedule(this.time.milliseconds() + j, j, punctuationType, punctuator);
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
        }
    }

    Cancellable schedule(long j, long j2, PunctuationType punctuationType, Punctuator punctuator) {
        if (this.processorContext.currentNode() == null) {
            throw new IllegalStateException(String.format("%sCurrent node is null", this.logPrefix));
        }
        PunctuationSchedule punctuationSchedule = new PunctuationSchedule(this.processorContext.currentNode(), j, j2, punctuator);
        switch (punctuationType) {
            case STREAM_TIME:
                return this.streamTimePunctuationQueue.schedule(punctuationSchedule);
            case WALL_CLOCK_TIME:
                return this.systemTimePunctuationQueue.schedule(punctuationSchedule);
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
        }
    }

    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    public boolean maybePunctuateStreamTime() {
        long streamTime = this.partitionGroup.streamTime();
        if (streamTime == -1) {
            return false;
        }
        boolean mayPunctuate = this.streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
        if (mayPunctuate) {
            this.commitNeeded = true;
        }
        return mayPunctuate;
    }

    public boolean maybePunctuateSystemTime() {
        boolean mayPunctuate = this.systemTimePunctuationQueue.mayPunctuate(this.time.milliseconds(), PunctuationType.WALL_CLOCK_TIME, this);
        if (mayPunctuate) {
            this.commitNeeded = true;
        }
        return mayPunctuate;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void requestCommit() {
        this.commitRequested = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean commitRequested() {
        return this.commitRequested;
    }

    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    long streamTime() {
        return this.partitionGroup.streamTime();
    }

    long partitionTime(TopicPartition topicPartition) {
        return this.partitionGroup.partitionTimestamp(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Producer<byte[], byte[]> getProducer() {
        return this.producer;
    }

    private void initializeTransactions() {
        try {
            this.producer.initTransactions();
        } catch (TimeoutException e) {
            this.log.error("Timeout exception caught when initializing transactions for task {}. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.", this.id, e);
            throw new StreamsException(String.format("%sFailed to initialize task %s due to timeout.", this.logPrefix, this.id), e);
        }
    }

    String encodeTimestamp(long j) {
        ByteBuffer allocate = ByteBuffer.allocate(9);
        allocate.put((byte) 1);
        allocate.putLong(j);
        return Base64.getEncoder().encodeToString(allocate.array());
    }

    long decodeTimestamp(String str) {
        if (str.length() == 0) {
            return -1L;
        }
        ByteBuffer wrap = ByteBuffer.wrap(Base64.getDecoder().decode(str));
        byte b = wrap.get();
        switch (b) {
            case 1:
                return wrap.getLong();
            default:
                this.log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", (Object) (byte) 1, (Object) Byte.valueOf(b));
                return -1L;
        }
    }

    private Map<TopicPartition, Long> extractPartitionTimes() {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : this.partitionGroup.partitions()) {
            hashMap.put(topicPartition, Long.valueOf(partitionTime(topicPartition)));
        }
        return hashMap;
    }

    public Map<TopicPartition, Long> restoredOffsets() {
        return this.stateMgr.changelogReader().restoredOffsets();
    }

    public boolean hasRecordsQueued() {
        return numBuffered() > 0;
    }
}
