package cn.feiliu.taskflow.client.grpc;

import cn.feiliu.taskflow.client.ApiClient;
import cn.feiliu.taskflow.client.core.WorkerGrpcScheduler;
import cn.feiliu.taskflow.client.utils.AssertUtils;
import cn.feiliu.taskflow.grpc.TaskflowServiceGrpc;
import cn.feiliu.taskflow.grpc.TaskflowStreamServiceGrpc;
import cn.feiliu.taskflow.proto.TaskModelPb;
import cn.feiliu.taskflow.sdk.worker.Worker;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/feiliu/taskflow/client/grpc/PooledPoller.class */
public class PooledPoller implements StreamObserver<TaskModelPb.Task> {
    private static final Logger log = LoggerFactory.getLogger(PooledPoller.class);
    private final TaskflowServiceGrpc.TaskflowServiceStub taskPollClient;
    private final TaskflowStreamServiceGrpc.TaskflowStreamServiceStub streamStub;
    private final Worker worker;
    private final String domain;
    private Integer threadCountForTask;
    private final ArrayBlockingQueue<Holder> latchesForOrder = new ArrayBlockingQueue<>(10000);
    private final AtomicBoolean callAgain = new AtomicBoolean(true);
    private final AtomicLong lastAskedForMessageCount = new AtomicLong(0);
    private final Semaphore semaphore;
    private final int taskPollCount;
    private final ApiClient apiClient;
    private final WorkerGrpcScheduler workerScheduler;
    private final Integer taskPollTimeout;

    /* renamed from: cn.feiliu.taskflow.client.grpc.PooledPoller$1, reason: invalid class name */
    /* loaded from: input_file:cn/feiliu/taskflow/client/grpc/PooledPoller$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$grpc$Status$Code = new int[Status.Code.values().length];

        static {
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.UNAVAILABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.UNAUTHENTICATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.CANCELLED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.ABORTED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.DATA_LOSS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$grpc$Status$Code[Status.Code.DEADLINE_EXCEEDED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cn/feiliu/taskflow/client/grpc/PooledPoller$Holder.class */
    public static class Holder {
        CountDownLatch latch;
        TaskModelPb.Task task;

