package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import net.sourceforge.argparse4j.ArgumentParsers;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread.class */
public class StreamThread extends Thread {
    private final Time time;
    private final Logger log;
    private final String logPrefix;
    private final Object stateLock;
    private final Duration pollTime;
    private final long commitTimeMs;
    private final int maxPollTimeMs;
    private final String originalReset;
    private final TaskManager taskManager;
    private final AtomicInteger assignmentErrorCode;
    private final StreamsMetricsImpl streamsMetrics;
    private final Sensor commitSensor;
    private final Sensor pollSensor;
    private final Sensor punctuateSensor;
    private final Sensor processSensor;
    private long now;
    private long lastPollMs;
    private long lastCommitMs;
    private int numIterations;
    private Throwable rebalanceException;
    private boolean processStandbyRecords;
    private volatile State state;
    private volatile ThreadMetadata threadMetadata;
    private StateListener stateListener;
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    final ConsumerRebalanceListener rebalanceListener;
    final Producer<byte[], byte[]> producer;
    final Consumer<byte[], byte[]> restoreConsumer;
    final Consumer<byte[], byte[]> consumer;
    final InternalTopologyBuilder builder;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$AbstractTaskCreator.class */
    public static abstract class AbstractTaskCreator<T extends Task> {
        final String applicationId;
        final InternalTopologyBuilder builder;
        final StreamsConfig config;
        final StreamsMetricsImpl streamsMetrics;
        final StateDirectory stateDirectory;
        final ChangelogReader storeChangelogReader;
        final Time time;
        final Logger log;

        AbstractTaskCreator(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ChangelogReader changelogReader, Time time, Logger logger) {
            this.applicationId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
            this.builder = internalTopologyBuilder;
            this.config = streamsConfig;
            this.streamsMetrics = streamsMetricsImpl;
            this.stateDirectory = stateDirectory;
            this.storeChangelogReader = changelogReader;
            this.time = time;
            this.log = logger;
        }

        public InternalTopologyBuilder builder() {
            return this.builder;
        }

