package org.apache.james.task;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/task/SerialTaskManagerWorker.class */
public class SerialTaskManagerWorker implements TaskManagerWorker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
    public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
    private final TaskManagerWorker.Listener listener;
    private final Duration pollingInterval;
    private final Scheduler taskExecutor = Schedulers.fromExecutor(Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
    private final Scheduler asyncTaskExecutor = Schedulers.fromExecutor(Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));
    private final Set<TaskId> cancelledTasks = Sets.newConcurrentHashSet();
    private final Map<TaskId, CompletableFuture<Task.Result>> runningTasks = Maps.newConcurrentMap();

    public SerialTaskManagerWorker(TaskManagerWorker.Listener listener, Duration duration) {
        this.pollingInterval = duration;
        this.listener = listener;
    }

    @Override // org.apache.james.task.TaskManagerWorker
    public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
        if (this.cancelledTasks.remove(taskWithId.getId())) {
            return Mono.from(this.listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).doOnTerminate(() -> {
                this.runningTasks.remove(taskWithId.getId());
            }).then(Mono.empty());
        }
        CompletableFuture<Task.Result> future = Mono.fromCallable(() -> {
            return runWithMdc(taskWithId, this.listener);
        }).subscribeOn(schedulerForTask(taskWithId)).toFuture();
        this.runningTasks.put(taskWithId.getId(), future);
        Mono<Task.Result> doOnTerminate = Mono.using(() -> {
            return pollAdditionalInformation(taskWithId).subscribe();
        }, disposable -> {
            return Mono.fromFuture(future).onErrorResume(th -> {
                return Mono.from(handleExecutionError(taskWithId, this.listener, th)).thenReturn(Task.Result.PARTIAL);
            });
        }, (v0) -> {
            v0.dispose();
        }).doOnTerminate(() -> {
            this.runningTasks.remove(taskWithId.getId());
        });
        if (!(taskWithId.getTask() instanceof AsyncSafeTask)) {
            return doOnTerminate;
        }
        doOnTerminate.subscribe();
        return Mono.empty();
    }

    private Scheduler schedulerForTask(TaskWithId taskWithId) {
        return taskWithId.getTask() instanceof AsyncSafeTask ? this.asyncTaskExecutor : this.taskExecutor;
    }

    private Publisher<Void> handleExecutionError(TaskWithId taskWithId, TaskManagerWorker.Listener listener, Throwable th) {
        return th instanceof CancellationException ? Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).then(Mono.fromCallable(() -> {
            return Boolean.valueOf(this.cancelledTasks.remove(taskWithId.getId()));
        })).then() : listener.failed(taskWithId.getId(), taskWithId.getTask().details(), th);
    }

    private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
        return Mono.fromCallable(() -> {
            return taskWithId.getTask().details();
        }).delayElement(this.pollingInterval, Schedulers.elastic()).repeat().handle(ReactorUtils.publishIfPresent()).flatMap(additionalInformation -> {
            return Mono.from(this.listener.updated(taskWithId.getId(), additionalInformation)).thenReturn(additionalInformation);
        }, 16);
    }

    private Task.Result runWithMdc(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        return (Task.Result) MDCBuilder.withMdc(MDCBuilder.create().addToContext("taskId", taskWithId.getId().asString()).addToContext("taskType", taskWithId.getTask().type().asString()).addToContext("taskDetails", taskWithId.getTask().details().toString()), () -> {
            return (Task.Result) run(taskWithId, listener).block();
        });
    }

    private Mono<Task.Result> run(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        return Mono.from(listener.started(taskWithId.getId())).then(runTask(taskWithId, listener)).onErrorResume(this::isCausedByInterruptedException, th -> {
            return cancelled(taskWithId, listener);
        }).onErrorResume(Exception.class, exc -> {
            LOGGER.error("Error while running task {}", taskWithId.getId(), exc);
            return Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exc)).thenReturn(Task.Result.PARTIAL);
        });
    }

    private boolean isCausedByInterruptedException(Throwable th) {
        if (th instanceof InterruptedException) {
            return true;
        }
        return Stream.iterate(th, th2 -> {
            return th2.getCause() != null;
        }, (v0) -> {
            return v0.getCause();
        }).anyMatch(th3 -> {
            return th3 instanceof InterruptedException;
        });
    }

    private Mono<Task.Result> cancelled(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL);
    }

    private Mono<Task.Result> runTask(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        return Mono.from(taskWithId.getTask().runAsync()).doOnNext(result -> {
            result.onComplete(new Task.CompletionOperation[]{result -> {
                Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block();
            }}).onFailure(new Task.Operation[]{() -> {
                LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
                Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block();
            }});
        });
    }

    @Override // org.apache.james.task.TaskManagerWorker
    public void cancelTask(TaskId taskId) {
        this.cancelledTasks.add(taskId);
        Optional.ofNullable(this.runningTasks.get(taskId)).ifPresent(completableFuture -> {
            completableFuture.cancel(true);
        });
    }

    @Override // org.apache.james.task.TaskManagerWorker
    public Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional, String str, Throwable th) {
        return this.listener.failed(taskId, optional, str, th);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.taskExecutor.dispose();
        this.asyncTaskExecutor.dispose();
    }
}
