package io.druid.indexing.overlord.autoscaling;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;

/* loaded from: input_file:io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.class */
/**
 * Resource management strategy that periodically provisions and terminates workers of a
 * {@link RemoteTaskRunner} so that the number of workers follows a slowly moving target.
 *
 * <p>The target worker count is adjusted by at most one per evaluation (see
 * {@link #updateTargetWorkerCount}): up when a task has been pending for at least the configured
 * pending-task timeout, down when a valid worker is considered lazy. All mutable state is guarded
 * by {@link #lock}.
 */
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<RemoteTaskRunner> {
    private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);

    /** Maps a worker to the IP address it reports; shared by the provision and terminate paths. */
    private static final Function<ZkWorker, String> WORKER_TO_IP = new Function<ZkWorker, String>() {
        @Override
        public String apply(ZkWorker zkWorker) {
            return zkWorker.getWorker().getIp();
        }
    };

    private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
    private final ScheduledExecutorService exec;
    private volatile boolean started;
    private final SimpleResourceManagementConfig config;
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    private final ScalingStats scalingStats;
    private final Object lock;
    /** Node ids that were requested from the auto scaler but have not yet shown up as workers. */
    private final Set<String> currentlyProvisioning;
    /** Node ids whose termination was requested and which still resolve from the lazy workers. */
    private final Set<String> currentlyTerminating;
    /** Desired number of workers; negative until the first call to updateTargetWorkerCount. */
    private int targetWorkerCount;
    private DateTime lastProvisionTime;
    private DateTime lastTerminateTime;

    /**
     * Injection constructor: creates a single-threaded scheduled executor from the given factory
     * and delegates to the executor-taking constructor.
     */
    @Inject
    public SimpleResourceManagementStrategy(SimpleResourceManagementConfig simpleResourceManagementConfig, Supplier<WorkerBehaviorConfig> supplier, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorFactory scheduledExecutorFactory) {
        this(simpleResourceManagementConfig, supplier, resourceManagementSchedulerConfig, scheduledExecutorFactory.create(1, "SimpleResourceManagement-manager--%d"));
    }

    /**
     * Creates a strategy that schedules its periodic work on the supplied executor.
     *
     * @param simpleResourceManagementConfig    thresholds and timeouts used by this strategy
     * @param supplier                          source of the current worker behavior config; may yield null
     * @param resourceManagementSchedulerConfig periods used to schedule provisioning and termination
     * @param scheduledExecutorService          executor that runs the periodic tasks; shut down by
     *                                          {@link #stopManagement()}
     */
    public SimpleResourceManagementStrategy(SimpleResourceManagementConfig simpleResourceManagementConfig, Supplier<WorkerBehaviorConfig> supplier, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorService scheduledExecutorService) {
        this.started = false;
        this.lock = new Object();
        this.currentlyProvisioning = Sets.newHashSet();
        this.currentlyTerminating = Sets.newHashSet();
        this.targetWorkerCount = -1;
        this.lastProvisionTime = new DateTime();
        this.lastTerminateTime = new DateTime();
        this.config = simpleResourceManagementConfig;
        this.workerConfigRef = supplier;
        this.scalingStats = new ScalingStats(simpleResourceManagementConfig.getNumEventsToTrack());
        this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
        this.exec = scheduledExecutorService;
    }

    /**
     * Runs one provisioning pass: refreshes the target worker count and asks the auto scaler for
     * new nodes until the target is covered or the auto scaler stops returning nodes.
     *
     * @param remoteTaskRunner runner whose pending tasks and workers are inspected
     * @return true if at least one provision call returned node ids during this pass
     */
    boolean doProvision(RemoteTaskRunner remoteTaskRunner) {
        Collection<RemoteTaskRunnerWorkItem> pendingTasks = remoteTaskRunner.getPendingTasks();
        Collection<ZkWorker> workers = getWorkers(remoteTaskRunner);
        synchronized (this.lock) {
            boolean didProvision = false;
            WorkerBehaviorConfig workerConfig = this.workerConfigRef.get();
            if (workerConfig == null || workerConfig.getAutoScaler() == null) {
                log.warn("No workerConfig available, cannot provision new workers.");
                return false;
            }
            int validWorkerCount = Collections2.filter(workers, createValidWorkerPredicate(this.config)).size();

            // Nodes that now appear among the workers are no longer "in flight".
            this.currentlyProvisioning.removeAll(
                workerConfig.getAutoScaler().ipToIdLookup(Lists.newArrayList(Iterables.transform(workers, WORKER_TO_IP)))
            );

            updateTargetWorkerCount(workerConfig, pendingTasks, workers);

            int want = this.targetWorkerCount - (validWorkerCount + this.currentlyProvisioning.size());
            while (want > 0) {
                AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
                if (provisioned == null) {
                    break;
                }
                List<String> newNodes = provisioned.getNodeIds();
                if (newNodes.isEmpty()) {
                    // Nothing was provisioned; stop instead of spinning on the auto scaler.
                    break;
                }
                this.currentlyProvisioning.addAll(newNodes);
                this.lastProvisionTime = new DateTime();
                this.scalingStats.addProvisionEvent(provisioned);
                want -= newNodes.size();
                didProvision = true;
            }

            if (!this.currentlyProvisioning.isEmpty()) {
                Duration sinceLastProvision = new Duration(this.lastProvisionTime, new DateTime());
                log.info("%s provisioning. Current wait time: %s", this.currentlyProvisioning, sinceLastProvision);
                if (sinceLastProvision.isLongerThan(this.config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node provisioning taking too long!")
                        .addData("millisSinceLastProvision", Long.valueOf(sinceLastProvision.getMillis()))
                        .addData("provisioningCount", Integer.valueOf(this.currentlyProvisioning.size()))
                        .emit();
                    // Give up on the stuck nodes and forget them so a later pass can try again.
                    workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(this.currentlyProvisioning));
                    this.currentlyProvisioning.clear();
                }
            }
            return didProvision;
        }
    }

    /**
     * Runs one termination pass: refreshes the target worker count and, when no termination is
     * already in flight, marks excess workers lazy and asks the auto scaler to terminate them.
     *
     * @param remoteTaskRunner runner whose pending tasks, workers and lazy workers are inspected
     * @return true if the auto scaler returned a non-null result for a terminate request in this pass
     */
    boolean doTerminate(RemoteTaskRunner remoteTaskRunner) {
        Collection<RemoteTaskRunnerWorkItem> pendingTasks = remoteTaskRunner.getPendingTasks();
        synchronized (this.lock) {
            WorkerBehaviorConfig workerConfig = this.workerConfigRef.get();
            // Guard the auto scaler too (as doProvision does); it is dereferenced right below.
            if (workerConfig == null || workerConfig.getAutoScaler() == null) {
                log.warn("No workerConfig available, cannot terminate workers.");
                return false;
            }
            boolean didTerminate = false;

            Set<String> lazyWorkerNodeIds = Sets.newHashSet(
                workerConfig.getAutoScaler().ipToIdLookup(
                    Lists.newArrayList(Iterables.transform(remoteTaskRunner.getLazyWorkers(), WORKER_TO_IP))
                )
            );
            // Keep only the terminating ids that still resolve from the lazy workers.
            this.currentlyTerminating.retainAll(lazyWorkerNodeIds);

            Collection<ZkWorker> workers = getWorkers(remoteTaskRunner);
            updateTargetWorkerCount(workerConfig, pendingTasks, workers);

            if (this.currentlyTerminating.isEmpty()) {
                int excessWorkers = (workers.size() + this.currentlyProvisioning.size()) - this.targetWorkerCount;
                if (excessWorkers > 0) {
                    List<String> laziestWorkerIps = Lists.transform(
                        remoteTaskRunner.markWorkersLazy(createLazyWorkerPredicate(this.config), excessWorkers),
                        WORKER_TO_IP
                    );
                    if (laziestWorkerIps.isEmpty()) {
                        log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", Integer.valueOf(excessWorkers));
                    } else {
                        log.info(
                            "Terminating %,d workers (wanted %,d): %s",
                            Integer.valueOf(laziestWorkerIps.size()),
                            Integer.valueOf(excessWorkers),
                            Joiner.on(", ").join(laziestWorkerIps)
                        );
                        AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
                        if (terminated != null) {
                            this.currentlyTerminating.addAll(terminated.getNodeIds());
                            this.lastTerminateTime = new DateTime();
                            this.scalingStats.addTerminateEvent(terminated);
                            didTerminate = true;
                        }
                    }
                }
            } else {
                Duration sinceLastTerminate = new Duration(this.lastTerminateTime, new DateTime());
                log.info("%s terminating. Current wait time: %s", this.currentlyTerminating, sinceLastTerminate);
                if (sinceLastTerminate.isLongerThan(this.config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node termination taking too long!")
                        .addData("millisSinceLastTerminate", Long.valueOf(sinceLastTerminate.getMillis()))
                        .addData("terminatingCount", Integer.valueOf(this.currentlyTerminating.size()))
                        .emit();
                    this.currentlyTerminating.clear();
                }
            }
            return didTerminate;
        }
    }

    /**
     * Schedules the periodic provision and terminate tasks on the executor. Does nothing if
     * management has already been started.
     *
     * @param remoteTaskRunner runner handed to every scheduled provision/terminate pass
     */
    @Override // io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy
    public void startManagement(final RemoteTaskRunner remoteTaskRunner) {
        synchronized (this.lock) {
            if (this.started) {
                return;
            }
            log.info("Started Resource Management Scheduler");

            ScheduledExecutors.scheduleAtFixedRate(
                this.exec,
                this.resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(),
                new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        doProvision(remoteTaskRunner);
                    }
                }
            );

            // The first terminate run is delayed until the next boundary of the terminate period,
            // computed relative to the configured origin time.
            PeriodGranularity terminateGranularity = new PeriodGranularity(
                this.resourceManagementSchedulerConfig.getTerminatePeriod(),
                this.resourceManagementSchedulerConfig.getOriginTime(),
                (DateTimeZone) null
            );
            long nextBoundaryMillis = terminateGranularity.next(terminateGranularity.truncate(new DateTime().getMillis()));
            ScheduledExecutors.scheduleAtFixedRate(
                this.exec,
                new Duration(System.currentTimeMillis(), nextBoundaryMillis),
                this.resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(),
                new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        doTerminate(remoteTaskRunner);
                    }
                }
            );
            this.started = true;
        }
    }

    /** Shuts down the executor if management is currently started; otherwise does nothing. */
    @Override // io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy
    public void stopManagement() {
        synchronized (this.lock) {
            if (this.started) {
                log.info("Stopping Resource Management Scheduler");
                this.exec.shutdown();
                this.started = false;
            }
        }
    }

    /** @return the statistics object that records provision and terminate events */
    @Override // io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy
    public ScalingStats getStats() {
        return this.scalingStats;
    }

    /**
     * Builds a predicate that accepts a worker when it has been idle for at least the configured
     * worker idle timeout, or when it fails the valid-worker predicate.
     */
    private static Predicate<ZkWorker> createLazyWorkerPredicate(final SimpleResourceManagementConfig simpleResourceManagementConfig) {
        final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(simpleResourceManagementConfig);
        return new Predicate<ZkWorker>() {
            @Override
            public boolean apply(ZkWorker zkWorker) {
                long idleMillis = System.currentTimeMillis() - zkWorker.getLastCompletedTaskTime().getMillis();
                long timeoutMillis = simpleResourceManagementConfig.getWorkerIdleTimeout().toStandardDuration().getMillis();
                return idleMillis >= timeoutMillis || !isValidWorker.apply(zkWorker);
            }
        };
    }

    /**
     * Builds a predicate that accepts a worker when {@code isValidVersion} holds for the
     * configured worker version. The predicate throws {@link ISE} if no version is configured.
     */
    private static Predicate<ZkWorker> createValidWorkerPredicate(final SimpleResourceManagementConfig simpleResourceManagementConfig) {
        return new Predicate<ZkWorker>() {
            @Override
            public boolean apply(ZkWorker zkWorker) {
                String minVersion = simpleResourceManagementConfig.getWorkerVersion();
                if (minVersion == null) {
                    throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
                }
                return zkWorker.isValidVersion(minVersion);
            }
        };
    }

    /**
     * Re-evaluates {@link #targetWorkerCount}. On first use the target is the current worker count
     * clamped to [min, max]. Afterwards, and only while nothing is being provisioned or
     * terminated, the target moves up by one (floored at min) or down by one (capped at max).
     * If min exceeds max the configuration is rejected and the target is left untouched.
     *
     * @param workerBehaviorConfig config supplying the auto scaler's min/max worker counts
     * @param collection           currently pending tasks
     * @param collection2          all workers currently known to the task runner
     */
    private void updateTargetWorkerCount(WorkerBehaviorConfig workerBehaviorConfig, Collection<? extends TaskRunnerWorkItem> collection, Collection<ZkWorker> collection2) {
        synchronized (this.lock) {
            Collection<ZkWorker> validWorkers = Collections2.filter(collection2, createValidWorkerPredicate(this.config));
            Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(this.config);
            int minWorkerCount = workerBehaviorConfig.getAutoScaler().getMinNumWorkers();
            int maxWorkerCount = workerBehaviorConfig.getAutoScaler().getMaxNumWorkers();

            if (minWorkerCount > maxWorkerCount) {
                log.error("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", Integer.valueOf(minWorkerCount), Integer.valueOf(maxWorkerCount));
                return;
            }

            if (this.targetWorkerCount < 0) {
                // First evaluation: start from what is already running, clamped to the limits.
                this.targetWorkerCount = Math.max(Math.min(collection2.size(), maxWorkerCount), minWorkerCount);
                log.info(
                    "Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
                    Integer.valueOf(this.targetWorkerCount),
                    Integer.valueOf(validWorkers.size()),
                    Integer.valueOf(minWorkerCount),
                    Integer.valueOf(maxWorkerCount)
                );
            }

            boolean notTakingActions = this.currentlyProvisioning.isEmpty() && this.currentlyTerminating.isEmpty();
            boolean shouldScaleUp = notTakingActions
                && validWorkers.size() >= this.targetWorkerCount
                && this.targetWorkerCount < maxWorkerCount
                && (hasTaskPendingBeyondThreshold(collection) || this.targetWorkerCount < minWorkerCount);
            boolean shouldScaleDown = notTakingActions
                && validWorkers.size() == this.targetWorkerCount
                && this.targetWorkerCount > minWorkerCount
                && Iterables.any(validWorkers, isLazyWorker);

            if (shouldScaleUp) {
                this.targetWorkerCount = Math.max(this.targetWorkerCount + 1, minWorkerCount);
                log.info(
                    "I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
                    Integer.valueOf(this.targetWorkerCount),
                    Integer.valueOf(validWorkers.size()),
                    Integer.valueOf(minWorkerCount),
                    Integer.valueOf(maxWorkerCount)
                );
            } else if (shouldScaleDown) {
                this.targetWorkerCount = Math.min(this.targetWorkerCount - 1, maxWorkerCount);
                log.info(
                    "I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
                    Integer.valueOf(this.targetWorkerCount),
                    Integer.valueOf(validWorkers.size()),
                    Integer.valueOf(minWorkerCount),
                    Integer.valueOf(maxWorkerCount)
                );
            } else {
                log.info(
                    "Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
                    Integer.valueOf(this.targetWorkerCount),
                    Integer.valueOf(validWorkers.size()),
                    Integer.valueOf(minWorkerCount),
                    Integer.valueOf(maxWorkerCount)
                );
            }
        }
    }

    /**
     * @param collection pending tasks to inspect
     * @return true if any task's time since queue insertion is at least the configured
     *         pending-task timeout
     */
    private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> collection) {
        synchronized (this.lock) {
            long now = System.currentTimeMillis();
            for (TaskRunnerWorkItem pendingTask : collection) {
                Duration pendingFor = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
                Duration timeout = this.config.getPendingTaskTimeout().toStandardDuration();
                if (pendingFor.isEqual(timeout) || pendingFor.isLongerThan(timeout)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * @param remoteTaskRunner runner to query
     * @return the workers reported by the given runner
     */
    public Collection<ZkWorker> getWorkers(RemoteTaskRunner remoteTaskRunner) {
        return remoteTaskRunner.getWorkers();
    }
}
