package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StandbyTask.class */
public class StandbyTask extends AbstractTask implements Task {
    private final Logger log;
    private final String logPrefix;
    private final Sensor closeTaskSensor;
    private final boolean eosEnabled;
    private final InternalProcessorContext processorContext;
    private final StreamsMetricsImpl streamsMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StandbyTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, ProcessorStateManager processorStateManager, StateDirectory stateDirectory, ThreadCache threadCache, InternalProcessorContext internalProcessorContext) {
        super(taskId, processorTopology, stateDirectory, processorStateManager, set, streamsConfig.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG).longValue());
        this.processorContext = internalProcessorContext;
        this.streamsMetrics = streamsMetricsImpl;
        internalProcessorContext.transitionToStandby(threadCache);
        this.logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()) + String.format("%s [%s] ", "standby-task", taskId);
        this.log = new LogContext(this.logPrefix).logger(getClass());
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetricsImpl);
        this.eosEnabled = StreamThread.eosEnabled(streamsConfig);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean isActive() {
        return false;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeIfNeeded() {
        if (state() != Task.State.CREATED) {
            if (state() == Task.State.RESTORING) {
                throw new IllegalStateException("Illegal state " + state() + " while initializing standby task " + this.id);
            }
            return;
        }
        StateManagerUtil.registerStateStores(this.log, this.logPrefix, this.topology, this.stateMgr, this.stateDirectory, this.processorContext);
        this.offsetSnapshotSinceLastFlush = Collections.emptyMap();
        transitionTo(Task.State.RESTORING);
        transitionTo(Task.State.RUNNING);
        this.processorContext.initialize();
        this.log.info("Initialized");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void completeRestoration() {
        throw new IllegalStateException("Standby task " + this.id + " should never be completing restoration");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void suspend() {
        switch (state()) {
            case CREATED:
                this.log.info("Suspended created");
                transitionTo(Task.State.SUSPENDED);
                return;
            case RUNNING:
                this.log.info("Suspended running");
                transitionTo(Task.State.SUSPENDED);
                return;
            case SUSPENDED:
                this.log.info("Skip suspending since state is {}", state());
                return;
            case RESTORING:
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while suspending standby task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while suspending standby task " + this.id);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resume() {
        if (state() == Task.State.RESTORING) {
            throw new IllegalStateException("Illegal state " + state() + " while resuming standby task " + this.id);
        }
        this.log.trace("No-op resume with state {}", state());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (state()) {
            case CREATED:
                this.log.debug("Skipped preparing created task for commit");
                break;
            case RUNNING:
            case SUSPENDED:
                this.log.debug("Prepared {} task for committing", state());
                break;
            default:
                throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + this.id + " for committing ");
        }
        return Collections.emptyMap();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void postCommit(boolean z) {
        switch (state()) {
            case CREATED:
                this.log.debug("Skipped writing checkpoint for created task");
                return;
            case RUNNING:
            case SUSPENDED:
                maybeWriteCheckpoint(z);
                this.log.debug("Finalized commit for {} task", state());
                return;
            default:
                throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + this.id);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeClean() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        close(true);
        this.log.info("Closed clean");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeDirty() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        close(false);
        this.log.info("Closed dirty");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeCleanAndRecycleState() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        if (state() != Task.State.SUSPENDED) {
            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + this.id);
        }
        this.stateMgr.recycle();
        this.closeTaskSensor.record();
        transitionTo(Task.State.CLOSED);
        this.log.info("Closed clean and recycled state");
    }

    private void close(boolean z) {
        switch (state()) {
            case CREATED:
            case RUNNING:
            case RESTORING:
                throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + this.id);
            case SUSPENDED:
                TaskManager.executeAndMaybeSwallow(z, () -> {
                    StateManagerUtil.closeStateManager(this.log, this.logPrefix, z, this.eosEnabled, this.stateMgr, this.stateDirectory, Task.TaskType.STANDBY);
                }, "state manager close", this.log);
                this.closeTaskSensor.record();
                transitionTo(Task.State.CLOSED);
                return;
            case CLOSED:
                this.log.trace("Skip closing since state is {}", state());
                return;
            default:
                throw new IllegalStateException("Unknown state " + state() + " while closing standby task " + this.id);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean commitNeeded() {
        return StateManagerUtil.checkpointNeeded(false, this.offsetSnapshotSinceLastFlush, this.stateMgr.changelogOffsets());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> changelogOffsets() {
        return Collections.unmodifiableMap(this.stateMgr.changelogOffsets());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + topicPartition);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void maybeInitTaskTimeoutOrThrow(long j, TimeoutException timeoutException) throws StreamsException {
        maybeInitTaskTimeoutOrThrow(j, timeoutException, this.log);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void clearTaskTimeout() {
        clearTaskTimeout(this.log);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InternalProcessorContext processorContext() {
        return this.processorContext;
    }

    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append(str);
        sb.append("TaskId: ");
        sb.append(this.id);
        sb.append(StringUtils.LF);
        if (this.topology != null) {
            sb.append(str).append(this.topology.toString(str + "\t"));
        }
        return sb.toString();
    }
}
