package cn.feiliu.taskflow.client.grpc;

import cn.feiliu.taskflow.client.telemetry.MetricsContainer;
import cn.feiliu.taskflow.common.metadata.tasks.ExecutingTask;
import cn.feiliu.taskflow.common.metadata.tasks.TaskExecResult;
import cn.feiliu.taskflow.grpc.TaskflowServiceGrpc;
import cn.feiliu.taskflow.mapper.MapperFactory;
import cn.feiliu.taskflow.proto.TaskModelPb;
import cn.feiliu.taskflow.sdk.worker.Worker;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/feiliu/taskflow/client/grpc/PoolWorker.class */
/**
 * Runs one poll/execute/update cycle for a {@link Worker}.
 *
 * <p>Each call to {@link #run()} acquires a permit from the supplied semaphore, takes a task
 * from the {@link PooledPoller}, executes it with the worker and reports the result back through
 * the gRPC future stub, retrying the update a bounded number of times.
 */
public class PoolWorker {
    private static final Logger log = LoggerFactory.getLogger(PoolWorker.class);

    /** Tasks carrying this id are skipped by {@link #run()} instead of being executed. */
    private static final String NO_OP_TASK_ID = "NO_OP";

    /**
     * Output-data key read to compute the poll latency. The spelling is a runtime key and is
     * kept as-is; presumably the server writes it when it sends the task (confirm server side).
     */
    private static final String SERVER_SEND_TIME_KEY = "_severSendTime";

    /** Output-data key under which the computed poll latency (ms) is stored. */
    private static final String POLL_NETWORK_LATENCY_KEY = "_pollNetworkLatency";

    /** Output-data key under which the client-side send timestamp (ms) is stored. */
    private static final String CLIENT_SEND_TIME_KEY = "_clientSendTime";

    /** Number of attempts made to report a task result. */
    private static final int UPDATE_ATTEMPTS = 3;

    /** Maximum time to wait for a single update call to complete. */
    private static final long UPDATE_TIMEOUT_MILLIS = 30000L;

    /** Base unit of the linear back-off between update attempts. */
    private static final long RETRY_BACKOFF_STEP_MILLIS = 500L;

    private final PooledPoller pooledPoller;
    private final Worker worker;
    private final TaskflowServiceGrpc.TaskflowServiceFutureStub taskServiceStub;
    private final Semaphore semaphore;

    /**
     * @param taskflowServiceFutureStub gRPC future stub used to send task results
     * @param pooledPoller              source of tasks to execute
     * @param worker                    worker that executes the polled task
     * @param semaphore                 semaphore from which one permit is acquired per {@link #run()}
     */
    public PoolWorker(TaskflowServiceGrpc.TaskflowServiceFutureStub taskflowServiceFutureStub, PooledPoller pooledPoller, Worker worker, Semaphore semaphore) {
        this.taskServiceStub = taskflowServiceFutureStub;
        this.pooledPoller = pooledPoller;
        this.worker = worker;
        this.semaphore = semaphore;
    }

    /**
     * Acquires a permit, fetches one task, executes it and reports the result.
     *
     * <p>Null tasks and tasks whose id is {@code "NO_OP"} are ignored. Any {@link Throwable}
     * raised along the way is logged and not propagated to the caller.
     */
    public void run() {
        try {
            // NOTE(review): the permit acquired here is not released anywhere in this class.
            // Confirm that the owner of the semaphore releases it; otherwise permits leak.
            this.semaphore.acquireUninterruptibly();
            TaskModelPb.Task task = this.pooledPoller.getTask();
            if (task == null || NO_OP_TASK_ID.equals(task.getTaskId())) {
                return;
            }
            log.debug("Executing task {}", task.getTaskId());
            ExecutingTask executingTask = MapperFactory.getInstance().fromProto(task);
            recordPollLatency(executingTask);
            TaskExecResult result = this.worker.execute(executingTask);
            log.info("Executed task {}", task.getTaskId());
            updateTaskResult(UPDATE_ATTEMPTS, executingTask, result, this.worker);
        } catch (Throwable th) {
            log.error("Error executing task: {}", th.getMessage(), th);
        }
    }

