package io.druid.indexing.overlord;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:io/druid/indexing/overlord/ThreadPoolTaskRunner.class */
public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
    private final TaskToolboxFactory toolboxFactory;
    private final TaskConfig taskConfig;
    private final ConcurrentMap<Integer, ListeningExecutorService> exec = new ConcurrentHashMap();
    private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet();
    private final ServiceEmitter emitter;

    /* loaded from: input_file:io/druid/indexing/overlord/ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.class */
    private static class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus> {
        private final Task task;
        private final TaskToolbox toolbox;

        public ThreadPoolTaskRunnerCallable(Task task, TaskToolbox taskToolbox) {
            this.task = task;
            this.toolbox = taskToolbox;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public TaskStatus call() {
            TaskStatus failure;
            long currentTimeMillis = System.currentTimeMillis();
            try {
                ThreadPoolTaskRunner.log.info("Running task: %s", new Object[]{this.task.getId()});
                failure = this.task.run(this.toolbox);
            } catch (InterruptedException e) {
                ThreadPoolTaskRunner.log.error(e, "Interrupted while running task[%s]", new Object[]{this.task});
                throw Throwables.propagate(e);
            } catch (Exception e2) {
                ThreadPoolTaskRunner.log.error(e2, "Exception while running task[%s]", new Object[]{this.task});
                failure = TaskStatus.failure(this.task.getId());
            } catch (Throwable th) {
                ThreadPoolTaskRunner.log.error(th, "Uncaught Throwable while running task[%s]", new Object[]{this.task});
                throw Throwables.propagate(th);
            }
            try {
                return failure.withDuration(System.currentTimeMillis() - currentTimeMillis);
            } catch (Exception e3) {
                ThreadPoolTaskRunner.log.error(e3, "Uncaught Exception during callback for task[%s]", new Object[]{this.task});
                throw Throwables.propagate(e3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/indexing/overlord/ThreadPoolTaskRunner$ThreadPoolTaskRunnerWorkItem.class */
    public static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem {
        private final Task task;

        private ThreadPoolTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> listenableFuture) {
            super(task.getId(), listenableFuture);
            this.task = task;
        }

        public Task getTask() {
            return this.task;
        }
    }

    @Inject
    public ThreadPoolTaskRunner(TaskToolboxFactory taskToolboxFactory, TaskConfig taskConfig, ServiceEmitter serviceEmitter) {
        this.toolboxFactory = (TaskToolboxFactory) Preconditions.checkNotNull(taskToolboxFactory, "toolboxFactory");
        this.taskConfig = taskConfig;
        this.emitter = (ServiceEmitter) Preconditions.checkNotNull(serviceEmitter, "emitter");
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        return ImmutableList.of();
    }

    private static ListeningExecutorService buildExecutorService(int i) {
        return MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d-priority-" + i, TaskThreadPriority.getThreadPriorityFromTaskPriority(i)));
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    @LifecycleStop
    public void stop() {
        boolean z;
        Iterator<Map.Entry<Integer, ListeningExecutorService>> it = this.exec.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().shutdown();
            } catch (SecurityException e) {
                log.wtf(e, "I can't control my own threads!", new Object[0]);
            }
        }
        for (ThreadPoolTaskRunnerWorkItem threadPoolTaskRunnerWorkItem : this.runningItems) {
            Task task = threadPoolTaskRunnerWorkItem.getTask();
            long currentTimeMillis = System.currentTimeMillis();
            boolean z2 = false;
            if (this.taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
                z = true;
                log.info("Starting graceful shutdown of task[%s].", new Object[]{task.getId()});
                try {
                    task.stopGracefully();
                    log.info("Graceful shutdown of task[%s] finished in %,dms with status[%s].", new Object[]{task.getId(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), ((TaskStatus) threadPoolTaskRunnerWorkItem.getResult().get(new Interval(new DateTime(currentTimeMillis), this.taskConfig.getGracefulShutdownTimeout()).toDurationMillis(), TimeUnit.MILLISECONDS)).getStatusCode()});
                } catch (Exception e2) {
                    log.makeAlert(e2, "Graceful task shutdown failed: %s", new Object[]{task.getDataSource()}).addData("taskId", task.getId()).addData("dataSource", task.getDataSource()).emit();
                    log.warn(e2, "Graceful shutdown of task[%s] aborted with exception.", new Object[]{task.getId()});
                    z2 = true;
                }
            } else {
                z = false;
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            ServiceMetricEvent.Builder dimension = ServiceMetricEvent.builder().setDimension("task", task.getId()).setDimension("dataSource", task.getDataSource()).setDimension("graceful", String.valueOf(z)).setDimension("error", String.valueOf(z2));
            this.emitter.emit(dimension.build("task/interrupt/count", 1L));
            this.emitter.emit(dimension.build("task/interrupt/elapsed", Long.valueOf(currentTimeMillis2)));
        }
        Iterator<Map.Entry<Integer, ListeningExecutorService>> it2 = this.exec.entrySet().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().getValue().shutdownNow();
            } catch (SecurityException e3) {
                log.wtf(e3, "I can't control my own threads!", new Object[0]);
            }
        }
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public ListenableFuture<TaskStatus> run(Task task) {
        TaskToolbox build = this.toolboxFactory.build(task);
        Object contextValue = task.getContextValue("backgroundThreadPriority");
        int i = 0;
        if (contextValue != null) {
            if (contextValue instanceof Number) {
                i = ((Number) contextValue).intValue();
            } else if (contextValue instanceof String) {
                try {
                    i = Integer.parseInt(contextValue.toString());
                } catch (NumberFormatException e) {
                    log.error(e, "Error parsing task priority [%s] for task [%s]", new Object[]{contextValue, task.getId()});
                }
            }
        }
        if (!this.exec.containsKey(Integer.valueOf(i))) {
            ListeningExecutorService buildExecutorService = buildExecutorService(i);
            if (this.exec.putIfAbsent(Integer.valueOf(i), buildExecutorService) != null) {
                buildExecutorService.shutdownNow();
            }
        }
        ListenableFuture<TaskStatus> submit = this.exec.get(Integer.valueOf(i)).submit(new ThreadPoolTaskRunnerCallable(task, build));
        final ThreadPoolTaskRunnerWorkItem threadPoolTaskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, submit);
        this.runningItems.add(threadPoolTaskRunnerWorkItem);
        Futures.addCallback(submit, new FutureCallback<TaskStatus>() { // from class: io.druid.indexing.overlord.ThreadPoolTaskRunner.1
            public void onSuccess(TaskStatus taskStatus) {
                ThreadPoolTaskRunner.this.runningItems.remove(threadPoolTaskRunnerWorkItem);
            }

            public void onFailure(Throwable th) {
                ThreadPoolTaskRunner.this.runningItems.remove(threadPoolTaskRunnerWorkItem);
            }
        });
        return submit;
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public void shutdown(String str) {
        for (ThreadPoolTaskRunnerWorkItem threadPoolTaskRunnerWorkItem : this.runningItems) {
            if (threadPoolTaskRunnerWorkItem.getTaskId().equals(str)) {
                threadPoolTaskRunnerWorkItem.getResult().cancel(true);
            }
        }
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        return ImmutableList.copyOf(this.runningItems);
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        return ImmutableList.of();
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<TaskRunnerWorkItem> getKnownTasks() {
        return ImmutableList.copyOf(this.runningItems);
    }

    @Override // io.druid.indexing.overlord.TaskRunner
    public Optional<ScalingStats> getScalingStats() {
        return Optional.absent();
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        return getQueryRunnerImpl(query);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        return getQueryRunnerImpl(query);
    }

    private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query) {
        QueryRunner<T> queryRunner;
        QueryRunner<T> queryRunner2 = null;
        String str = (String) Iterables.getOnlyElement(query.getDataSource().getNames());
        UnmodifiableIterator it = ImmutableList.copyOf(this.runningItems).iterator();
        while (it.hasNext()) {
            Task task = ((ThreadPoolTaskRunnerWorkItem) it.next()).getTask();
            if (task.getDataSource().equals(str) && (queryRunner = task.getQueryRunner(query)) != null) {
                if (queryRunner2 == null) {
                    queryRunner2 = queryRunner;
                } else {
                    log.makeAlert("Found too many query runners for datasource", new Object[0]).addData("dataSource", str).emit();
                }
            }
        }
        return queryRunner2 == null ? new NoopQueryRunner() : queryRunner2;
    }
}
