package io.druid.indexing.overlord;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.metadata.EntryExistsException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:io/druid/indexing/overlord/TaskQueue.class */
public class TaskQueue {
    private final TaskQueueConfig config;
    private final TaskStorage taskStorage;
    private final TaskRunner taskRunner;
    private final TaskActionClientFactory taskActionClientFactory;
    private final TaskLockbox taskLockbox;
    private final ServiceEmitter emitter;
    private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
    private final List<Task> tasks = Lists.newArrayList();
    private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();
    private final ReentrantLock giant = new ReentrantLock();
    private final Condition managementMayBeNecessary = this.giant.newCondition();
    private final ExecutorService managerExec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-Manager").build());
    private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("TaskQueue-StorageSync").build());
    private volatile boolean active = false;

    @Inject
    public TaskQueue(TaskQueueConfig taskQueueConfig, TaskStorage taskStorage, TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, TaskLockbox taskLockbox, ServiceEmitter serviceEmitter) {
        this.config = (TaskQueueConfig) Preconditions.checkNotNull(taskQueueConfig, "config");
        this.taskStorage = (TaskStorage) Preconditions.checkNotNull(taskStorage, "taskStorage");
        this.taskRunner = (TaskRunner) Preconditions.checkNotNull(taskRunner, "taskRunner");
        this.taskActionClientFactory = (TaskActionClientFactory) Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");
        this.taskLockbox = (TaskLockbox) Preconditions.checkNotNull(taskLockbox, "taskLockbox");
        this.emitter = (ServiceEmitter) Preconditions.checkNotNull(serviceEmitter, "emitter");
    }

    @LifecycleStart
    public void start() {
        this.giant.lock();
        try {
            Preconditions.checkState(!this.active, "queue must be stopped");
            this.active = true;
            syncFromStorage();
            this.managerExec.submit(new Runnable() { // from class: io.druid.indexing.overlord.TaskQueue.1
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            TaskQueue.this.manage();
                            return;
                        } catch (InterruptedException e) {
                            TaskQueue.log.info("Interrupted, exiting!", new Object[0]);
                            return;
                        } catch (Exception e2) {
                            long millis = TaskQueue.this.config.getRestartDelay().getMillis();
                            TaskQueue.log.makeAlert(e2, "Failed to manage", new Object[0]).addData("restartDelay", Long.valueOf(millis)).emit();
                            try {
                                Thread.sleep(millis);
                            } catch (InterruptedException e3) {
                                TaskQueue.log.info("Interrupted, exiting!", new Object[0]);
                                return;
                            }
                        }
                    }
                }
            });
            ScheduledExecutors.scheduleAtFixedRate(this.storageSyncExec, this.config.getStorageSyncRate(), new Callable<ScheduledExecutors.Signal>() { // from class: io.druid.indexing.overlord.TaskQueue.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public ScheduledExecutors.Signal call() {
                    try {
                        TaskQueue.this.syncFromStorage();
                    } catch (Exception e) {
                        if (TaskQueue.this.active) {
                            TaskQueue.log.makeAlert(e, "Failed to sync with storage", new Object[0]).emit();
                        }
                    }
                    return TaskQueue.this.active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
                }
            });
            this.managementMayBeNecessary.signalAll();
            this.giant.unlock();
        } catch (Throwable th) {
            this.giant.unlock();
            throw th;
        }
    }

    @LifecycleStop
    public void stop() {
        this.giant.lock();
        try {
            this.tasks.clear();
            this.taskFutures.clear();
            this.active = false;
            this.managerExec.shutdownNow();
            this.storageSyncExec.shutdownNow();
            this.managementMayBeNecessary.signalAll();
            this.giant.unlock();
        } catch (Throwable th) {
            this.giant.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void manage() throws InterruptedException {
        ListenableFuture<TaskStatus> listenableFuture;
        log.info("Beginning management in %s.", new Object[]{this.config.getStartDelay()});
        Thread.sleep(this.config.getStartDelay().getMillis());
        while (this.active) {
            this.giant.lock();
            try {
                HashMap newHashMap = Maps.newHashMap();
                for (TaskRunnerWorkItem taskRunnerWorkItem : this.taskRunner.getKnownTasks()) {
                    newHashMap.put(taskRunnerWorkItem.getTaskId(), taskRunnerWorkItem.getResult());
                }
                Iterator it = ImmutableList.copyOf(this.tasks).iterator();
                while (it.hasNext()) {
                    Task task = (Task) it.next();
                    if (!this.taskFutures.containsKey(task.getId())) {
                        if (newHashMap.containsKey(task.getId())) {
                            listenableFuture = (ListenableFuture) newHashMap.get(task.getId());
                        } else {
                            try {
                            } catch (Exception e) {
                                log.warn(e, "Exception thrown during isReady for task: %s", new Object[]{task.getId()});
                                notifyStatus(task, TaskStatus.failure(task.getId()));
                            }
                            if (task.isReady(this.taskActionClientFactory.create(task))) {
                                log.info("Asking taskRunner to run: %s", new Object[]{task.getId()});
                                listenableFuture = this.taskRunner.run(task);
                            }
                        }
                        this.taskFutures.put(task.getId(), attachCallbacks(task, listenableFuture));
                    }
                }
                Sets.SetView<String> difference = Sets.difference(newHashMap.keySet(), ImmutableSet.copyOf(Lists.transform(this.tasks, new Function<Task, Object>() { // from class: io.druid.indexing.overlord.TaskQueue.3
                    public String apply(Task task2) {
                        return task2.getId();
                    }
                })));
                if (!difference.isEmpty()) {
                    log.info("Asking taskRunner to clean up %,d tasks.", new Object[]{Integer.valueOf(difference.size())});
                    for (String str : difference) {
                        try {
                            this.taskRunner.shutdown(str);
                        } catch (Exception e2) {
                            log.warn(e2, "TaskRunner failed to clean up task: %s", new Object[]{str});
                        }
                    }
                }
                this.managementMayBeNecessary.awaitNanos(60000000000L);
                this.giant.unlock();
            } catch (Throwable th) {
                this.giant.unlock();
                throw th;
            }
        }
    }

    public boolean add(Task task) throws EntryExistsException {
        this.giant.lock();
        try {
            Preconditions.checkState(this.active, "Queue is not active!");
            Preconditions.checkNotNull(task, "task");
            Preconditions.checkState(this.tasks.size() < this.config.getMaxSize(), "Too many tasks (max = %,d)", new Object[]{Integer.valueOf(this.config.getMaxSize())});
            this.taskStorage.insert(task, TaskStatus.running(task.getId()));
            this.tasks.add(task);
            this.managementMayBeNecessary.signalAll();
            this.giant.unlock();
            return true;
        } catch (Throwable th) {
            this.giant.unlock();
            throw th;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:10:0x0038, code lost:
    
        notifyStatus(r0, io.druid.indexing.common.TaskStatus.failure(r5));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void shutdown(java.lang.String r5) {
        /*
            r4 = this;
            r0 = r4
            java.util.concurrent.locks.ReentrantLock r0 = r0.giant
            r0.lock()
            r0 = r5
            java.lang.String r1 = "taskId"
            java.lang.Object r0 = com.google.common.base.Preconditions.checkNotNull(r0, r1)     // Catch: java.lang.Throwable -> L51
            r0 = r4
            java.util.List<io.druid.indexing.common.task.Task> r0 = r0.tasks     // Catch: java.lang.Throwable -> L51
            java.util.Iterator r0 = r0.iterator()     // Catch: java.lang.Throwable -> L51
            r6 = r0
        L18:
            r0 = r6
            boolean r0 = r0.hasNext()     // Catch: java.lang.Throwable -> L51
            if (r0 == 0) goto L47
            r0 = r6
            java.lang.Object r0 = r0.next()     // Catch: java.lang.Throwable -> L51
            io.druid.indexing.common.task.Task r0 = (io.druid.indexing.common.task.Task) r0     // Catch: java.lang.Throwable -> L51
            r7 = r0
            r0 = r7
            java.lang.String r0 = r0.getId()     // Catch: java.lang.Throwable -> L51
            r1 = r5
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L51
            if (r0 == 0) goto L44
            r0 = r4
            r1 = r7
            r2 = r5
            io.druid.indexing.common.TaskStatus r2 = io.druid.indexing.common.TaskStatus.failure(r2)     // Catch: java.lang.Throwable -> L51
            r0.notifyStatus(r1, r2)     // Catch: java.lang.Throwable -> L51
            goto L47
        L44:
            goto L18
        L47:
            r0 = r4
            java.util.concurrent.locks.ReentrantLock r0 = r0.giant
            r0.unlock()
            goto L5d
        L51:
            r8 = move-exception
            r0 = r4
            java.util.concurrent.locks.ReentrantLock r0 = r0.giant
            r0.unlock()
            r0 = r8
            throw r0
        L5d:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.druid.indexing.overlord.TaskQueue.shutdown(java.lang.String):void");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyStatus(Task task, TaskStatus taskStatus) {
        this.giant.lock();
        try {
            Preconditions.checkNotNull(task, "task");
            Preconditions.checkNotNull(taskStatus, "status");
            Preconditions.checkState(this.active, "Queue is not active!");
            Preconditions.checkArgument(task.getId().equals(taskStatus.getId()), "Mismatching task ids[%s/%s]", new Object[]{task.getId(), taskStatus.getId()});
            try {
                this.taskRunner.shutdown(task.getId());
            } catch (Exception e) {
                log.warn(e, "TaskRunner failed to cleanup task after completion: %s", new Object[]{task.getId()});
            }
            int i = 0;
            int size = this.tasks.size() - 1;
            while (true) {
                if (size < 0) {
                    break;
                }
                if (this.tasks.get(size).getId().equals(task.getId())) {
                    i = 0 + 1;
                    this.tasks.remove(size);
                    break;
                }
                size--;
            }
            if (i == 0) {
                log.warn("Unknown task completed: %s", new Object[]{task.getId()});
            } else if (i > 1) {
                log.makeAlert("Removed multiple copies of task", new Object[0]).addData("count", Integer.valueOf(i)).addData("task", task.getId()).emit();
            }
            this.taskFutures.remove(task.getId());
            if (i > 0) {
                try {
                    Optional<TaskStatus> status = this.taskStorage.getStatus(task.getId());
                    if (status.isPresent() && ((TaskStatus) status.get()).isRunnable()) {
                        this.taskStorage.setStatus(taskStatus);
                        this.taskLockbox.unlock(task);
                        log.info("Task done: %s", new Object[]{task});
                        this.managementMayBeNecessary.signalAll();
                    } else {
                        log.makeAlert("Ignoring notification for already-complete task", new Object[0]).addData("task", task.getId()).emit();
                    }
                } catch (Exception e2) {
                    log.makeAlert(e2, "Failed to persist status for task", new Object[0]).addData("task", task.getId()).addData("statusCode", taskStatus.getStatusCode()).emit();
                }
            }
        } finally {
            this.giant.unlock();
        }
    }

    private ListenableFuture<TaskStatus> attachCallbacks(final Task task, ListenableFuture<TaskStatus> listenableFuture) {
        final ServiceMetricEvent.Builder dimension = new ServiceMetricEvent.Builder().setDimension("dataSource", task.getDataSource()).setDimension("taskType", task.getType());
        Futures.addCallback(listenableFuture, new FutureCallback<TaskStatus>() { // from class: io.druid.indexing.overlord.TaskQueue.4
            public void onSuccess(TaskStatus taskStatus) {
                TaskQueue.log.info("Received %s status for task: %s", new Object[]{taskStatus.getStatusCode(), taskStatus.getId()});
                handleStatus(taskStatus);
            }

            public void onFailure(Throwable th) {
                TaskQueue.log.makeAlert(th, "Failed to run task", new Object[0]).addData("task", task.getId()).addData("type", task.getType()).addData("dataSource", task.getDataSource()).emit();
                handleStatus(TaskStatus.failure(task.getId()));
            }

            private void handleStatus(TaskStatus taskStatus) {
                try {
                    if (!TaskQueue.this.active) {
                        TaskQueue.log.info("Abandoning task due to shutdown: %s", new Object[]{task.getId()});
                        return;
                    }
                    TaskQueue.this.notifyStatus(task, taskStatus);
                    if (taskStatus.isComplete()) {
                        dimension.setDimension("taskStatus", taskStatus.getStatusCode().toString());
                        TaskQueue.this.emitter.emit(dimension.build("task/run/time", Long.valueOf(taskStatus.getDuration())));
                        TaskQueue.log.info("Task %s: %s (%d run duration)", new Object[]{taskStatus.getStatusCode(), task, Long.valueOf(taskStatus.getDuration())});
                    }
                } catch (Exception e) {
                    TaskQueue.log.makeAlert(e, "Failed to handle task status", new Object[0]).addData("task", task.getId()).addData("statusCode", taskStatus.getStatusCode()).emit();
                }
            }
        });
        return listenableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void syncFromStorage() {
        this.giant.lock();
        try {
            try {
                if (this.active) {
                    List<Task> activeTasks = this.taskStorage.getActiveTasks();
                    log.info("Synced %,d tasks from storage (%,d tasks added, %,d tasks removed).", new Object[]{Integer.valueOf(activeTasks.size()), Integer.valueOf(Sets.difference(Sets.newHashSet(activeTasks), Sets.newHashSet(this.tasks)).size()), Integer.valueOf(Sets.difference(Sets.newHashSet(this.tasks), Sets.newHashSet(activeTasks)).size())});
                    this.tasks.clear();
                    this.tasks.addAll(activeTasks);
                    this.managementMayBeNecessary.signalAll();
                } else {
                    log.info("Not active. Skipping storage sync.", new Object[0]);
                }
            } catch (Exception e) {
                log.warn(e, "Failed to sync tasks from storage!", new Object[0]);
                throw Throwables.propagate(e);
            }
        } finally {
            this.giant.unlock();
        }
    }
}