    /**
     * Best-effort: stores the elapsed time since the send timestamp found in the task's output
     * data. A missing or non-numeric timestamp is skipped; any other failure is only logged so
     * that it never prevents the task from being executed.
     */
    private void recordPollLatency(ExecutingTask executingTask) {
        try {
            Object serverSendTime = executingTask.getOutputData().get(SERVER_SEND_TIME_KEY);
            if (serverSendTime instanceof Number) {
                long latencyMillis = System.currentTimeMillis() - ((Number) serverSendTime).longValue();
                executingTask.getOutputData().put(POLL_NETWORK_LATENCY_KEY, Long.valueOf(latencyMillis));
            }
        } catch (Exception e) {
            log.warn("Error", e);
        }
    }

    /**
     * Sends the execution result, retrying up to {@code attempts} times. If every attempt fails,
     * the worker's error hook is invoked, the update-error metric is incremented and the failure
     * is logged; nothing is rethrown from the retry failure itself.
     */
    private void updateTaskResult(int attempts, ExecutingTask executingTask, TaskExecResult taskExecResult, Worker worker) {
        try {
            retryOperation(result -> {
                doUpdateTask(result);
                return null;
            }, attempts, taskExecResult, "updateTask");
        } catch (Exception e) {
            worker.onErrorUpdate(executingTask);
            MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            // The result is passed as an argument (not toString()'d eagerly) so a null result
            // cannot raise a second exception while reporting the first one.
            log.error("Failed to update result: {} for task: {} in worker: {}", taskExecResult, executingTask.getTaskDefName(), worker.getIdentity(), e);
        }
    }

    /**
     * Performs a single update call and waits up to {@link #UPDATE_TIMEOUT_MILLIS} for it.
     *
     * @throws RuntimeException wrapping any failure of the call; if the wait was interrupted the
     *                          thread's interrupt flag is restored before throwing
     */
    private void doUpdateTask(TaskExecResult taskExecResult) {
        long startMillis = System.currentTimeMillis();
        taskExecResult.getOutputData().put(CLIENT_SEND_TIME_KEY, Long.valueOf(startMillis));
        try {
            TaskModelPb.UpdateTaskRequest request = TaskModelPb.UpdateTaskRequest.newBuilder()
                    .setResult(MapperFactory.getInstance().toProto(taskExecResult))
                    .build();
            TaskModelPb.UpdateTaskResponse response = (TaskModelPb.UpdateTaskResponse) this.taskServiceStub
                    .updateTask(request)
                    .get(UPDATE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            log.info("更新任务->>{}", response.getTaskId());
        } catch (InterruptedException e) {
            // Restore the flag so the retry loop and the caller can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Logged exactly once, whether the call succeeded or failed.
            log.info("Took {} ms to update task", Long.valueOf(System.currentTimeMillis() - startMillis));
        }
    }

    /**
     * Applies {@code function} to {@code input}, retrying on any {@link Exception}.
     *
     * <p>Between attempts the thread sleeps {@code 500 ms * (failedAttempts + 1)}. There is no
     * sleep after the final attempt. If the thread is interrupted, the interrupt flag is
     * restored and retrying stops immediately.
     *
     * @param function      operation to run
     * @param maxAttempts   maximum number of attempts; values {@code <= 0} fail immediately
     * @param input         argument passed to {@code function}
     * @param operationName name used in log and exception messages
     * @return the value returned by the first successful attempt
     * @throws RuntimeException if no attempt succeeds; the last failure, if any, is the cause
     */
    private <T, R> R retryOperation(Function<T, R> function, int maxAttempts, T input, String operationName) {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return function.apply(input);
            } catch (Exception e) {
                lastFailure = e;
                log.warn("Attempt {} of {} performing {} failed: {}", Integer.valueOf(attempt), Integer.valueOf(maxAttempts), operationName, e.toString());
            }
            if (attempt == maxAttempts || Thread.currentThread().isInterrupted()) {
                // No point in backing off when there is nothing left to retry.
                break;
            }
            try {
                Thread.sleep(RETRY_BACKOFF_STEP_MILLIS * (attempt + 1));
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                log.error("Retry interrupted", interrupted);
                break;
            }
        }
        throw new RuntimeException("Exhausted retries performing " + operationName, lastFailure);
    }
}
