package cn.feiliu.taskflow.client.automator;

import cn.feiliu.taskflow.client.ApiClient;
import cn.feiliu.taskflow.client.core.WorkerHttpScheduler;
import cn.feiliu.taskflow.client.telemetry.MetricsContainer;
import cn.feiliu.taskflow.common.metadata.tasks.ExecutingTask;
import cn.feiliu.taskflow.common.metadata.tasks.TaskExecResult;
import cn.feiliu.taskflow.sdk.config.PropertyFactory;
import cn.feiliu.taskflow.sdk.exceptions.ExceptionRateLimiter;
import cn.feiliu.taskflow.sdk.worker.Worker;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.patterns.ThreadPoolMonitor;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.Thread;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/feiliu/taskflow/client/automator/TaskRunner.class */
public class TaskRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
    private static final Registry REGISTRY = Spectator.globalRegistry();
    private final EurekaClient eurekaClient;
    private final ApiClient apiClient;
    private final int updateRetryCount;
    private final WorkerHttpScheduler workerHttpScheduler;
    private final Map<String, String> taskToDomain;
    private final int taskPollTimeout;
    public static final String DOMAIN = "domain";
    private static final String OVERRIDE_DISCOVERY = "pollOutOfDiscovery";
    public static final String ALL_WORKERS = "all";
    private final Semaphore permits;
    private final Worker worker;
    private int pollingIntervalInMillis;
    private String domain;
    private final String taskType;
    private int errorAt;
    private final ExceptionRateLimiter rateLimiter = new ExceptionRateLimiter();
    private final AtomicReference<Boolean> stopSignalRef = new AtomicReference<>(false);
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, th) -> {
        MetricsContainer.incrementUncaughtExceptionCount();
        LOGGER.error("Uncaught exception. Thread {} will exit now", thread, th);
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskRunner(Worker worker, EurekaClient eurekaClient, ApiClient apiClient, int i, Map<String, String> map, String str, int i2, int i3) {
        this.worker = worker;
        this.eurekaClient = eurekaClient;
        this.apiClient = apiClient;
        this.updateRetryCount = i;
        this.taskToDomain = map;
        this.taskPollTimeout = i3;
        this.permits = new Semaphore(i2);
        this.pollingIntervalInMillis = worker.getPollingInterval();
        this.taskType = worker.getTaskDefName();
        this.domain = PropertyFactory.getString(this.taskType, DOMAIN, (String) null);
        if (this.domain == null) {
            this.domain = PropertyFactory.getString(ALL_WORKERS, DOMAIN, (String) null);
        }
        if (this.domain == null) {
            this.domain = map.get(this.taskType);
        }
        int intValue = PropertyFactory.getInteger(this.taskType, "LOG_INTERVAL", 0).intValue();
        intValue = intValue == 0 ? PropertyFactory.getInteger(ALL_WORKERS, "LOG_INTERVAL", 0).intValue() : intValue;
        this.errorAt = intValue == 0 ? 100 : intValue;
        LOGGER.info("Polling errors will be sampled at every {} error (after the first 100 errors) for taskType {}", Integer.valueOf(this.errorAt), this.taskType);
        this.workerHttpScheduler = apiClient.getExecutorFactory().getWorkerHttpScheduler(worker, i2, str, this.uncaughtExceptionHandler);
        ThreadPoolMonitor.attach(REGISTRY, this.workerHttpScheduler.getThreadPoolExecutor(), str);
        LOGGER.info("Starting Worker for taskType '{}' with {} threads, {} ms polling interval and domain {}", new Object[]{this.taskType, Integer.valueOf(i2), Integer.valueOf(this.pollingIntervalInMillis), this.domain});
        LOGGER.info("Polling errors for taskType {} will be printed at every {} occurance.", this.taskType, Integer.valueOf(this.errorAt));
    }

    public void pollAndExecute() throws InterruptedException {
        Stopwatch stopwatch = null;
        long j = this.pollingIntervalInMillis;
        while (!this.stopSignalRef.get().booleanValue()) {
            try {
                Pair<List<ExecutingTask>, Throwable> pollTasksForWorker = pollTasksForWorker();
                List list = (List) pollTasksForWorker.getLeft();
                j = pollTasksForWorker.getRight() != null ? Math.min(j * 2, TimeUnit.MINUTES.toMillis(10L)) : this.pollingIntervalInMillis;
                if (list.isEmpty()) {
                    if (stopwatch == null) {
                        stopwatch = Stopwatch.createStarted();
                    }
                    Uninterruptibles.sleepUninterruptibly(j, TimeUnit.MILLISECONDS);
                } else {
                    if (stopwatch != null) {
                        stopwatch.stop();
                        LOGGER.trace("Poller for task {} waited for {} ms before getting {} tasks to execute", new Object[]{this.taskType, Long.valueOf(stopwatch.elapsed(TimeUnit.MILLISECONDS)), Integer.valueOf(list.size())});
                        stopwatch = null;
                    }
                    list.forEach(executingTask -> {
                        this.workerHttpScheduler.submit(() -> {
                            processTask(executingTask);
                        });
                    });
                }
            } catch (Throwable th) {
                LOGGER.error(th.getMessage(), th);
                Thread.sleep(1000L);
            }
        }
    }

    public void shutdown(int i) {
        this.stopSignalRef.set(true);
        this.workerHttpScheduler.shutdown(i);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v31, types: [java.util.List] */
    private Pair<List<ExecutingTask>, Throwable> pollTasksForWorker() {
        LinkedList linkedList = new LinkedList();
        Boolean bool = (Boolean) Optional.ofNullable(PropertyFactory.getBoolean(this.taskType, OVERRIDE_DISCOVERY, (Boolean) null)).orElseGet(() -> {
            return PropertyFactory.getBoolean(ALL_WORKERS, OVERRIDE_DISCOVERY, false);
        });
        if (this.eurekaClient != null && !this.eurekaClient.getInstanceRemoteStatus().equals(InstanceInfo.InstanceStatus.UP) && !bool.booleanValue()) {
            LOGGER.trace("Instance is NOT UP in discovery - will not poll");
            return Pair.of(linkedList, (Object) null);
        }
        if (this.worker.paused()) {
            MetricsContainer.incrementTaskPausedCount(this.taskType);
            LOGGER.trace("Worker {} has been paused. Not polling anymore!", this.worker.getClass());
            return Pair.of(linkedList, (Object) null);
        }
        int i = 0;
        while (this.permits.tryAcquire()) {
            i++;
        }
        if (i == 0) {
            return Pair.of(linkedList, (Object) null);
        }
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Polling task of type: {} in domain: '{}' with size {}", new Object[]{this.taskType, this.domain, Integer.valueOf(i)});
            }
            Stopwatch createStarted = Stopwatch.createStarted();
            int i2 = i;
            linkedList = (List) MetricsContainer.getPollTimer(this.taskType).record(() -> {
                return pollTask(this.domain, i2);
            });
            createStarted.stop();
            this.permits.release(i - linkedList.size());
            if (LOGGER.isInfoEnabled()) {
                LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", new Object[]{this.taskType, Integer.valueOf(linkedList.size()), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))});
            }
            return Pair.of(linkedList, (Object) null);
        } catch (Throwable th) {
            this.permits.release(i - linkedList.size());
            this.rateLimiter.shouldRecordLog(th, exceptionSummary -> {
                Optional<String> tryParserType = exceptionSummary.tryParserType();
                String orElse = tryParserType.orElse("error");
                if (exceptionSummary.isFirst() || !tryParserType.isPresent()) {
                    LOGGER.error("Error polling for taskType: '{}', {}", new Object[]{this.taskType, orElse, th});
                } else {
                    LOGGER.error("Error polling for taskType: '{}', {} = {}", new Object[]{this.taskType, orElse, th.getMessage()});
                }
            });
            return Pair.of(linkedList, th);
        }
    }

    private List<ExecutingTask> pollTask(String str, int i) {
        if (i < 1) {
            return Collections.emptyList();
        }
        String identity = this.worker.getIdentity();
        LOGGER.debug("poll {} in the domain {} with batch size {}", new Object[]{this.taskType, str, Integer.valueOf(i)});
        return this.apiClient.getTaskClient().batchPollTasksInDomain(this.taskType, str, identity, i, this.taskPollTimeout);
    }

    private void processTask(ExecutingTask executingTask) {
        LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", new Object[]{executingTask.getTaskId(), this.taskType, this.worker.getClass().getSimpleName(), this.worker.getIdentity()});
        LOGGER.trace("task {} is getting executed after {} ms of getting polled", executingTask.getTaskId(), Long.valueOf(System.currentTimeMillis() - executingTask.getStartTime()));
        try {
            try {
                Stopwatch createStarted = Stopwatch.createStarted();
                executeTask(this.worker, executingTask);
                createStarted.stop();
                LOGGER.trace("Took {} ms to execute and update task with id {}", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)), executingTask.getTaskId());
                this.permits.release();
            } catch (Throwable th) {
                executingTask.setStatus(ExecutingTask.Status.FAILED);
                handleException(th, new TaskExecResult(executingTask), this.worker, executingTask);
                this.permits.release();
            }
        } catch (Throwable th2) {
            this.permits.release();
            throw th2;
        }
    }

    private void executeTask(Worker worker, ExecutingTask executingTask) {
        if (executingTask == null || executingTask.getTaskDefName().isEmpty()) {
            LOGGER.warn("Empty task {}", worker.getTaskDefName());
            return;
        }
        Stopwatch createStarted = Stopwatch.createStarted();
        TaskExecResult taskExecResult = null;
        try {
            try {
                LOGGER.trace("Executing task: {} in worker: {} at {}", new Object[]{executingTask.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity()});
                taskExecResult = worker.execute(executingTask);
                taskExecResult.setWorkflowInstanceId(executingTask.getWorkflowInstanceId());
                taskExecResult.setTaskId(executingTask.getTaskId());
                taskExecResult.setWorkerId(worker.getIdentity());
                createStarted.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                LOGGER.error("Unable to execute task: {} of type: {}", new Object[]{executingTask.getTaskId(), executingTask.getTaskDefName(), e});
                MetricsContainer.incrementTaskExecutionErrorCount(executingTask.getTaskType(), e);
                if (taskExecResult == null) {
                    executingTask.setStatus(ExecutingTask.Status.FAILED);
                    taskExecResult = new TaskExecResult(executingTask);
                }
                handleException(e, taskExecResult, worker, executingTask);
                createStarted.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            LOGGER.trace("Task: {} executed by worker: {} at {} with status: {}", new Object[]{executingTask.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), taskExecResult.getStatus()});
            Stopwatch createStarted2 = Stopwatch.createStarted();
            updateTaskResult(this.updateRetryCount, executingTask, taskExecResult, worker);
            createStarted2.stop();
            LOGGER.trace("Time taken to update the {} {} ms", executingTask.getTaskType(), Long.valueOf(createStarted2.elapsed(TimeUnit.MILLISECONDS)));
        } catch (Throwable th) {
            createStarted.stop();
            MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(createStarted.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    private void updateTaskResult(int i, ExecutingTask executingTask, TaskExecResult taskExecResult, Worker worker) {
        try {
            Optional optional = (Optional) retryOperation(taskExecResult2 -> {
                return upload(taskExecResult2, executingTask.getTaskType());
            }, i, taskExecResult, "evaluateAndUploadLargePayload");
            if (optional.isPresent()) {
                taskExecResult.setExternalOutputPayloadStoragePath((String) optional.get());
                taskExecResult.setOutputData((Map) null);
            }
            retryOperation(taskExecResult3 -> {
                this.apiClient.getTaskClient().updateTask(taskExecResult3);
                return null;
            }, i, taskExecResult, "updateTask");
        } catch (Exception e) {
            worker.onErrorUpdate(executingTask);
            MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            LOGGER.error(String.format("Failed to update result: %s for task: %s in worker: %s", taskExecResult.toString(), executingTask.getTaskDefName(), worker.getIdentity()), e);
        }
    }

    private Optional<String> upload(TaskExecResult taskExecResult, String str) {
        return Optional.empty();
    }

    private <T, R> R retryOperation(Function<T, R> function, int i, T t, String str) {
        int i2 = 0;
        while (i2 < i) {
            try {
                return function.apply(t);
            } catch (Exception e) {
                LOGGER.error("Error executing {}", str, e);
                i2++;
                Uninterruptibles.sleepUninterruptibly(500 * (i + 1), TimeUnit.MILLISECONDS);
            }
        }
        throw new RuntimeException("Exhausted retries performing " + str);
    }

    private void handleException(Throwable th, TaskExecResult taskExecResult, Worker worker, ExecutingTask executingTask) {
        LOGGER.error(String.format("Error while executing task %s", executingTask.toString()), th);
        MetricsContainer.incrementTaskExecutionErrorCount(this.taskType, th);
        taskExecResult.setStatus(TaskExecResult.Status.FAILED);
        taskExecResult.setReasonForIncompletion("Error while executing the task: " + th);
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        taskExecResult.log(stringWriter.toString());
        updateTaskResult(this.updateRetryCount, executingTask, taskExecResult, worker);
    }
}
