package io.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;

/* loaded from: input_file:io/druid/indexing/overlord/RemoteTaskRunner.class */
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer {
    /** Logger for this class; also used to emit alerts through {@code makeAlert(...)}. */
    private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
    /** Response handler (UTF-8) used for the task-shutdown HTTP request. */
    private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
    /** Joins path segments with "/" when building ZooKeeper paths. */
    private static final Joiner JOINER = Joiner.on("/");
    /** Reads worker announcements and task statuses; writes task payloads. */
    private final ObjectMapper jsonMapper;
    /** Supplies the maximum znode size and the task assignment timeout. */
    private final RemoteTaskRunnerConfig config;
    /** Supplies the announcements, status and tasks base paths. */
    private final IndexerZkConfig indexerZkConfig;
    /** Curator client used for all znode reads, creates and deletes in this class. */
    private final CuratorFramework cf;
    /** Builds the per-worker status caches (and the announcements cache in the constructor). */
    private final PathChildrenCacheFactory pathChildrenCacheFactory;
    /** Cache over the announcements path; its events drive add/update/remove of workers. */
    private final PathChildrenCache workerPathCache;
    /** HTTP client used to send shutdown requests and to fetch task logs from workers. */
    private final HttpClient httpClient;
    /** Supplies the current worker behavior config; may yield null (a default strategy is then used). */
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    /** Known workers keyed by worker host. */
    // NOTE(review): raw ConcurrentHashMap constructors below look like decompiler output.
    private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap();
    /** Task payloads keyed by task id, kept while a task is waiting to be assigned. */
    private final ConcurrentMap<String, Task> pendingTaskPayloads = new ConcurrentHashMap();
    /** Work items, keyed by task id, that have not yet been assigned to a worker. */
    private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
    /** Work items, keyed by task id, that have been assigned or announced by a worker. */
    private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
    /** Work items, keyed by task id, whose result has been set by {@code taskComplete}. */
    private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
    /** Single-threaded executor on which pending-task assignment passes run. */
    // NOTE(review): stop() does not shut this executor down.
    private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
    /** Monitor held while handling worker status events; announceTask waits on it for the worker to pick up a task. */
    private final Object statusLock = new Object();
    /** Set to true at the end of start() and back to false in stop(). */
    private volatile boolean started = false;

    /* renamed from: io.druid.indexing.overlord.RemoteTaskRunner$6, reason: invalid class name */
    /* loaded from: input_file:io/druid/indexing/overlord/RemoteTaskRunner$6.class */
    /**
     * Switch lookup table for {@link PathChildrenCacheEvent.Type}.
     *
     * <p>The array is indexed by the enum constant's {@code ordinal()} and holds the case label
     * used by the {@code switch} statements in this file: CHILD_ADDED = 1, CHILD_UPDATED = 2,
     * CHILD_REMOVED = 3, INITIALIZED = 4. Any other constant keeps the array default of 0 and
     * therefore falls through to {@code default}.
     */
    static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            // Each assignment is wrapped separately so that a NoSuchFieldError on one constant
            // does not prevent the remaining entries from being filled in.
            // NOTE(review): presumably guards against an enum constant missing at runtime — confirm.
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public RemoteTaskRunner(ObjectMapper objectMapper, RemoteTaskRunnerConfig remoteTaskRunnerConfig, IndexerZkConfig indexerZkConfig, CuratorFramework curatorFramework, PathChildrenCacheFactory pathChildrenCacheFactory, HttpClient httpClient, Supplier<WorkerBehaviorConfig> supplier) {
        this.jsonMapper = objectMapper;
        this.config = remoteTaskRunnerConfig;
        this.indexerZkConfig = indexerZkConfig;
        this.cf = curatorFramework;
        this.pathChildrenCacheFactory = pathChildrenCacheFactory;
        this.workerPathCache = pathChildrenCacheFactory.make(curatorFramework, indexerZkConfig.getAnnouncementsPath());
        this.httpClient = httpClient;
        this.workerConfigRef = supplier;
    }

    /**
     * Starts watching the announcements path and blocks until the initial workers are processed.
     *
     * <p>A counter tracks outstanding start-up work. It starts at 1 (for the cache's
     * {@code INITIALIZED} event) and is incremented for every {@code CHILD_ADDED} event. It is
     * decremented when the corresponding {@code addWorker} future completes (successfully or
     * not) and when {@code INITIALIZED} arrives. This method returns, with {@code started} set,
     * once the counter is no longer positive. Calling it while already started does nothing.
     *
     * @throws RuntimeException wrapping any exception raised while registering the listener,
     *     starting the cache or waiting; if the wait is interrupted, the thread's interrupt
     *     status is restored before the exception is thrown
     */
    @LifecycleStart
    public void start() {
        if (this.started) {
            return;
        }
        final MutableInt waitingFor = new MutableInt(1);
        final Object waitingForMonitor = new Object();
        try {
            this.workerPathCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.1
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    switch (pathChildrenCacheEvent.getType()) {
                        case CHILD_ADDED: {
                            final Worker added = RemoteTaskRunner.this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), Worker.class);
                            synchronized (waitingForMonitor) {
                                waitingFor.increment();
                            }
                            Futures.addCallback(RemoteTaskRunner.this.addWorker(added), new FutureCallback<ZkWorker>() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.1.1
                                public void onSuccess(ZkWorker zkWorker) {
                                    workerHandled();
                                }

                                public void onFailure(Throwable th) {
                                    // A failed add still counts as handled so start() cannot block forever.
                                    workerHandled();
                                }

                                private void workerHandled() {
                                    synchronized (waitingForMonitor) {
                                        waitingFor.decrement();
                                        waitingForMonitor.notifyAll();
                                    }
                                }
                            });
                            break;
                        }
                        case CHILD_UPDATED:
                            RemoteTaskRunner.this.updateWorker(RemoteTaskRunner.this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), Worker.class));
                            break;
                        case CHILD_REMOVED:
                            RemoteTaskRunner.this.removeWorker(RemoteTaskRunner.this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), Worker.class));
                            break;
                        case INITIALIZED:
                            synchronized (waitingForMonitor) {
                                waitingFor.decrement();
                                waitingForMonitor.notifyAll();
                            }
                            break;
                        default:
                            break;
                    }
                }
            });
            this.workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            synchronized (waitingForMonitor) {
                // Loop guards against wake-ups that happen while work is still outstanding.
                while (waitingFor.intValue() > 0) {
                    waitingForMonitor.wait();
                }
            }
            this.started = true;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Stops the runner if it is started: clears the {@code started} flag, closes every known
     * {@code ZkWorker}, then closes the announcements cache. Does nothing when not started.
     *
     * @throws RuntimeException wrapping any exception raised while closing
     */
    @LifecycleStop
    public void stop() {
        if (!this.started) {
            return;
        }
        try {
            this.started = false;
            for (ZkWorker zkWorker : this.zkWorkers.values()) {
                zkWorker.close();
            }
            this.workerPathCache.close();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /** Returns an immutable copy of the currently known workers. */
    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<ZkWorker> getWorkers() {
        final ImmutableList<ZkWorker> snapshot = ImmutableList.copyOf(this.zkWorkers.values());
        return snapshot;
    }

    /** Returns an immutable copy of the work items currently in the running queue. */
    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getRunningTasks() {
        final ImmutableList<RemoteTaskRunnerWorkItem> snapshot = ImmutableList.copyOf(this.runningTasks.values());
        return snapshot;
    }

    /** Returns an immutable copy of the work items currently in the pending queue. */
    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getPendingTasks() {
        final ImmutableList<RemoteTaskRunnerWorkItem> snapshot = ImmutableList.copyOf(this.pendingTasks.values());
        return snapshot;
    }

    /**
     * Returns an immutable copy of every tracked work item, in the order pending, running,
     * complete.
     */
    @Override // io.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getKnownTasks() {
        final Iterable<RemoteTaskRunnerWorkItem> everything = Iterables.concat(this.pendingTasks.values(), this.runningTasks.values(), this.completeTasks.values());
        return ImmutableList.copyOf(everything);
    }

    /**
     * Looks through the known workers for one that reports running the given task.
     *
     * @param str task id
     * @return the first worker for which {@code isRunningTask(str)} is true, or null if none
     */
    public ZkWorker findWorkerRunningTask(String str) {
        final Iterator<ZkWorker> candidates = this.zkWorkers.values().iterator();
        while (candidates.hasNext()) {
            final ZkWorker candidate = candidates.next();
            if (candidate.isRunningTask(str)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Tells whether the worker registered under {@code worker.getHost()} reports running the task.
     *
     * @param worker worker whose host is used for the lookup
     * @param str task id
     * @return false when no worker is registered for that host, otherwise that worker's answer
     */
    public boolean isWorkerRunningTask(Worker worker, String str) {
        final ZkWorker registered = this.zkWorkers.get(worker.getHost());
        if (registered == null) {
            return false;
        }
        return registered.isRunningTask(str);
    }

    /**
     * Submits a task, or returns the existing result future when the task is already tracked.
     *
     * <p>The queues are checked in the order pending, running, complete. A task found in the
     * running queue whose worker already announces a complete status is finalized on the spot.
     * A task found nowhere is added to the pending queue.
     *
     * @param task task to run
     * @return future that receives the task's final status
     */
    @Override // io.druid.indexing.overlord.TaskRunner
    public ListenableFuture<TaskStatus> run(Task task) {
        final RemoteTaskRunnerWorkItem pendingItem = this.pendingTasks.get(task.getId());
        if (pendingItem != null) {
            log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
            return pendingItem.getResult();
        }

        final RemoteTaskRunnerWorkItem runningItem = this.runningTasks.get(task.getId());
        if (runningItem != null) {
            final ZkWorker owner = findWorkerRunningTask(task.getId());
            if (owner != null) {
                log.info("Task[%s] already running on %s.", task.getId(), owner.getWorker().getHost());
                final TaskAnnouncement announcement = owner.getRunningTasks().get(task.getId());
                if (announcement.getTaskStatus().isComplete()) {
                    taskComplete(runningItem, owner, announcement.getTaskStatus());
                }
            } else {
                log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
            }
            return runningItem.getResult();
        }

        final RemoteTaskRunnerWorkItem completedItem = this.completeTasks.get(task.getId());
        if (completedItem != null) {
            return completedItem.getResult();
        }
        return addPendingTask(task).getResult();
    }

    /**
     * Stops tracking or running the given task, depending on which queue holds it.
     *
     * <ul>
     *   <li>Runner not started: logs and returns.</li>
     *   <li>Task pending: removes it (and its payload) from the pending queue.</li>
     *   <li>Task complete: delegates to {@code cleanup}.</li>
     *   <li>Otherwise: POSTs {@code /task/{id}/shutdown} to the worker running the task and logs
     *       an error when the response status is not ACCEPTED. If no worker runs the task, only
     *       a log line is written.</li>
     * </ul>
     *
     * @param str task id
     * @throws RuntimeException wrapping any failure of the HTTP call; if the calling thread is
     *     interrupted while waiting for the response, its interrupt status is restored first
     */
    @Override // io.druid.indexing.overlord.TaskRunner
    public void shutdown(String str) {
        if (!this.started) {
            log.info("This TaskRunner is stopped. Ignoring shutdown command for task: %s", str);
            return;
        }
        if (this.pendingTasks.remove(str) != null) {
            this.pendingTaskPayloads.remove(str);
            log.info("Removed task from pending queue: %s", str);
            return;
        }
        if (this.completeTasks.containsKey(str)) {
            cleanup(str);
            return;
        }
        final ZkWorker zkWorker = findWorkerRunningTask(str);
        if (zkWorker == null) {
            log.info("Can't shutdown! No worker running task %s", str);
            return;
        }
        try {
            final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", str));
            final StatusResponseHolder response = (StatusResponseHolder) this.httpClient.go(new Request(HttpMethod.POST, url), RESPONSE_HANDLER).get();
            log.info("Sent shutdown message to worker: %s, status %s, response: %s", zkWorker.getWorker().getHost(), response.getStatus(), response.getContent());
            if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
                log.error("Shutdown failed for %s! Are you sure the task was running?", str);
            }
        } catch (InterruptedException e) {
            // Do not lose the interrupt when converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Provides access to the log of a task that a known worker is currently running.
     *
     * <p>No HTTP request is made here; the returned source issues a GET to the worker's
     * {@code /task/{id}/log?offset={j}} endpoint each time {@code openStream()} is called.
     *
     * @param str task id
     * @param j offset passed to the worker as the {@code offset} query parameter
     * @return a byte source over the worker's log endpoint, or absent when no worker reports
     *     running the task
     */
    public Optional<ByteSource> streamTaskLog(String str, long j) {
        final ZkWorker zkWorker = findWorkerRunningTask(str);
        if (zkWorker == null) {
            return Optional.absent();
        }
        final URL logUrl = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", str, j));
        return Optional.<ByteSource>of(new ByteSource() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.2
            public InputStream openStream() throws IOException {
                try {
                    return (InputStream) RemoteTaskRunner.this.httpClient.go(new Request(HttpMethod.GET, logUrl), new InputStreamResponseHandler()).get();
                } catch (InterruptedException e) {
                    // Restore the interrupt status before converting to an unchecked exception.
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                } catch (ExecutionException e) {
                    // Surface the underlying IOException (or unchecked exception) directly when there is one.
                    Throwables.propagateIfPossible(e.getCause(), IOException.class);
                    throw Throwables.propagate(e);
                }
            }
        });
    }

    /**
     * Builds {@code http://{worker host}/druid/worker/v1{path}}.
     *
     * @param worker worker whose host is used
     * @param str path to append; must start with "/"
     * @return the resulting URL
     * @throws IllegalArgumentException if {@code str} does not start with "/"
     * @throws RuntimeException wrapping a {@link MalformedURLException}
     */
    private URL makeWorkerURL(Worker worker, String str) {
        Preconditions.checkArgument(str.startsWith("/"), "path must start with '/': %s", str);
        final String spec = String.format("http://%s/druid/worker/v1%s", worker.getHost(), str);
        try {
            return new URL(spec);
        } catch (MalformedURLException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Registers a task as pending and schedules an assignment pass.
     *
     * <p>The payload is stored before the work item is published to the pending queue.
     *
     * @param task task to queue
     * @return the new work item, created without a worker
     */
    private RemoteTaskRunnerWorkItem addPendingTask(Task task) {
        log.info("Added pending task %s", task.getId());
        final RemoteTaskRunnerWorkItem workItem = new RemoteTaskRunnerWorkItem(task.getId(), null);
        this.pendingTaskPayloads.put(task.getId(), task);
        this.pendingTasks.put(task.getId(), workItem);
        runPendingTasks();
        return workItem;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules one assignment pass over the pending queue on {@code runPendingTasksExec}.
     *
     * <p>The pass iterates over a copy of the pending work items and calls {@code tryAssignTask}
     * for each; the payload is dropped once that call returns true. If assignment throws, an
     * alert is emitted, the payload is dropped, and the task is failed if it is still in the
     * pending queue. A task that another thread removed from the pending queue in the meantime
     * is skipped, so one such task no longer aborts the rest of the pass. Exceptions never
     * escape the submitted callable; they are reported as alerts.
     */
    public void runPendingTasks() {
        this.runPendingTasksExec.submit(new Callable<Void>() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                try {
                    // Iterate over a copy so the queue can change while tasks are being assigned.
                    for (RemoteTaskRunnerWorkItem workItem : Lists.newArrayList(RemoteTaskRunner.this.pendingTasks.values())) {
                        final String taskId = workItem.getTaskId();
                        try {
                            if (RemoteTaskRunner.this.tryAssignTask(RemoteTaskRunner.this.pendingTaskPayloads.get(taskId), workItem)) {
                                RemoteTaskRunner.this.pendingTaskPayloads.remove(taskId);
                            }
                        } catch (Exception e) {
                            RemoteTaskRunner.log.makeAlert(e, "Exception while trying to assign task").addData("taskId", taskId).emit();
                            // The task will not be retried, so its payload must not linger.
                            RemoteTaskRunner.this.pendingTaskPayloads.remove(taskId);
                            final RemoteTaskRunnerWorkItem failedItem = RemoteTaskRunner.this.pendingTasks.remove(taskId);
                            if (failedItem != null) {
                                RemoteTaskRunner.this.taskComplete(failedItem, null, TaskStatus.failure(taskId));
                            }
                        }
                    }
                } catch (Exception e) {
                    RemoteTaskRunner.log.makeAlert(e, "Exception in running pending tasks").emit();
                }
                return null;
            }
        });
    }

    /**
     * Forgets a completed task and deletes its status znode.
     *
     * <p>Does nothing when the runner is not started. If the task is not in the complete queue,
     * or its work item has no worker, an alert is emitted and nothing is deleted. A status znode
     * that is already gone is logged and otherwise ignored.
     *
     * @param str task id
     * @throws RuntimeException wrapping any other failure while deleting the znode
     */
    private void cleanup(String str) {
        if (!this.started) {
            return;
        }
        final RemoteTaskRunnerWorkItem removed = this.completeTasks.remove(str);
        // Only ask for the worker once we know there is a work item to ask.
        final Worker worker = removed == null ? null : removed.getWorker();
        if (removed == null || worker == null) {
            log.makeAlert("WTF?! Asked to cleanup nonexistent task").addData("taskId", str).emit();
            return;
        }
        final String host = worker.getHost();
        log.info("Cleaning up task[%s] on worker[%s]", str, host);
        final String statusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), host, str);
        try {
            this.cf.delete().guaranteed().forPath(statusPath);
        } catch (KeeperException.NoNodeException e) {
            // Must be caught before the general handler, otherwise it can never be reached.
            log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tries to hand a pending task to a worker.
     *
     * <p>A task that is already in the running queue, or that some worker already reports
     * running, counts as assigned. Otherwise the configured select strategy (or the default one
     * when no config or strategy is available) picks a worker from an immutable view of the
     * known workers, and the task is announced to it.
     *
     * @param task task payload; must not be null
     * @param remoteTaskRunnerWorkItem matching work item; must not be null and must carry the same id
     * @return true if the task is running or was announced, false if the strategy found no worker
     * @throws Exception if announcing the task fails
     */
    public boolean tryAssignTask(Task task, RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) throws Exception {
        Preconditions.checkNotNull(task, "task");
        Preconditions.checkNotNull(remoteTaskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkArgument(task.getId().equals(remoteTaskRunnerWorkItem.getTaskId()), "task id != workItem id");

        final boolean alreadyRunning = this.runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null;
        if (alreadyRunning) {
            log.info("Task[%s] already running.", task.getId());
            return true;
        }

        final WorkerBehaviorConfig behaviorConfig = this.workerConfigRef.get();
        final WorkerSelectStrategy strategy;
        if (behaviorConfig != null && behaviorConfig.getSelectStrategy() != null) {
            strategy = behaviorConfig.getSelectStrategy();
        } else {
            log.warn("No worker selections strategy set. Using default.");
            strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
        }

        final Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker> toImmutable = new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.4
            public ImmutableZkWorker transformEntry(String host, ZkWorker zkWorker) {
                return zkWorker.toImmutable();
            }
        };
        final ImmutableMap<String, ImmutableZkWorker> workerView = ImmutableMap.copyOf(Maps.transformEntries(this.zkWorkers, toImmutable));
        final Optional<ImmutableZkWorker> chosen = strategy.findWorkerForTask(this.config, workerView, task);
        if (!chosen.isPresent()) {
            log.debug("Worker nodes %s do not have capacity to run any more tasks!", this.zkWorkers.values());
            return false;
        }
        final String chosenHost = chosen.get().getWorker().getHost();
        announceTask(task, this.zkWorkers.get(chosenHost), remoteTaskRunnerWorkItem);
        return true;
    }

    /**
     * Writes the task under the worker's tasks path and waits for the worker to pick it up.
     *
     * <p>Steps: serialize the task and reject it if it exceeds the configured maximum znode
     * size; create an ephemeral znode for it unless one already exists; move the work item from
     * the pending queue to the running queue; then wait on {@code statusLock} until the worker
     * reports running the task. If a wait returns and the time elapsed since the wait started
     * has reached the assignment timeout, the task is marked failed.
     *
     * @param task task to announce
     * @param zkWorker worker chosen for the task
     * @param remoteTaskRunnerWorkItem work item for the task; must carry the same id
     * @throws Exception on serialization or ZooKeeper failures, or if the wait is interrupted
     */
    private void announceTask(Task task, ZkWorker zkWorker, RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) throws Exception {
        Preconditions.checkArgument(task.getId().equals(remoteTaskRunnerWorkItem.getTaskId()), "task id != workItem id");
        final Worker theWorker = zkWorker.getWorker();
        log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());

        final byte[] rawBytes = this.jsonMapper.writeValueAsBytes(task);
        if (rawBytes.length > this.config.getMaxZnodeBytes()) {
            throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, this.config.getMaxZnodeBytes());
        }

        final String taskPath = JOINER.join(this.indexerZkConfig.getTasksPath(), theWorker.getHost(), task.getId());
        if (this.cf.checkExists().forPath(taskPath) == null) {
            this.cf.create().withMode(CreateMode.EPHEMERAL).forPath(taskPath, rawBytes);
        }

        final RemoteTaskRunnerWorkItem pendingItem = this.pendingTasks.remove(task.getId());
        if (pendingItem == null) {
            log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!").addData("taskId", task.getId()).emit();
            return;
        }
        final RemoteTaskRunnerWorkItem runningItem = pendingItem.withWorker(theWorker);
        this.runningTasks.put(task.getId(), runningItem);
        log.info("Task %s switched from pending to running (on [%s])", task.getId(), runningItem.getWorker().getHost());

        final Stopwatch timeoutStopwatch = Stopwatch.createUnstarted();
        timeoutStopwatch.start();
        synchronized (this.statusLock) {
            // Status events notify statusLock; re-check the worker after every wake-up.
            while (!isWorkerRunningTask(theWorker, task.getId())) {
                final long waitMs = this.config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
                this.statusLock.wait(waitMs);
                final long elapsedMs = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
                if (elapsedMs >= waitMs) {
                    log.error("Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!", theWorker.getHost(), task.getId(), elapsedMs, this.config.getTaskAssignmentTimeout());
                    taskComplete(remoteTaskRunnerWorkItem, zkWorker, TaskStatus.failure(task.getId()));
                    break;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts tracking a newly announced worker.
     *
     * <p>Creates a status cache for the worker, wraps it in a {@code ZkWorker} and attaches a
     * listener that, while holding {@code statusLock}, reacts to events on the worker's status
     * path:
     * <ul>
     *   <li>{@code CHILD_ADDED} / {@code CHILD_UPDATED}: reads the task status, wakes threads
     *       waiting on {@code statusLock}, makes sure the task is in the running queue, and
     *       finalizes it (then schedules an assignment pass) when the status is complete.</li>
     *   <li>{@code CHILD_REMOVED}: removes the task from the running queue and, if it was there,
     *       fails it.</li>
     *   <li>{@code INITIALIZED}: registers the worker under its host, completes the returned
     *       future (exceptionally if the host is already registered) and schedules an
     *       assignment pass.</li>
     * </ul>
     * Exceptions inside the listener are reported as alerts and not rethrown. The alert includes
     * the znode path only when the event carries data.
     *
     * @param worker announced worker
     * @return future completed with the {@code ZkWorker} once its cache is initialized
     * @throws RuntimeException wrapping any failure while creating or starting the worker
     */
    public ListenableFuture<ZkWorker> addWorker(final Worker worker) {
        log.info("Worker[%s] reportin' for duty!", worker.getHost());
        try {
            final String workerStatusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), worker.getHost());
            final PathChildrenCache statusCache = this.pathChildrenCacheFactory.make(this.cf, workerStatusPath);
            final SettableFuture<ZkWorker> retVal = SettableFuture.create();
            final ZkWorker zkWorker = new ZkWorker(worker, statusCache, this.jsonMapper);
            zkWorker.addListener(new PathChildrenCacheListener() { // from class: io.druid.indexing.overlord.RemoteTaskRunner.5
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    synchronized (RemoteTaskRunner.this.statusLock) {
                        try {
                            switch (pathChildrenCacheEvent.getType()) {
                                case CHILD_ADDED:
                                case CHILD_UPDATED: {
                                    final String taskId = ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath());
                                    final TaskStatus taskStatus = RemoteTaskRunner.this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), TaskStatus.class);
                                    RemoteTaskRunner.log.info("Worker[%s] wrote %s status for task: %s", zkWorker.getWorker().getHost(), taskStatus.getStatusCode(), taskId);
                                    // Wake announceTask, which waits for the worker to report this task.
                                    RemoteTaskRunner.this.statusLock.notifyAll();

                                    final RemoteTaskRunnerWorkItem workItem;
                                    final RemoteTaskRunnerWorkItem known = RemoteTaskRunner.this.runningTasks.get(taskId);
                                    if (known != null) {
                                        workItem = known;
                                    } else {
                                        final RemoteTaskRunnerWorkItem fresh = new RemoteTaskRunnerWorkItem(taskId, zkWorker.getWorker());
                                        final RemoteTaskRunnerWorkItem raced = RemoteTaskRunner.this.runningTasks.putIfAbsent(taskId, fresh);
                                        if (raced == null) {
                                            RemoteTaskRunner.log.warn("Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", zkWorker.getWorker().getHost(), taskId);
                                            workItem = fresh;
                                        } else {
                                            workItem = raced;
                                        }
                                    }

                                    if (taskStatus.isComplete()) {
                                        RemoteTaskRunner.this.taskComplete(workItem, zkWorker, taskStatus);
                                        RemoteTaskRunner.this.runPendingTasks();
                                    }
                                    break;
                                }
                                case CHILD_REMOVED: {
                                    final String taskId = ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath());
                                    final RemoteTaskRunnerWorkItem removed = RemoteTaskRunner.this.runningTasks.remove(taskId);
                                    if (removed != null) {
                                        RemoteTaskRunner.log.info("Task[%s] just disappeared!", taskId);
                                        removed.setResult(TaskStatus.failure(removed.getTaskId()));
                                    } else {
                                        RemoteTaskRunner.log.info("Task[%s] went bye bye.", taskId);
                                    }
                                    break;
                                }
                                case INITIALIZED: {
                                    if (RemoteTaskRunner.this.zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
                                        retVal.set(zkWorker);
                                    } else {
                                        final String message = String.format("WTF?! Tried to add already-existing worker[%s]", worker.getHost());
                                        RemoteTaskRunner.log.makeAlert(message).addData("workerHost", worker.getHost()).addData("workerIp", worker.getIp()).emit();
                                        retVal.setException(new IllegalStateException(message));
                                    }
                                    RemoteTaskRunner.this.runPendingTasks();
                                    break;
                                }
                                default:
                                    break;
                            }
                        } catch (Exception e) {
                            // Events such as INITIALIZED carry no data; do not let the error report itself fail.
                            if (pathChildrenCacheEvent.getData() != null) {
                                RemoteTaskRunner.log.makeAlert(e, "Failed to handle new worker status").addData("worker", zkWorker.getWorker().getHost()).addData("znode", pathChildrenCacheEvent.getData().getPath()).emit();
                            } else {
                                RemoteTaskRunner.log.makeAlert(e, "Failed to handle new worker status").addData("worker", zkWorker.getWorker().getHost()).emit();
                            }
                        }
                    }
                }
            });
            zkWorker.start();
            return retVal;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies an updated announcement to the {@code ZkWorker} registered for the worker's host.
     * An update for a host with no registered worker is logged and ignored.
     *
     * @param worker the updated announcement
     */
    public void updateWorker(Worker worker) {
        final ZkWorker registered = this.zkWorkers.get(worker.getHost());
        if (registered != null) {
            log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), registered.getWorker(), worker);
            registered.setWorker(worker);
            return;
        }
        log.warn("WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.", worker.getHost());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles the disappearance of a worker: fails its tasks and stops tracking it.
     *
     * <p>The tasks to fail are the children of the worker's tasks path plus every entry in the
     * running queue whose worker host matches (case-insensitively). Each is removed from the
     * running queue, its task znode is deleted if present, and its result is set to failure.
     * A task that appears in both sources is processed once and then logged as unknown on its
     * second occurrence. Whatever happens, the {@code ZkWorker} is closed and unregistered at
     * the end. Nothing is done when no worker is registered for the host.
     *
     * @param worker the worker that went away
     * @throws RuntimeException wrapping any failure while failing the worker's tasks
     */
    public void removeWorker(Worker worker) {
        log.info("Kaboom! Worker[%s] removed!", worker.getHost());
        final ZkWorker zkWorker = this.zkWorkers.get(worker.getHost());
        if (zkWorker == null) {
            return;
        }
        try {
            final Collection<String> tasksToFail = Lists.newArrayList(this.cf.getChildren().forPath(JOINER.join(this.indexerZkConfig.getTasksPath(), worker.getHost())));
            log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());

            for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : this.runningTasks.entrySet()) {
                final RemoteTaskRunnerWorkItem item = entry.getValue();
                if (item == null) {
                    log.error("Huh? null work item for [%s]", entry.getKey());
                } else if (item.getWorker() == null) {
                    log.error("Huh? no worker for [%s]", entry.getKey());
                } else if (item.getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
                    log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
                    tasksToFail.add(entry.getKey());
                }
            }

            for (String taskId : tasksToFail) {
                final RemoteTaskRunnerWorkItem removed = this.runningTasks.remove(taskId);
                if (removed == null) {
                    log.warn("RemoteTaskRunner has no knowledge of task[%s]", taskId);
                    continue;
                }
                final String taskPath = JOINER.join(this.indexerZkConfig.getTasksPath(), worker.getHost(), taskId);
                if (this.cf.checkExists().forPath(taskPath) != null) {
                    this.cf.delete().guaranteed().forPath(taskPath);
                }
                log.info("Failing task[%s]", taskId);
                removed.setResult(TaskStatus.failure(removed.getTaskId()));
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        } finally {
            // Always release the worker, even when failing its tasks blew up.
            try {
                zkWorker.close();
            } catch (Exception e) {
                log.error(e, "Exception closing worker[%s]!", worker.getHost());
            }
            this.zkWorkers.remove(worker.getHost());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a task's final status.
     *
     * <p>Logs the completion, stamps the worker's last-completed time when a worker is given,
     * files the work item in the complete queue, drops the task from the running queue, and
     * finally sets the status on the work item.
     *
     * @param remoteTaskRunnerWorkItem work item of the finished task; must not be null
     * @param zkWorker worker that ran the task, or null for a task that never reached a worker
     * @param taskStatus final status; must not be null
     */
    public void taskComplete(RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem, ZkWorker zkWorker, TaskStatus taskStatus) {
        Preconditions.checkNotNull(remoteTaskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkNotNull(taskStatus, "taskStatus");

        if (zkWorker == null) {
            log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
        } else {
            log.info("Worker[%s] completed task[%s] with status[%s]", zkWorker.getWorker().getHost(), taskStatus.getId(), taskStatus.getStatusCode());
            zkWorker.setLastCompletedTaskTime(new DateTime());
        }

        // File under "complete" before dropping from "running", then publish the result.
        this.completeTasks.put(taskStatus.getId(), remoteTaskRunnerWorkItem);
        this.runningTasks.remove(taskStatus.getId());
        remoteTaskRunnerWorkItem.setResult(taskStatus);
    }
}
