package io.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

/* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor.class */
public class WorkerTaskMonitor {
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
    private static final int STOP_WARNING_SECONDS = 10;
    private final ObjectMapper jsonMapper;
    private final PathChildrenCache pathChildrenCache;
    private final CuratorFramework cf;
    private final WorkerCuratorCoordinator workerCuratorCoordinator;
    private final TaskRunner taskRunner;
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque();
    private final Map<String, TaskDetails> running = new ConcurrentHashMap();
    private final CountDownLatch doneStopping = new CountDownLatch(1);
    private final Object lifecycleLock = new Object();
    private volatile boolean started = false;
    private final ExecutorService exec = Execs.singleThreaded("WorkerTaskMonitor");

    /* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor$LocationNotice.class */
    private class LocationNotice implements Notice {
        private final String taskId;
        private final TaskLocation location;

        public LocationNotice(String str, TaskLocation taskLocation) {
            this.taskId = str;
            this.location = taskLocation;
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public String getTaskId() {
            return this.taskId;
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public void handle() throws InterruptedException {
            TaskDetails taskDetails = (TaskDetails) WorkerTaskMonitor.this.running.get(this.taskId);
            if (taskDetails == null) {
                WorkerTaskMonitor.log.warn("Got location notice for task [%s] that isn't running...", new Object[]{this.taskId});
                return;
            }
            if (Objects.equals(taskDetails.location, this.location)) {
                return;
            }
            taskDetails.location = this.location;
            try {
                WorkerTaskMonitor.log.info("Updating task [%s] announcement with location [%s]", new Object[]{this.taskId, this.location});
                WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(taskDetails.task, taskDetails.status, taskDetails.location));
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                WorkerTaskMonitor.log.makeAlert(e2, "Failed to update task announcement", new Object[0]).addData("task", this.taskId).emit();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor$Notice.class */
    public interface Notice {
        String getTaskId();

        void handle() throws Exception;
    }

    /* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor$RunNotice.class */
    private class RunNotice implements Notice {
        private final Task task;

        public RunNotice(Task task) {
            this.task = task;
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public String getTaskId() {
            return this.task.getId();
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public void handle() throws Exception {
            if (WorkerTaskMonitor.this.running.containsKey(this.task.getId())) {
                WorkerTaskMonitor.log.warn("Got run notice for task [%s] that I am already running...", new Object[]{this.task.getId()});
                WorkerTaskMonitor.this.workerCuratorCoordinator.removeTaskRunZnode(this.task.getId());
                return;
            }
            WorkerTaskMonitor.log.info("Submitting runnable for task[%s]", new Object[]{this.task.getId()});
            WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(this.task, TaskStatus.running(this.task.getId()), TaskLocation.unknown()));
            WorkerTaskMonitor.log.info("Affirmative. Running task [%s]", new Object[]{this.task.getId()});
            WorkerTaskMonitor.this.workerCuratorCoordinator.removeTaskRunZnode(this.task.getId());
            WorkerTaskMonitor.this.addRunningTask(this.task, WorkerTaskMonitor.this.taskRunner.run(this.task));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor$StatusNotice.class */
    public class StatusNotice implements Notice {
        private final Task task;
        private final TaskStatus status;

        public StatusNotice(Task task, TaskStatus taskStatus) {
            this.task = task;
            this.status = taskStatus;
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public String getTaskId() {
            return this.task.getId();
        }

        @Override // io.druid.indexing.worker.WorkerTaskMonitor.Notice
        public void handle() throws Exception {
            TaskDetails taskDetails = (TaskDetails) WorkerTaskMonitor.this.running.get(this.task.getId());
            if (taskDetails == null) {
                WorkerTaskMonitor.log.warn("Got status notice for task [%s] that isn't running...", new Object[]{this.task.getId()});
                return;
            }
            if (!this.status.isComplete()) {
                WorkerTaskMonitor.log.warn("WTF?! Got status notice for task [%s] that isn't complete (status = [%s])...", new Object[]{this.task.getId(), this.status.getStatusCode()});
                return;
            }
            taskDetails.status = this.status.withDuration(System.currentTimeMillis() - taskDetails.startTime);
            try {
                try {
                    WorkerTaskMonitor.this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(taskDetails.task, taskDetails.status, taskDetails.location));
                    WorkerTaskMonitor.log.info("Job's finished. Completed [%s] with status [%s]", new Object[]{this.task.getId(), this.status.getStatusCode()});
                    WorkerTaskMonitor.this.running.remove(this.task.getId());
                } catch (InterruptedException e) {
                    throw e;
                } catch (Exception e2) {
                    WorkerTaskMonitor.log.makeAlert(e2, "Failed to update task announcement", new Object[0]).addData("task", this.task.getId()).emit();
                    WorkerTaskMonitor.this.running.remove(this.task.getId());
                }
            } catch (Throwable th) {
                WorkerTaskMonitor.this.running.remove(this.task.getId());
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitor$TaskDetails.class */
    public static class TaskDetails {
        private final Task task;
        private TaskStatus status;
        private final long startTime = System.currentTimeMillis();
        private TaskLocation location = TaskLocation.unknown();

        public TaskDetails(Task task) {
            this.task = task;
            this.status = TaskStatus.running(task.getId());
        }
    }

    @Inject
    public WorkerTaskMonitor(ObjectMapper objectMapper, CuratorFramework curatorFramework, WorkerCuratorCoordinator workerCuratorCoordinator, TaskRunner taskRunner) {
        this.jsonMapper = objectMapper;
        this.pathChildrenCache = new PathChildrenCache(curatorFramework, workerCuratorCoordinator.getTaskPathForWorker(), false, true, Execs.makeThreadFactory("TaskMonitorCache-%s"));
        this.cf = curatorFramework;
        this.workerCuratorCoordinator = workerCuratorCoordinator;
        this.taskRunner = taskRunner;
    }

    @LifecycleStart
    public void start() throws Exception {
        synchronized (this.lifecycleLock) {
            Preconditions.checkState(!this.started, "already started");
            Preconditions.checkState(!this.exec.isShutdown(), "already stopped");
            this.started = true;
            try {
                restoreRestorableTasks();
                cleanupStaleAnnouncements();
                registerRunListener();
                registerLocationListener();
                this.pathChildrenCache.start();
                this.exec.submit(new Runnable() { // from class: io.druid.indexing.worker.WorkerTaskMonitor.1
                    @Override // java.lang.Runnable
                    public void run() {
                        WorkerTaskMonitor.this.mainLoop();
                    }
                });
                log.info("Started WorkerTaskMonitor.", new Object[0]);
                this.started = true;
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                log.makeAlert(e2, "Exception starting WorkerTaskMonitor", new Object[0]).emit();
                throw e2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void mainLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Notice take = this.notices.take();
                try {
                    take.handle();
                } catch (InterruptedException e) {
                    throw e;
                } catch (Exception e2) {
                    log.makeAlert(e2, "Failed to handle notice", new Object[0]).addData("noticeClass", take.getClass().getSimpleName()).addData("noticeTaskId", take.getTaskId()).emit();
                }
            } catch (InterruptedException e3) {
                log.info("WorkerTaskMonitor interrupted, exiting.", new Object[0]);
                return;
            } finally {
                this.doneStopping.countDown();
            }
        }
    }

    private void restoreRestorableTasks() {
        for (Pair<Task, ListenableFuture<TaskStatus>> pair : this.taskRunner.restore()) {
            addRunningTask((Task) pair.lhs, (ListenableFuture) pair.rhs);
        }
    }

    private void cleanupStaleAnnouncements() throws Exception {
        for (TaskAnnouncement taskAnnouncement : this.workerCuratorCoordinator.getAnnouncements()) {
            if (!this.running.containsKey(taskAnnouncement.getTaskStatus().getId()) && taskAnnouncement.getTaskStatus().isRunnable()) {
                log.info("Cleaning up stale announcement for task [%s].", new Object[]{taskAnnouncement.getTaskStatus().getId()});
                this.workerCuratorCoordinator.updateTaskStatusAnnouncement(TaskAnnouncement.create(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement.getTaskResource(), TaskStatus.failure(taskAnnouncement.getTaskStatus().getId()), TaskLocation.unknown()));
            }
        }
    }

    private void registerRunListener() {
        this.pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: io.druid.indexing.worker.WorkerTaskMonitor.2
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    WorkerTaskMonitor.this.notices.add(new RunNotice((Task) WorkerTaskMonitor.this.jsonMapper.readValue((byte[]) WorkerTaskMonitor.this.cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), Task.class)));
                }
            }
        });
    }

    private void registerLocationListener() {
        this.taskRunner.registerListener(new TaskRunnerListener() { // from class: io.druid.indexing.worker.WorkerTaskMonitor.3
            @Override // io.druid.indexing.overlord.TaskRunnerListener
            public String getListenerId() {
                return "WorkerTaskMonitor";
            }

            @Override // io.druid.indexing.overlord.TaskRunnerListener
            public void locationChanged(String str, TaskLocation taskLocation) {
                WorkerTaskMonitor.this.notices.add(new LocationNotice(str, taskLocation));
            }

            @Override // io.druid.indexing.overlord.TaskRunnerListener
            public void statusChanged(String str, TaskStatus taskStatus) {
            }
        }, MoreExecutors.sameThreadExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addRunningTask(final Task task, ListenableFuture<TaskStatus> listenableFuture) {
        this.running.put(task.getId(), new TaskDetails(task));
        Futures.addCallback(listenableFuture, new FutureCallback<TaskStatus>() { // from class: io.druid.indexing.worker.WorkerTaskMonitor.4
            public void onSuccess(TaskStatus taskStatus) {
                WorkerTaskMonitor.this.notices.add(new StatusNotice(task, taskStatus));
            }

            public void onFailure(Throwable th) {
                WorkerTaskMonitor.this.notices.add(new StatusNotice(task, TaskStatus.failure(task.getId())));
            }
        });
    }

    @LifecycleStop
    public void stop() throws InterruptedException {
        synchronized (this.lifecycleLock) {
            Preconditions.checkState(this.started, "not started");
            try {
                this.started = false;
                this.taskRunner.unregisterListener("WorkerTaskMonitor");
                this.exec.shutdownNow();
                this.pathChildrenCache.close();
                this.taskRunner.stop();
                if (!this.doneStopping.await(10L, TimeUnit.SECONDS)) {
                    log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", new Object[]{Integer.valueOf(STOP_WARNING_SECONDS)});
                    this.doneStopping.await();
                }
                log.info("Stopped WorkerTaskMonitor.", new Object[0]);
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                log.makeAlert(e2, "Exception stopping WorkerTaskMonitor", new Object[0]).emit();
            }
        }
    }
}