        public Holder(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public TaskModelPb.Task getTask() {
            return this.task;
        }

        public void setLatch(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void setTask(TaskModelPb.Task task) {
            this.task = task;
        }
    }

    public PooledPoller(ApiClient apiClient, Worker worker, String str, Integer num, Integer num2) {
        this.workerScheduler = apiClient.getExecutorFactory().getWorkerGrpcScheduler(worker, num.intValue());
        this.apiClient = apiClient;
        this.taskPollTimeout = num2;
        this.streamStub = apiClient.channelManager().newTaskflowStreamServiceStub();
        this.taskPollClient = apiClient.channelManager().newTaskflowServiceStub();
        this.worker = worker;
        this.domain = str;
        this.threadCountForTask = num;
        this.taskPollCount = num.intValue();
        this.semaphore = new Semaphore(num.intValue());
    }

    public void start() {
        log.info("Starting {} worker with {} threads and polling interval at {} ms with pollCount at {}", new Object[]{this.worker.getTaskDefName(), this.threadCountForTask, Integer.valueOf(this.worker.getPollingInterval()), Integer.valueOf(this.taskPollCount)});
        this.workerScheduler.scheduleBatchPoll(() -> {
            runAccumulatedRequests();
        });
        this.workerScheduler.startTaskProcess(new PoolWorker(this.apiClient.channelManager().newTaskflowServiceFutureStub(), this, this.worker, this.semaphore));
    }

    public TaskModelPb.Task getTask() {
        Stopwatch createStarted = Stopwatch.createStarted();
        TaskModelPb.Task task = null;
        try {
            try {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                Holder holder = new Holder(countDownLatch);
                this.latchesForOrder.put(holder);
                Uninterruptibles.awaitUninterruptibly(countDownLatch);
                task = holder.getTask();
                long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
                if (elapsed > 9000) {
                    log.info("Polled in {} ms - found task - {}", Long.valueOf(elapsed), Boolean.valueOf((task == null || task.getTaskId().equals("NO_OP")) ? false : true));
                }
            } catch (InterruptedException e) {
                log.error("ERROR WAITING --- ", e);
                long elapsed2 = createStarted.elapsed(TimeUnit.MILLISECONDS);
                if (elapsed2 > 9000) {
                    log.info("Polled in {} ms - found task - {}", Long.valueOf(elapsed2), Boolean.valueOf((task == null || task.getTaskId().equals("NO_OP")) ? false : true));
                }
            }
            if (task == null) {
                this.semaphore.release();
            }
            return task;
        } catch (Throwable th) {
            long elapsed3 = createStarted.elapsed(TimeUnit.MILLISECONDS);
            if (elapsed3 > 9000) {
                log.info("Polled in {} ms - found task - {}", Long.valueOf(elapsed3), Boolean.valueOf((task == null || task.getTaskId().equals("NO_OP")) ? false : true));
            }
            throw th;
        }
    }

    public void saveTask(TaskModelPb.Task task) {
        if (task != null) {
            try {
                Holder poll = this.latchesForOrder.poll(1000L, TimeUnit.MILLISECONDS);
                if (poll == null) {
                    throw new RuntimeException("Holder cannot be null!");
                }
                poll.task = task;
                poll.latch.countDown();
            } catch (InterruptedException e) {
                log.error("ERROR!", e);
            }
        }
    }

    public void runAccumulatedRequests() {
        int intValue = this.threadCountForTask.intValue() - this.semaphore.availablePermits();
        if (intValue <= 0) {
            return;
        }
        if (intValue > this.taskPollCount) {
            intValue = this.taskPollCount;
        }
        if (this.callAgain.get()) {
            this.callAgain.set(false);
            this.lastAskedForMessageCount.set(intValue);
            log.trace("Polling {} for {} tasks", this.worker.getTaskDefName(), Integer.valueOf(intValue));
            this.streamStub.batchPoll(buildPollRequest(intValue), this);
        }
    }

    private TaskModelPb.BatchPollRequest buildPollRequest(int i) {
        AssertUtils.assertArgument(this.taskPollTimeout, (Integer) 100, (Integer) 2000);
        TaskModelPb.BatchPollRequest.Builder workerId = TaskModelPb.BatchPollRequest.newBuilder().setCount(i).setTaskType(this.worker.getTaskDefName()).setTimeout(this.taskPollTimeout.intValue()).setWorkerId(this.worker.getIdentity());
        if (this.domain != null) {
            workerId = workerId.setDomain(this.domain);
        }
        return workerId.build();
    }

    public void onNext(TaskModelPb.Task task) {
        try {
            saveTask(task);
            this.semaphore.release();
            this.lastAskedForMessageCount.decrementAndGet();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    public void onError(Throwable th) {
        Status.Code code = Status.fromThrowable(th).getCode();
        drain();
        switch (AnonymousClass1.$SwitchMap$io$grpc$Status$Code[code.ordinal()]) {
            case 1:
                log.trace("Server not available ");
                return;
            case 2:
                log.error("{} - Invalid or missing api key/secret", code);
                return;
            case 3:
            case 4:
            case 5:
            case 6:
                return;
            default:
                log.error("Error from server when polling for the task {} - {}", this.worker.getTaskDefName(), code);
                return;
        }
    }

    public void onCompleted() {
        drain();
    }

    private void drain() {
        long j = this.lastAskedForMessageCount.get();
        if (j > 0) {
            log.debug("Didn't get {} messages from server as expected", Long.valueOf(j));
            for (int i = 0; i < j; i++) {
                saveTask(TaskModelPb.Task.newBuilder().setTaskId("NO_OP").build());
                this.semaphore.release();
            }
        }
        this.callAgain.set(true);
    }

    public void close() {
        this.workerScheduler.shutdown();
    }
}