        public StateDirectory stateDirectory() {
            return this.stateDirectory;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Collection<T> createTasks(Consumer<byte[], byte[]> consumer, Map<TaskId, Set<TopicPartition>> map) {
            ArrayList arrayList = new ArrayList();
            for (Map.Entry<TaskId, Set<TopicPartition>> entry : map.entrySet()) {
                TaskId key = entry.getKey();
                Set<TopicPartition> value = entry.getValue();
                T createTask = createTask(consumer, key, value);
                if (createTask != null) {
                    this.log.trace("Created task {} with assigned partitions {}", key, value);
                    arrayList.add(createTask);
                }
            }
            return arrayList;
        }

        abstract T createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> set);

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$InternalConsumerConfig.class */
    private static final class InternalConsumerConfig extends ConsumerConfig {
        private InternalConsumerConfig(Map<String, Object> map) {
            super(ConsumerConfig.addDeserializerToConfig(map, new ByteArrayDeserializer(), new ByteArrayDeserializer()), false);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StandbyTaskCreator.class */
    static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
        private final Sensor createTaskSensor;

        StandbyTaskCreator(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ChangelogReader changelogReader, Time time, String str, Logger logger) {
            super(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, changelogReader, time, logger);
            this.createTaskSensor = ThreadMetrics.createTaskSensor(str, streamsMetricsImpl);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        StandbyTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> set) {
            this.createTaskSensor.record();
            ProcessorTopology build = this.builder.build(Integer.valueOf(taskId.topicGroupId));
            if (!build.stateStores().isEmpty() && !build.storeToChangelogTopic().isEmpty()) {
                return new StandbyTask(taskId, set, build, consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory);
            }
            this.log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, set);
            return null;
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        /* bridge */ /* synthetic */ StandbyTask createTask(Consumer consumer, TaskId taskId, Set set) {
            return createTask((Consumer<byte[], byte[]>) consumer, taskId, (Set<TopicPartition>) set);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$State.class */
    public enum State implements ThreadStateTransitionValidator {
        CREATED(1, 5),
        STARTING(2, 3, 5),
        PARTITIONS_REVOKED(2, 3, 5),
        PARTITIONS_ASSIGNED(2, 3, 4, 5),
        RUNNING(2, 3, 5),
        PENDING_SHUTDOWN(6),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet();

        State(Integer... numArr) {
            this.validTransitions.addAll(Arrays.asList(numArr));
        }

        public boolean isAlive() {
            return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
        }

        @Override // org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator
        public boolean isValidTransition(ThreadStateTransitionValidator threadStateTransitionValidator) {
            return this.validTransitions.contains(Integer.valueOf(((State) threadStateTransitionValidator).ordinal()));
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StateListener.class */
    public interface StateListener {
        void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2);
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$TaskCreator.class */
    static class TaskCreator extends AbstractTaskCreator<StreamTask> {
        private final ThreadCache cache;
        private final KafkaClientSupplier clientSupplier;
        private final String threadId;
        private final Producer<byte[], byte[]> threadProducer;
        private final Sensor createTaskSensor;

        TaskCreator(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ChangelogReader changelogReader, ThreadCache threadCache, Time time, KafkaClientSupplier kafkaClientSupplier, Producer<byte[], byte[]> producer, String str, Logger logger) {
            super(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, changelogReader, time, logger);
            this.cache = threadCache;
            this.clientSupplier = kafkaClientSupplier;
            this.threadProducer = producer;
            this.threadId = str;
            this.createTaskSensor = ThreadMetrics.createTaskSensor(str, streamsMetricsImpl);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        StreamTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> set) {
            this.createTaskSensor.record();
            return new StreamTask(taskId, set, this.builder.build(Integer.valueOf(taskId.topicGroupId)), consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, () -> {
                return createProducer(taskId);
            });
        }

        private Producer<byte[], byte[]> createProducer(TaskId taskId) {
            if (this.threadProducer != null) {
                return this.threadProducer;
            }
            Map<String, Object> producerConfigs = this.config.getProducerConfigs(StreamThread.getTaskProducerClientId(this.threadId, taskId));
            this.log.info("Creating producer client for task {}", taskId);
            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.applicationId + ArgumentParsers.DEFAULT_PREFIX_CHARS + taskId);
            return this.clientSupplier.getProducer(producerConfigs);
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        public void close() {
            if (this.threadProducer != null) {
                try {
                    this.threadProducer.close();
                } catch (Throwable th) {
                    this.log.error("Failed to close producer due to the following error:", th);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        /* bridge */ /* synthetic */ StreamTask createTask(Consumer consumer, TaskId taskId, Set set) {
            return createTask((Consumer<byte[], byte[]>) consumer, taskId, (Set<TopicPartition>) set);
        }
    }

    public void setStateListener(StateListener stateListener) {
        this.stateListener = stateListener;
    }

    public State state() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public State setState(State state) {
        synchronized (this.stateLock) {
            State state2 = this.state;
            if (this.state == State.PENDING_SHUTDOWN && state != State.DEAD) {
                this.log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: only DEAD state is a valid next state", state);
                return null;
            }
            if (this.state == State.DEAD) {
                this.log.debug("Ignoring request to transit from DEAD to {}: no valid next state after DEAD", state);
                return null;
            }
            if (!this.state.isValidTransition(state)) {
                this.log.error("Unexpected state transition from {} to {}", state2, state);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + state2 + " to " + state);
            }
            this.log.info("State transition from {} to {}", state2, state);
            this.state = state;
            if (state == State.RUNNING) {
                updateThreadMetadata(this.taskManager.activeTasks(), this.taskManager.standbyTasks());
            } else {
                updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
            }
            if (this.stateListener != null) {
                this.stateListener.onChange(this, this.state, state2);
            }
            return state2;
        }
    }

    public boolean isRunning() {
        boolean isAlive;
        synchronized (this.stateLock) {
            isAlive = this.state.isAlive();
        }
        return isAlive;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getAssignmentErrorCode() {
        return this.assignmentErrorCode.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setRebalanceException(Throwable th) {
        this.rebalanceException = th;
    }

    public static StreamThread create(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Admin admin, UUID uuid, String str, StreamsMetricsImpl streamsMetricsImpl, Time time, StreamsMetadataState streamsMetadataState, long j, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, int i) {
        String str2 = str + "-StreamThread-" + i;
        String format = String.format("stream-thread [%s] ", str2);
        LogContext logContext = new LogContext(format);
        Logger logger = logContext.logger(StreamThread.class);
        logger.info("Creating restore consumer client");
        Consumer<byte[], byte[]> restoreConsumer = kafkaClientSupplier.getRestoreConsumer(streamsConfig.getRestoreConsumerConfigs(getRestoreConsumerClientId(str2)));
        StoreChangelogReader storeChangelogReader = new StoreChangelogReader(restoreConsumer, Duration.ofMillis(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue()), stateRestoreListener, logContext);
        Producer<byte[], byte[]> producer = null;
        if (!StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
            Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(getThreadProducerClientId(str2));
            logger.info("Creating shared producer client");
            producer = kafkaClientSupplier.getProducer(producerConfigs);
        }
        TaskManager taskManager = new TaskManager(storeChangelogReader, uuid, format, restoreConsumer, streamsMetadataState, new TaskCreator(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, storeChangelogReader, new ThreadCache(logContext, j, streamsMetricsImpl), time, kafkaClientSupplier, producer, str2, logger), new StandbyTaskCreator(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, storeChangelogReader, time, str2, logger), admin, new AssignedStreamsTasks(logContext), new AssignedStandbyTasks(logContext));
        logger.info("Creating consumer client");
        Map<String, Object> mainConsumerConfigs = streamsConfig.getMainConsumerConfigs(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG), getConsumerClientId(str2), i);
        mainConsumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
        AtomicInteger atomicInteger = new AtomicInteger();
        mainConsumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, atomicInteger);
        String str3 = null;
        if (!internalTopologyBuilder.latestResetTopicsPattern().pattern().equals("") || !internalTopologyBuilder.earliestResetTopicsPattern().pattern().equals("")) {
            str3 = (String) mainConsumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
            mainConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        }
        Consumer<byte[], byte[]> consumer = kafkaClientSupplier.getConsumer(mainConsumerConfigs);
        taskManager.setConsumer(consumer);
        return new StreamThread(time, streamsConfig, producer, restoreConsumer, consumer, str3, taskManager, streamsMetricsImpl, internalTopologyBuilder, str2, logContext, atomicInteger).updateThreadMetadata(getSharedAdminClientId(str));
    }

    public StreamThread(Time time, StreamsConfig streamsConfig, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> consumer2, String str, TaskManager taskManager, StreamsMetricsImpl streamsMetricsImpl, InternalTopologyBuilder internalTopologyBuilder, String str2, LogContext logContext, AtomicInteger atomicInteger) {
        super(str2);
        this.rebalanceException = null;
        this.processStandbyRecords = false;
        this.state = State.CREATED;
        this.stateLock = new Object();
        this.standbyRecords = new HashMap();
        this.streamsMetrics = streamsMetricsImpl;
        this.commitSensor = ThreadMetrics.commitSensor(str2, streamsMetricsImpl);
        this.pollSensor = ThreadMetrics.pollSensor(str2, streamsMetricsImpl);
        this.processSensor = ThreadMetrics.processSensor(str2, streamsMetricsImpl);
        this.punctuateSensor = ThreadMetrics.punctuateSensor(str2, streamsMetricsImpl);
        ThreadMetrics.createTaskSensor(str2, streamsMetricsImpl);
        ThreadMetrics.closeTaskSensor(str2, streamsMetricsImpl);
        if (streamsMetricsImpl.version() == StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            ThreadMetrics.skipRecordSensor(str2, streamsMetricsImpl);
            ThreadMetrics.commitOverTasksSensor(str2, streamsMetricsImpl);
        }
        this.time = time;
        this.builder = internalTopologyBuilder;
        this.logPrefix = logContext.logPrefix();
        this.log = logContext.logger(StreamThread.class);
        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log);
        this.taskManager = taskManager;
        this.producer = producer;
        this.restoreConsumer = consumer;
        this.consumer = consumer2;
        this.originalReset = str;
        this.assignmentErrorCode = atomicInteger;
        this.pollTime = Duration.ofMillis(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue());
        this.maxPollTimeMs = new InternalConsumerConfig(streamsConfig.getMainConsumerConfigs("dummyGroupId", "dummyClientId", 1)).getInt("max.poll.interval.ms").intValue();
        this.commitTimeMs = streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.numIterations = 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getTaskProducerClientId(String str, TaskId taskId) {
        return str + ArgumentParsers.DEFAULT_PREFIX_CHARS + taskId + "-producer";
    }

    private static String getThreadProducerClientId(String str) {
        return str + "-producer";
    }

    private static String getConsumerClientId(String str) {
        return str + "-consumer";
    }

    private static String getRestoreConsumerClientId(String str) {
        return str + "-restore-consumer";
    }

    public static String getSharedAdminClientId(String str) {
        return str + "-admin";
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.log.info("Starting");
        if (setState(State.STARTING) == null) {
            this.log.info("StreamThread already shutdown. Not running");
            return;
        }
        boolean z = false;
        try {
            try {
                runLoop();
                z = true;
                completeShutdown(true);
            } catch (KafkaException e) {
                this.log.error("Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:", (Throwable) e);
                throw e;
            } catch (Exception e2) {
                this.log.error("Encountered the following error during processing:", (Throwable) e2);
                throw e2;
            }
        } catch (Throwable th) {
            completeShutdown(z);
            throw th;
        }
    }

    private void runLoop() {
        subscribeConsumer();
        while (isRunning()) {
            try {
                runOnce();
                if (this.assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
                    this.log.info("Version probing detected. Triggering new rebalance.");
                    this.assignmentErrorCode.set(AssignorError.NONE.code());
                    enforceRebalance();
                }
            } catch (TaskMigratedException e) {
                this.log.warn("Detected task " + e.migratedTask().id() + " that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:\n" + e.migratedTask().toString(">"), (Throwable) e);
                enforceRebalance();
            }
        }
    }

    private void enforceRebalance() {
        this.consumer.unsubscribe();
        subscribeConsumer();
    }

    private void subscribeConsumer() {
        if (this.builder.usesPatternSubscription()) {
            this.consumer.subscribe(this.builder.sourceTopicPattern(), this.rebalanceListener);
        } else {
            this.consumer.subscribe(this.builder.sourceTopicCollection(), this.rebalanceListener);
        }
    }

    void runOnce() {
        ConsumerRecords<byte[], byte[]> pollRequests;
        this.now = this.time.milliseconds();
        if (this.state == State.PARTITIONS_ASSIGNED) {
            pollRequests = pollRequests(Duration.ZERO);
        } else if (this.state == State.PARTITIONS_REVOKED) {
            pollRequests = pollRequests(Duration.ZERO);
        } else {
            if (this.state != State.RUNNING && this.state != State.STARTING) {
                this.log.error("Unexpected state {} during normal iteration", this.state);
                throw new StreamsException(this.logPrefix + "Unexpected state " + this.state + " during normal iteration");
            }
            pollRequests = pollRequests(this.pollTime);
        }
        long advanceNowAndComputeLatency = advanceNowAndComputeLatency();
        if (pollRequests != null && !pollRequests.isEmpty()) {
            this.pollSensor.record(advanceNowAndComputeLatency, this.now);
            addRecordsToTasks(pollRequests);
        }
        if (!isRunning()) {
            this.log.debug("State already transits to {}, skipping the run once call after poll request", this.state);
            return;
        }
        if (this.state == State.PARTITIONS_ASSIGNED && this.taskManager.updateNewAndRestoringTasks()) {
            setState(State.RUNNING);
        }
        advanceNowAndComputeLatency();
        if (this.taskManager.hasActiveRunningTasks()) {
            int i = 0;
            while (true) {
                for (int i2 = 0; i2 < this.numIterations; i2++) {
                    i = this.taskManager.process(this.now);
                    if (i <= 0) {
                        break;
                    }
                    this.processSensor.record(advanceNowAndComputeLatency() / i, this.now);
                    int maybeCommitActiveTasksPerUserRequested = this.taskManager.maybeCommitActiveTasksPerUserRequested();
                    if (maybeCommitActiveTasksPerUserRequested > 0) {
                        this.commitSensor.record(advanceNowAndComputeLatency() / maybeCommitActiveTasksPerUserRequested, this.now);
                    }
                }
                long max = Math.max(this.now - this.lastPollMs, 0L);
                if (maybePunctuate() || maybeCommit()) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                } else if (max > this.maxPollTimeMs / 2) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                } else if (i > 0) {
                    this.numIterations++;
                }
                if (i <= 0) {
                    break;
                }
            }
        }
        maybeUpdateStandbyTasks();
        maybeCommit();
    }

    private ConsumerRecords<byte[], byte[]> pollRequests(Duration duration) {
        ConsumerRecords<byte[], byte[]> consumerRecords = null;
        this.lastPollMs = this.now;
        try {
            consumerRecords = this.consumer.poll(duration);
        } catch (InvalidOffsetException e) {
            resetInvalidOffsets(e);
        }
        if (this.rebalanceException == null) {
            return consumerRecords;
        }
        if (this.rebalanceException instanceof TaskMigratedException) {
            throw ((TaskMigratedException) this.rebalanceException);
        }
        throw new StreamsException(this.logPrefix + "Failed to rebalance.", this.rebalanceException);
    }

    private void resetInvalidOffsets(InvalidOffsetException invalidOffsetException) {
        Set<TopicPartition> partitions = invalidOffsetException.partitions();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        for (TopicPartition topicPartition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(topicPartition.topic()).matches()) {
                addToResetList(topicPartition, hashSet2, "Setting topic '{}' to consume from {} offset", "earliest", hashSet);
            } else if (this.builder.latestResetTopicsPattern().matcher(topicPartition.topic()).matches()) {
                addToResetList(topicPartition, hashSet3, "Setting topic '{}' to consume from {} offset", StreamsConfig.METRICS_LATEST, hashSet);
            } else {
                if (this.originalReset == null || !(this.originalReset.equals("earliest") || this.originalReset.equals(StreamsConfig.METRICS_LATEST))) {
                    throw new StreamsException(String.format("No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", topicPartition.topic(), Integer.valueOf(topicPartition.partition())), invalidOffsetException);
                }
                if (this.originalReset.equals("earliest")) {
                    addToResetList(topicPartition, hashSet2, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", hashSet);
                } else if (this.originalReset.equals(StreamsConfig.METRICS_LATEST)) {
                    addToResetList(topicPartition, hashSet3, "No custom setting defined for topic '{}' using original config '{}' for offset reset", StreamsConfig.METRICS_LATEST, hashSet);
                }
            }
        }
        if (!hashSet2.isEmpty()) {
            this.consumer.seekToBeginning(hashSet2);
        }
        if (hashSet3.isEmpty()) {
            return;
        }
        this.consumer.seekToEnd(hashSet3);
    }

    private void addToResetList(TopicPartition topicPartition, Set<TopicPartition> set, String str, String str2, Set<String> set2) {
        String str3 = topicPartition.topic();
        if (set2.add(str3)) {
            this.log.info(str, str3, str2);
        }
        set.add(topicPartition);
    }

    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            StreamTask activeTask = this.taskManager.activeTask(topicPartition);
            if (activeTask == null) {
                if (isRunning()) {
                    this.log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", topicPartition, this.taskManager.toString(">"));
                    throw new NullPointerException("Task was unexpectedly missing for partition " + topicPartition);
                }
                this.log.info("State already transits to {}, skipping the add records to non-existing task for partition {}", this.state, topicPartition);
            } else {
                if (activeTask.isClosed()) {
                    this.log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", activeTask.id());
                    throw new TaskMigratedException(activeTask);
                }
                activeTask.addRecords(topicPartition, consumerRecords.records(topicPartition));
            }
        }
    }

    private boolean maybePunctuate() {
        int punctuate = this.taskManager.punctuate();
        if (punctuate > 0) {
            this.punctuateSensor.record(advanceNowAndComputeLatency() / punctuate, this.now);
        }
        return punctuate > 0;
    }

    boolean maybeCommit() {
        int maybeCommitActiveTasksPerUserRequested;
        if (this.now - this.lastCommitMs > this.commitTimeMs) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), Long.valueOf(this.now - this.lastCommitMs), Long.valueOf(this.commitTimeMs));
            }
            maybeCommitActiveTasksPerUserRequested = this.taskManager.commitAll();
            if (maybeCommitActiveTasksPerUserRequested > 0) {
                long advanceNowAndComputeLatency = advanceNowAndComputeLatency();
                this.commitSensor.record(advanceNowAndComputeLatency / maybeCommitActiveTasksPerUserRequested, this.now);
                this.taskManager.maybePurgeCommitedRecords();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Committed all active tasks {} and standby tasks {} in {}ms", this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), Long.valueOf(advanceNowAndComputeLatency));
                }
            }
            if (maybeCommitActiveTasksPerUserRequested == -1) {
                this.log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
            } else {
                this.lastCommitMs = this.now;
            }
            this.processStandbyRecords = true;
        } else {
            maybeCommitActiveTasksPerUserRequested = this.taskManager.maybeCommitActiveTasksPerUserRequested();
            if (maybeCommitActiveTasksPerUserRequested > 0) {
                this.commitSensor.record(advanceNowAndComputeLatency() / maybeCommitActiveTasksPerUserRequested, this.now);
            }
        }
        return maybeCommitActiveTasksPerUserRequested > 0;
    }

    private void maybeUpdateStandbyTasks() {
        if (this.state == State.RUNNING && this.taskManager.hasStandbyRunningTasks()) {
            if (this.processStandbyRecords) {
                if (!this.standbyRecords.isEmpty()) {
                    HashMap hashMap = new HashMap();
                    for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : this.standbyRecords.entrySet()) {
                        TopicPartition key = entry.getKey();
                        List<ConsumerRecord<byte[], byte[]>> value = entry.getValue();
                        if (value != null) {
                            StandbyTask standbyTask = this.taskManager.standbyTask(key);
                            if (standbyTask.isClosed()) {
                                this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", standbyTask.id());
                                throw new TaskMigratedException(standbyTask);
                            }
                            List<ConsumerRecord<byte[], byte[]>> update = standbyTask.update(key, value);
                            if (update.isEmpty()) {
                                this.restoreConsumer.resume(Collections.singleton(key));
                            } else {
                                hashMap.put(key, update);
                            }
                        }
                    }
                    this.standbyRecords = hashMap;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Updated standby tasks {} in {}ms", this.taskManager.standbyTaskIds(), Long.valueOf(this.time.milliseconds() - this.now));
                    }
                }
                this.processStandbyRecords = false;
            }
            try {
                ConsumerRecords<byte[], byte[]> poll = this.restoreConsumer.poll(Duration.ZERO);
                if (!poll.isEmpty()) {
                    for (TopicPartition topicPartition : poll.partitions()) {
                        StandbyTask standbyTask2 = this.taskManager.standbyTask(topicPartition);
                        if (standbyTask2 == null) {
                            throw new StreamsException(this.logPrefix + "Missing standby task for partition " + topicPartition);
                        }
                        if (standbyTask2.isClosed()) {
                            this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", standbyTask2.id());
                            throw new TaskMigratedException(standbyTask2);
                        }
                        List<ConsumerRecord<byte[], byte[]>> update2 = standbyTask2.update(topicPartition, poll.records(topicPartition));
                        if (!update2.isEmpty()) {
                            this.restoreConsumer.pause(Collections.singleton(topicPartition));
                            this.standbyRecords.put(topicPartition, update2);
                        }
                    }
                }
            } catch (InvalidOffsetException e) {
                this.log.warn("Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch.", (Throwable) e);
                Set<TopicPartition> partitions = e.partitions();
                Iterator<TopicPartition> it = partitions.iterator();
                while (it.hasNext()) {
                    StandbyTask standbyTask3 = this.taskManager.standbyTask(it.next());
                    if (standbyTask3.isClosed()) {
                        this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", standbyTask3.id());
                        throw new TaskMigratedException(standbyTask3);
                    }
                    this.log.info("Reinitializing StandbyTask {} from changelogs {}", standbyTask3, e.partitions());
                    standbyTask3.reinitializeStateStoresForPartitions(e.partitions());
                }
                this.restoreConsumer.seekToBeginning(partitions);
            }
            advanceNowAndComputeLatency();
        }
    }

    private long advanceNowAndComputeLatency() {
        long j = this.now;
        this.now = this.time.milliseconds();
        return Math.max(this.now - j, 0L);
    }

    public void shutdown() {
        this.log.info("Informed to shut down");
        if (setState(State.PENDING_SHUTDOWN) == State.CREATED) {
            completeShutdown(true);
        }
    }

    private void completeShutdown(boolean z) {
        setState(State.PENDING_SHUTDOWN);
        this.log.info("Shutting down");
        try {
            this.taskManager.shutdown(z);
        } catch (Throwable th) {
            this.log.error("Failed to close task manager due to the following error:", th);
        }
        try {
            this.consumer.close();
        } catch (Throwable th2) {
            this.log.error("Failed to close consumer due to the following error:", th2);
        }
        try {
            this.restoreConsumer.close();
        } catch (Throwable th3) {
            this.log.error("Failed to close restore consumer due to the following error:", th3);
        }
        this.streamsMetrics.removeAllThreadLevelSensors(getName());
        setState(State.DEAD);
        this.log.info("Shutdown complete");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearStandbyRecords(List<TopicPartition> list) {
        Iterator<TopicPartition> it = list.iterator();
        while (it.hasNext()) {
            this.standbyRecords.remove(it.next());
        }
    }

    public final ThreadMetadata threadMetadata() {
        return this.threadMetadata;
    }

    StreamThread updateThreadMetadata(String str) {
        this.threadMetadata = new ThreadMetadata(getName(), state().name(), getConsumerClientId(getName()), getRestoreConsumerClientId(getName()), this.producer == null ? Collections.emptySet() : Collections.singleton(getThreadProducerClientId(getName())), str, Collections.emptySet(), Collections.emptySet());
        return this;
    }

    private void updateThreadMetadata(Map<TaskId, StreamTask> map, Map<TaskId, StandbyTask> map2) {
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (Map.Entry<TaskId, StreamTask> entry : map.entrySet()) {
            hashSet2.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().partitions()));
            hashSet.add(getTaskProducerClientId(getName(), entry.getKey()));
        }
        HashSet hashSet3 = new HashSet();
        for (Map.Entry<TaskId, StandbyTask> entry2 : map2.entrySet()) {
            hashSet3.add(new TaskMetadata(entry2.getKey().toString(), entry2.getValue().partitions()));
        }
        this.threadMetadata = new ThreadMetadata(getName(), state().name(), getConsumerClientId(getName()), getRestoreConsumerClientId(getName()), this.producer == null ? hashSet : Collections.singleton(getThreadProducerClientId(getName())), this.threadMetadata.adminClientId(), hashSet2, hashSet3);
    }

    public Map<TaskId, StreamTask> activeTasks() {
        return this.taskManager.activeTasks();
    }

    public List<StreamTask> allStreamsTasks() {
        return this.taskManager.allStreamsTasks();
    }

    public List<StandbyTask> allStandbyTasks() {
        return this.taskManager.allStandbyTasks();
    }

    public Set<TaskId> restoringTaskIds() {
        return this.taskManager.restoringTaskIds();
    }

    public Map<TaskId, Task> allTasks() {
        TreeMap treeMap = new TreeMap();
        treeMap.putAll(this.taskManager.standbyTasks());
        treeMap.putAll(this.taskManager.activeTasks());
        return treeMap;
    }

    @Override // java.lang.Thread
    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        return str + "\tStreamsThread threadId: " + getName() + StringUtils.LF + this.taskManager.toString(str);
    }

    public Map<MetricName, Metric> producerMetrics() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (this.producer != null) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
            if (metrics != null) {
                linkedHashMap.putAll(metrics);
            }
        } else {
            Iterator<StreamTask> it = this.taskManager.activeTasks().values().iterator();
            while (it.hasNext()) {
                linkedHashMap.putAll(it.next().getProducer().metrics());
            }
        }
        return linkedHashMap;
    }

    public Map<MetricName, Metric> consumerMetrics() {
        Map<MetricName, ? extends Metric> metrics = this.consumer.metrics();
        Map<MetricName, ? extends Metric> metrics2 = this.restoreConsumer.metrics();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.putAll(metrics);
        linkedHashMap.putAll(metrics2);
        return linkedHashMap;
    }

    public Map<MetricName, Metric> adminClientMetrics() {
        return new LinkedHashMap(this.taskManager.adminClient().metrics());
    }

    void setNow(long j) {
        this.now = j;
    }

    TaskManager taskManager() {
        return this.taskManager;
    }

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
        return this.standbyRecords;
    }

    int currentNumIterations() {
        return this.numIterations;
    }

    public StateListener stateListener() {
        return this.stateListener;
    }
}
