package org.apache.kafka.coordinator.common.runtime;

import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.class */
public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U> implements CoordinatorExecutor<U> {
    private final Logger log;
    private final TopicPartition shard;
    private final CoordinatorRuntime<S, U> runtime;
    private final ExecutorService executor;
    private final Duration writeTimeout;
    private final Map<String, CoordinatorExecutor.TaskRunnable<?>> tasks = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl$TaskResult.class */
    public static class TaskResult<R> {
        final R result;
        final Throwable exception;

        TaskResult(R r, Throwable th) {
            this.result = r;
            this.exception = th;
        }
    }

    public CoordinatorExecutorImpl(LogContext logContext, TopicPartition topicPartition, CoordinatorRuntime<S, U> coordinatorRuntime, ExecutorService executorService, Duration duration) {
        this.log = logContext.logger(CoordinatorExecutorImpl.class);
        this.shard = topicPartition;
        this.runtime = coordinatorRuntime;
        this.executor = executorService;
        this.writeTimeout = duration;
    }

    private <R> TaskResult<R> executeTask(CoordinatorExecutor.TaskRunnable<R> taskRunnable) {
        try {
            return new TaskResult<>(taskRunnable.run(), null);
        } catch (Throwable th) {
            return new TaskResult<>(null, th);
        }
    }

    @Override // org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor
    public <R> boolean schedule(String str, CoordinatorExecutor.TaskRunnable<R> taskRunnable, CoordinatorExecutor.TaskOperation<U, R> taskOperation) {
        if (this.tasks.putIfAbsent(str, taskRunnable) != null) {
            return false;
        }
        this.executor.submit(() -> {
            if (this.tasks.get(str) != taskRunnable) {
                return;
            }
            TaskResult<R> executeTask = executeTask(taskRunnable);
            this.runtime.scheduleWriteOperation(str, this.shard, this.writeTimeout, coordinatorShard -> {
                if (this.tasks.remove(str, taskRunnable)) {
                    return taskOperation.onComplete(executeTask.result, executeTask.exception);
                }
                throw new RejectedExecutionException(String.format("Task %s was overridden or cancelled", str));
            }).exceptionally(th -> {
                this.tasks.remove(str, taskRunnable);
                if (th instanceof RejectedExecutionException) {
                    this.log.debug("The write event for the task {} was not executed because it was cancelled or overridden.", str);
                    return null;
                }
                if ((th instanceof NotCoordinatorException) || (th instanceof CoordinatorLoadInProgressException)) {
                    this.log.debug("The write event for the task {} failed due to {}. Ignoring it because the coordinator is not active.", str, th.getMessage());
                    return null;
                }
                this.log.error("The write event for the task {} failed due to {}. Ignoring it. ", str, th.getMessage());
                return null;
            });
        });
        return true;
    }

    @Override // org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor
    public boolean isScheduled(String str) {
        return this.tasks.containsKey(str);
    }

    @Override // org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor
    public void cancel(String str) {
        this.tasks.remove(str);
    }

    public void cancelAll() {
        Iterator<String> it = this.tasks.keySet().iterator();
        while (it.hasNext()) {
            it.remove();
        }
    }
}
