package io.druid.indexing.overlord.autoscaling;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.Worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.class */
/**
 * Resource management strategy that decides how many worker nodes to provision from the set of
 * pending tasks, and which workers to terminate from the set of workers the runner marks as lazy.
 *
 * <p>All bookkeeping of in-flight provisioning/termination requests is done while holding
 * {@link #lock}.
 */
public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy {
    private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class);

    /** Maps a worker to its IP; the IPs are what the autoscaler is asked to look up or terminate. */
    private static final Function<Worker, String> WORKER_TO_IP = new Function<Worker, String>() {
        @Override
        public String apply(Worker worker) {
            return worker.getIp();
        }
    };

    private final PendingTaskBasedWorkerResourceManagementConfig config;
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    private final ScalingStats scalingStats;

    /** Guards the mutable scaling state below. */
    private final Object lock = new Object();
    /** Node ids returned by the autoscaler that have not yet shown up as workers. */
    private final Set<String> currentlyProvisioning = Sets.newHashSet();
    /** Node ids whose termination was requested and that are still reported as lazy workers. */
    private final Set<String> currentlyTerminating = Sets.newHashSet();
    private DateTime lastProvisionTime = new DateTime();
    private DateTime lastTerminateTime = new DateTime();

    /**
     * Injection constructor; creates a single-threaded scheduled executor from the given factory
     * and delegates to the executor-taking constructor.
     */
    @Inject
    public PendingTaskBasedWorkerResourceManagementStrategy(
            PendingTaskBasedWorkerResourceManagementConfig config,
            Supplier<WorkerBehaviorConfig> workerConfigRef,
            ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
            ScheduledExecutorFactory factory) {
        this(
                config,
                workerConfigRef,
                resourceManagementSchedulerConfig,
                factory.create(1, "PendingTaskBasedResourceManagement-manager--%d"));
    }

    /**
     * @param config strategy configuration (scaling step, max scaling duration, events to track)
     * @param workerConfigRef supplier of the current worker behavior config; may supply {@code null}
     * @param resourceManagementSchedulerConfig passed through to the superclass
     * @param exec executor passed through to the superclass
     */
    public PendingTaskBasedWorkerResourceManagementStrategy(
            PendingTaskBasedWorkerResourceManagementConfig config,
            Supplier<WorkerBehaviorConfig> workerConfigRef,
            ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
            ScheduledExecutorService exec) {
        super(resourceManagementSchedulerConfig, exec);
        this.config = config;
        this.workerConfigRef = workerConfigRef;
        this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
    }

    /**
     * Asks the autoscaler for new nodes when none are currently being provisioned; otherwise
     * checks whether the outstanding provisioning has exceeded the configured max scaling duration.
     *
     * @return {@code true} if at least one provision call returned new node ids
     */
    @Override // io.druid.indexing.overlord.autoscaling.AbstractWorkerResourceManagementStrategy
    public boolean doProvision(WorkerTaskRunner workerTaskRunner) {
        Collection<Task> pendingTasks = workerTaskRunner.getPendingTaskPayloads();
        Collection<ImmutableWorkerInfo> workers = workerTaskRunner.getWorkers();
        synchronized (lock) {
            WorkerBehaviorConfig workerConfig = workerConfigRef.get();
            if (workerConfig == null || workerConfig.getAutoScaler() == null) {
                log.error("No workerConfig available, cannot provision new workers.");
                return false;
            }

            // Nodes that now appear among the runner's workers are no longer "provisioning".
            Collection<Worker> knownWorkers = Collections2.transform(
                    workers,
                    new Function<ImmutableWorkerInfo, Worker>() {
                        @Override
                        public Worker apply(ImmutableWorkerInfo workerInfo) {
                            return workerInfo.getWorker();
                        }
                    });
            currentlyProvisioning.removeAll(getWorkerNodeIDs(knownWorkers, workerConfig));

            if (!currentlyProvisioning.isEmpty()) {
                Duration waited = new Duration(lastProvisionTime, new DateTime());
                log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, waited);
                if (waited.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node provisioning taking too long!")
                            .addData("millisSinceLastProvision", waited.getMillis())
                            .addData("provisioningCount", currentlyProvisioning.size())
                            .emit();
                    // Give up on the stuck nodes: terminate them by id and forget them.
                    workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
                    currentlyProvisioning.clear();
                }
                return false;
            }

            boolean didProvision = false;
            int want = getScaleUpNodeCount(workerTaskRunner.getConfig(), workerConfig, pendingTasks, workers);
            while (want > 0) {
                AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
                Collection<String> newNodes =
                        provisioned == null ? ImmutableList.<String>of() : provisioned.getNodeIds();
                if (newNodes.isEmpty()) {
                    log.warn("NewNodes is empty, returning from provision loop");
                    break;
                }
                currentlyProvisioning.addAll(newNodes);
                lastProvisionTime = new DateTime();
                scalingStats.addProvisionEvent(provisioned);
                want -= newNodes.size();
                didProvision = true;
            }
            return didProvision;
        }
    }

    /** Resolves the given workers' IPs to autoscaler node ids via {@code ipToIdLookup}. */
    private static Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig) {
        return workerConfig.getAutoScaler().ipToIdLookup(
                Lists.newArrayList(Iterables.transform(workers, WORKER_TO_IP)));
    }

    /**
     * Computes how many additional nodes to request.
     *
     * <p>The result is the larger of (a) the shortfall against the autoscaler's minimum worker
     * count and (b) the workers needed for the pending tasks capped by the max scaling step; it is
     * then capped so the valid worker count does not exceed the autoscaler's maximum. With no valid
     * workers, the minimum worker count is used in place of the pending-task estimate.
     *
     * @return number of nodes to provision; values {@code <= 0} mean nothing should be provisioned
     */
    int getScaleUpNodeCount(
            WorkerTaskRunnerConfig remoteTaskRunnerConfig,
            WorkerBehaviorConfig workerConfig,
            Collection<Task> pendingTasks,
            Collection<ImmutableWorkerInfo> workers) {
        int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
        int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
        int currValidWorkers =
                Collections2.filter(workers, ResourceManagementUtil.createValidWorkerPredicate(config)).size();

        int moreWorkersNeeded = currValidWorkers == 0
                ? minWorkerCount
                : getWorkersNeededToAssignTasks(remoteTaskRunnerConfig, workerConfig, pendingTasks, workers);

        int want = Math.max(
                minWorkerCount - currValidWorkers,
                Math.min(config.getMaxScalingStep(), moreWorkersNeeded));

        if (want > 0 && currValidWorkers >= maxWorkerCount) {
            log.warn(
                    "Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].",
                    currValidWorkers,
                    maxWorkerCount);
            return 0;
        }
        return Math.min(want, maxWorkerCount - currValidWorkers);
    }

    /**
     * Simulates assigning every pending task with the configured select strategy and counts how
     * many placeholder ("dummy") workers had to be invented because no existing or previously
     * invented worker was selected for a task.
     *
     * @return number of dummy workers created during the simulation
     */
    int getWorkersNeededToAssignTasks(
            WorkerTaskRunnerConfig workerTaskRunnerConfig,
            WorkerBehaviorConfig workerConfig,
            Collection<Task> pendingTasks,
            Collection<ImmutableWorkerInfo> workers) {
        Collection<ImmutableWorkerInfo> validWorkers =
                Collections2.filter(workers, ResourceManagementUtil.createValidWorkerPredicate(config));

        HashMap<String, ImmutableWorkerInfo> workersByHost = Maps.newHashMap();
        for (ImmutableWorkerInfo workerInfo : validWorkers) {
            workersByHost.put(workerInfo.getWorker().getHost(), workerInfo);
        }

        WorkerSelectStrategy selectStrategy = workerConfig.getSelectStrategy();
        int expectedCapacity = getExpectedWorkerCapacity(workers);
        int dummyWorkers = 0;
        for (Task task : pendingTasks) {
            Optional<ImmutableWorkerInfo> selected = selectStrategy.findWorkerForTask(
                    workerTaskRunnerConfig,
                    ImmutableMap.copyOf(workersByHost),
                    task);
            ImmutableWorkerInfo target;
            if (selected.isPresent()) {
                target = selected.get();
            } else {
                target = createDummyWorker(
                        "dummy" + dummyWorkers,
                        expectedCapacity,
                        workerTaskRunnerConfig.getMinWorkerVersion());
                dummyWorkers++;
            }
            // Record the task on the chosen worker so later selections see the used capacity.
            workersByHost.put(target.getWorker().getHost(), workerWithTask(target, task));
        }
        return dummyWorkers;
    }

    /**
     * Marks lazy workers and asks the autoscaler to terminate them when no provisioning or
     * termination is outstanding; otherwise checks whether the outstanding termination has exceeded
     * the configured max scaling duration.
     *
     * @return {@code true} if the autoscaler returned a non-null result for a terminate request
     */
    @Override // io.druid.indexing.overlord.autoscaling.AbstractWorkerResourceManagementStrategy
    public boolean doTerminate(WorkerTaskRunner workerTaskRunner) {
        Collection<ImmutableWorkerInfo> workers = workerTaskRunner.getWorkers();
        synchronized (lock) {
            WorkerBehaviorConfig workerConfig = workerConfigRef.get();
            // The autoscaler is dereferenced throughout the rest of this method, so a config
            // without one must be rejected here (same guard as doProvision).
            if (workerConfig == null || workerConfig.getAutoScaler() == null) {
                log.warn("No workerConfig available, cannot terminate workers.");
                return false;
            }
            if (!currentlyProvisioning.isEmpty()) {
                log.debug("Already provisioning nodes, Not Terminating any nodes.");
                return false;
            }

            // Keep only the nodes that the runner still reports as lazy workers.
            Collection<String> lazyNodeIds = getWorkerNodeIDs(workerTaskRunner.getLazyWorkers(), workerConfig);
            currentlyTerminating.retainAll(lazyNodeIds);

            if (!currentlyTerminating.isEmpty()) {
                Duration waited = new Duration(lastTerminateTime, new DateTime());
                log.info("%s terminating. Current wait time: %s", currentlyTerminating, waited);
                if (waited.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node termination taking too long!")
                            .addData("millisSinceLastTerminate", waited.getMillis())
                            .addData("terminatingCount", currentlyTerminating.size())
                            .emit();
                    currentlyTerminating.clear();
                }
                return false;
            }

            ArrayList<String> lazyWorkerIps = Lists.newArrayList(
                    Collections2.transform(
                            workerTaskRunner.markWorkersLazy(
                                    ResourceManagementUtil.createLazyWorkerPredicate(config),
                                    maxWorkersToTerminate(workers, workerConfig)),
                            WORKER_TO_IP));
            if (lazyWorkerIps.isEmpty()) {
                log.debug("Found no lazy workers");
                return false;
            }

            log.info(
                    "Terminating %,d lazy workers: %s",
                    lazyWorkerIps.size(),
                    Joiner.on(", ").join(lazyWorkerIps));
            AutoScalingData terminated = workerConfig.getAutoScaler().terminate(lazyWorkerIps);
            if (terminated == null) {
                return false;
            }
            currentlyTerminating.addAll(terminated.getNodeIds());
            lastTerminateTime = new DateTime();
            scalingStats.addTerminateEvent(terminated);
            return true;
        }
    }

    /**
     * Upper bound on workers to mark lazy: every worker that fails the valid-worker predicate, plus
     * the valid workers above the autoscaler's minimum, capped by the max scaling step.
     */
    private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> workers, WorkerBehaviorConfig workerConfig) {
        int currValidWorkers =
                Collections2.filter(workers, ResourceManagementUtil.createValidWorkerPredicate(config)).size();
        int invalidWorkers = workers.size() - currValidWorkers;
        int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers();
        int removableValidWorkers = Math.min(config.getMaxScalingStep(), currValidWorkers - minWorkers);
        return invalidWorkers + Math.max(0, removableValidWorkers);
    }

    /** @return the statistics object that records provision and terminate events */
    @Override // io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy
    public ScalingStats getStats() {
        return scalingStats;
    }

    /**
     * Capacity to assume for a newly provisioned worker: 1 when there are no workers, otherwise the
     * capacity of the first worker in iteration order.
     */
    private static int getExpectedWorkerCapacity(Collection<ImmutableWorkerInfo> workers) {
        if (workers.isEmpty()) {
            return 1;
        }
        return workers.iterator().next().getWorker().getCapacity();
    }

    /**
     * Returns a copy of {@code workerInfo} with one more unit of capacity used and with the task's
     * availability group and id added to the respective sets.
     */
    private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo workerInfo, Task task) {
        return new ImmutableWorkerInfo(
                workerInfo.getWorker(),
                workerInfo.getCurrCapacityUsed() + 1,
                Sets.union(
                        workerInfo.getAvailabilityGroups(),
                        Sets.newHashSet(task.getTaskResource().getAvailabilityGroup())),
                Sets.union(
                        workerInfo.getRunningTasks(),
                        Sets.newHashSet(task.getId())),
                DateTime.now());
    }

    /** Creates an empty placeholder worker with the given host, capacity and version. */
    private static ImmutableWorkerInfo createDummyWorker(String host, int capacity, String version) {
        return new ImmutableWorkerInfo(
                new Worker(host, "-2", capacity, version),
                0,
                Sets.<String>newHashSet(),
                Sets.<String>newHashSet(),
                DateTime.now());
    }
}
