package io.mantisrx.server.worker.jobmaster;

import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerOutlier;
import io.mantisrx.server.master.client.MantisMasterClientApi;
import io.mantisrx.server.worker.jobmaster.JobAutoScaler;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.reactivx.mantis.operators.DropOperator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observers.SerializedObserver;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.class */
/**
 * Receives per-worker {@link MetricData} for a job, groups it by stage, and periodically turns the
 * aggregated gauges of each stage into {@link JobAutoScaler.Event}s that are pushed to the
 * auto-scale observer supplied at construction time.
 *
 * <p>Usage: construct the handler, call {@link #initAndGetMetricDataObserver()} once, and feed
 * metric data into the returned observer.
 *
 * <p>NOTE(review): {@code numWorkersByStage} and the per-stage {@code workerNumberByIndex} are plain
 * {@link HashMap}s that appear to be touched from more than one Rx callback; confirm the threading
 * of those callbacks before relying on them being thread-safe.
 */
public class WorkerMetricHandler {
    private static final Logger logger = LoggerFactory.getLogger(WorkerMetricHandler.class);
    private final Observer<JobAutoScaler.Event> jobAutoScaleObserver;
    private final MantisMasterClientApi masterClientApi;
    private final AutoScaleMetricsConfig autoScaleMetricsConfig;
    private final MetricAggregator metricAggregator;
    private final String jobId;
    private final PublishSubject<MetricData> metricDataSubject = PublishSubject.create();
    /** Latest known worker count per stage number, refreshed from the master's scheduling changes. */
    private final Map<Integer, Integer> numWorkersByStage = new HashMap<>();
    /** Returns the known worker count for a stage, or -1 (with a warning) when it is not known yet. */
    private final Func1<Integer, Integer> lookupNumWorkersByStage = stageNum -> {
        Integer numWorkers = this.numWorkersByStage.get(stageNum);
        if (numWorkers != null) {
            return numWorkers;
        }
        logger.warn("num workers for stage {} not known", stageNum);
        return -1;
    };

    /**
     * Rx operator handling the metric stream of a single stage. It stores incoming data points per
     * worker and, on a fixed period, aggregates them and emits auto-scale events. The operator never
     * forwards items to the downstream subscriber; it only ties the periodic task to its lifetime.
     */
    public class StageMetricDataOperator implements Observable.Operator<Object, MetricData> {
        /** Cooldown (seconds) handed to {@link WorkerOutlier}. */
        private static final int killCooldownSecs = 600;
        private final int stage;
        private final Func1<Integer, Integer> numStageWorkersFn;
        private final AutoScaleMetricsConfig autoScaleMetricsConfig;
        private final WorkerOutlier workerOutlier;
        /** Initial delay and period (seconds) of the aggregation task. */
        private static final int metricsIntervalSeconds = 30;
        /** Capacity argument passed to each {@link WorkerMetrics} instance. */
        private final int valuesToKeep = 2;
        /** Metrics of this job's workers in this stage, keyed by worker index. */
        private final ConcurrentMap<Integer, WorkerMetrics> workersMap = new ConcurrentHashMap<>();
        /** Metrics reported for other (source) jobs, keyed by {@code "<jobId>:<workerIndex>"}. */
        private final ConcurrentMap<String, WorkerMetrics> sourceJobWorkersMap = new ConcurrentHashMap<>();
        /** Source-job metric keys seen within the last minute, keyed by {@code "<jobId>:<workerIndex>:<group>"}. */
        private final Cache<String, String> sourceJobMetricsRecent =
                CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
        /** Last worker number reported for each worker index; used to resubmit outliers. */
        private final Map<Integer, Integer> workerNumberByIndex = new HashMap<>();

        /**
         * @param stage                  stage number this operator serves
         * @param numStageWorkersFn      lookup from stage number to current worker count (-1 if unknown)
         * @param autoScaleMetricsConfig configuration naming user-defined and source-job drop metrics
         */
        public StageMetricDataOperator(int stage, Func1<Integer, Integer> numStageWorkersFn, AutoScaleMetricsConfig autoScaleMetricsConfig) {
            logger.debug("setting operator for stage {}", stage);
            this.stage = stage;
            this.numStageWorkersFn = numStageWorkersFn;
            this.autoScaleMetricsConfig = autoScaleMetricsConfig;
            // Callback invoked by WorkerOutlier with the index of a worker it flags as an outlier.
            this.workerOutlier = new WorkerOutlier(killCooldownSecs, workerIndex -> {
                try {
                    final Integer workerNumber = this.workerNumberByIndex.get(workerIndex);
                    if (workerNumber == null) {
                        logger.error("outlier resubmit FAILED. worker number not found for worker index {} stage {}", workerIndex, stage);
                        return;
                    }
                    if (resubmitOutlierWorkerEnabled()) {
                        logger.info("resubmitting worker job {} stage {} idx {} workerNum {} (dropping excessive data compared to others)",
                                WorkerMetricHandler.this.jobId, stage, workerIndex, workerNumber);
                        WorkerMetricHandler.this.masterClientApi
                                .resubmitJobWorker(WorkerMetricHandler.this.jobId, "JobMaster", workerNumber, "dropping excessive data compared to others in stage")
                                .onErrorResumeNext(th -> {
                                    // Best effort: a failed resubmit is logged and otherwise ignored.
                                    logger.error("caught error ({}) when resubmitting outlier worker num {}", th.getMessage(), workerNumber);
                                    return Observable.empty();
                                })
                                .subscribe();
                    } else {
                        logger.info("resubmitOutlier property is disabled. Not killing worker job {} stage {} idx {} workerNum {} (dropping excessive data compared to others)",
                                WorkerMetricHandler.this.jobId, stage, workerIndex, workerNumber);
                    }
                } catch (Exception e) {
                    logger.warn("Can't resubmit outlier worker idx {} error {}", workerIndex, e.getMessage(), e);
                }
            });
        }

        /** Reads the {@code mantis.worker.jobmaster.outlier.worker.resubmit} property (default "true"). */
        private boolean resubmitOutlierWorkerEnabled() {
            return Boolean.parseBoolean(ServiceRegistry.INSTANCE.getPropertiesService()
                    .getStringValue("mantis.worker.jobmaster.outlier.worker.resubmit", "true"));
        }

        /**
         * Records a data point reported by one of this job's own workers, feeds {@code DataDrop}
         * drop percentages to the outlier detector, and drops stored metrics for worker indexes
         * at or beyond the stage's current worker count.
         *
         * @param metricData data point to record
         */
        public void addDataPoint(MetricData metricData) {
            int workerIndex = metricData.getWorkerIndex();
            logger.debug("adding data point for worker idx={} data={}", workerIndex, metricData);
            WorkerMetrics workerMetrics = this.workersMap.computeIfAbsent(workerIndex, idx -> new WorkerMetrics(this.valuesToKeep));
            // WorkerMetrics returns the data point it actually stored; use that for outlier detection.
            MetricData stored = workerMetrics.addDataPoint(metricData.getMetricGroupName(), metricData);
            if (stored.getMetricGroupName().equals("DataDrop")) {
                Map<String, Double> gauges = stored.getGaugeData().getGauges();
                if (gauges.containsKey("dropPercent")) {
                    double dropPercent = gauges.get("dropPercent").doubleValue();
                    int stageWorkers = this.numStageWorkersFn.call(this.stage).intValue();
                    this.workerOutlier.addDataPoint(workerIndex, dropPercent, stageWorkers);
                }
            }
            this.workerNumberByIndex.put(workerIndex, metricData.getWorkerNumber());

            int maxIndex = 0;
            synchronized (this.workersMap) {
                for (Integer idx : this.workersMap.keySet()) {
                    maxIndex = Math.max(maxIndex, idx.intValue());
                }
            }
            Integer numWorkers = this.numStageWorkersFn.call(this.stage);
            if (numWorkers.intValue() > -1) {
                // Valid indexes are 0..numWorkers-1; anything above is left over from before a scale-down.
                for (int idx = numWorkers.intValue(); idx <= maxIndex; idx++) {
                    this.workersMap.remove(Integer.valueOf(idx));
                }
            }
        }

        /**
         * Records a data point that belongs to a different job id (treated as a source job) and
         * marks its metric group as recently seen.
         *
         * @param metricData data point to record
         */
        public void addSourceJobDataPoint(MetricData metricData) {
            String workerKey = metricData.getJobId() + ":" + metricData.getWorkerIndex();
            WorkerMetrics workerMetrics = this.sourceJobWorkersMap.computeIfAbsent(workerKey, key -> new WorkerMetrics(this.valuesToKeep));
            workerMetrics.addDataPoint(metricData.getMetricGroupName(), metricData);
            String recentKey = workerKey + ":" + metricData.getMetricGroupName();
            this.sourceJobMetricsRecent.put(recentKey, recentKey);
        }

        /**
         * Schedules the periodic aggregation task on the computation scheduler, registers it with
         * the downstream subscriber so it is cancelled on unsubscribe, and returns the subscriber
         * that stores incoming metric data.
         *
         * @param subscriber downstream subscriber; it receives no items from this operator
         * @return subscriber consuming the stage's {@link MetricData}
         */
        @Override
        public Subscriber<? super MetricData> call(final Subscriber<? super Object> subscriber) {
            subscriber.add(Schedulers.computation().createWorker()
                    .schedulePeriodically(this::publishScalingEvents, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS));
            return new Subscriber<MetricData>() {
                @Override
                public void onCompleted() {
                    subscriber.unsubscribe();
                }

                @Override
                public void onError(Throwable th) {
                    // Logged only; the error is intentionally not propagated downstream.
                    logger.error("Unexpected error: {}", th.getMessage(), th);
                }

                @Override
                public void onNext(MetricData metricData) {
                    logger.debug("Got metric metricData for job {} stage {}, worker {}: {}",
                            WorkerMetricHandler.this.jobId, StageMetricDataOperator.this.stage, metricData.getWorkerNumber(), metricData);
                    // Compare by value: job ids are distinct String instances in general.
                    if (isOwnJob(metricData.getJobId())) {
                        StageMetricDataOperator.this.addDataPoint(metricData);
                    } else {
                        StageMetricDataOperator.this.addSourceJobDataPoint(metricData);
                    }
                }
            };
        }

        /** Aggregates the stored worker metrics and emits one auto-scale event per available metric. */
        private void publishScalingEvents() {
            List<Map<String, GaugeData>> perWorkerAggregates = new ArrayList<>();
            synchronized (this.workersMap) {
                for (WorkerMetrics workerMetrics : this.workersMap.values()) {
                    perWorkerAggregates.add(WorkerMetricHandler.this.metricAggregator.getAggregates(workerMetrics.getGaugesByMetricGrp()));
                }
            }
            int numWorkers = this.numStageWorkersFn.call(this.stage).intValue();
            Map<String, GaugeData> aggregates = WorkerMetricHandler.this.getAggregates(perWorkerAggregates);
            logger.info("Job stage {} avgResUsage from {} workers: {}", this.stage, this.workersMap.size(), aggregates);

            emitUserDefinedEvents(aggregates, numWorkers);

            emitIfPresent(aggregates, "consumer-fetch-manager-metrics", "records-lag-max", StageScalingPolicy.ScalingReason.KafkaLag, 1.0d, numWorkers);
            emitIfPresent(aggregates, "consumer-fetch-manager-metrics", "records-consumed-rate", StageScalingPolicy.ScalingReason.KafkaProcessed, 1.0d, numWorkers);

            if (aggregates.containsKey("ResourceUsage")) {
                // NOTE(review): cpuPctUsageCurr is divided by 100, presumably converting a percentage to a fraction - confirm.
                emitResourceUsage(aggregates, "cpuPctUsageCurr", StageScalingPolicy.ScalingReason.CPU, 100.0d, numWorkers);
                emitResourceUsage(aggregates, "totMemUsageCurr", StageScalingPolicy.ScalingReason.Memory, 1.0d, numWorkers);
                emitResourceUsage(aggregates, "nwBytesUsageCurr", StageScalingPolicy.ScalingReason.Network, 1.0d, numWorkers);
                emitResourceUsage(aggregates, "jvmMemoryUsedBytes", StageScalingPolicy.ScalingReason.JVMMemory, 1.0d, numWorkers);
            }

            emitIfPresent(aggregates, "DataDrop", "dropPercent", StageScalingPolicy.ScalingReason.DataDrop, 1.0d, numWorkers);
            // NOTE(review): the divisor 6 presumably turns a 6-second count into a per-second rate - confirm.
            emitIfPresent(aggregates, "worker_stage_inner_input", "onNextGauge", StageScalingPolicy.ScalingReason.RPS, 6.0d, numWorkers);

            emitSourceJobDropEvent(numWorkers);
        }

        /** Emits a {@code UserDefined} event for every configured group/gauge pair present in the aggregates. */
        private void emitUserDefinedEvents(Map<String, GaugeData> aggregates, int numWorkers) {
            for (Map.Entry<String, Set<String>> entry : this.autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
                String metricGroup = entry.getKey();
                for (String gaugeName : entry.getValue()) {
                    Double value = findGauge(aggregates, metricGroup, gaugeName);
                    if (value != null) {
                        emit(StageScalingPolicy.ScalingReason.UserDefined, value.doubleValue(), numWorkers);
                    } else {
                        logger.debug("no gauge data found for UserDefined (metric={})", entry);
                    }
                }
            }
        }

        /** Sums recently reported source-job drop gauges and, if any were found, emits a {@code SourceJobDrop} event. */
        private void emitSourceJobDropEvent(int numWorkers) {
            double sourceJobDrops = 0.0d;
            boolean hasSourceJobDrops = false;
            Map<String, String> recentKeys = this.sourceJobMetricsRecent.asMap();
            for (Map.Entry<String, WorkerMetrics> worker : this.sourceJobWorkersMap.entrySet()) {
                Map<String, GaugeData> workerAggregates =
                        WorkerMetricHandler.this.metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp());
                for (Map.Entry<String, GaugeData> group : workerAggregates.entrySet()) {
                    String recentKey = worker.getKey() + ":" + group.getKey();
                    for (Map.Entry<String, Double> gauge : group.getValue().getGauges().entrySet()) {
                        // Only count metrics seen before the cache entry expired and configured as drop metrics.
                        if (recentKeys.containsKey(recentKey) && this.autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) {
                            sourceJobDrops += gauge.getValue().doubleValue();
                            hasSourceJobDrops = true;
                        }
                    }
                }
            }
            if (hasSourceJobDrops) {
                logger.info("Job stage {}, source job drop metrics: {}", this.stage, sourceJobDrops);
                emit(StageScalingPolicy.ScalingReason.SourceJobDrop, (sourceJobDrops / 6.0d) / numWorkers, numWorkers);
            }
        }

        /** Emits a ResourceUsage gauge, warning instead of failing when the gauge is absent. */
        private void emitResourceUsage(Map<String, GaugeData> aggregates, String gaugeName, StageScalingPolicy.ScalingReason reason, double divisor, int numWorkers) {
            if (!emitIfPresent(aggregates, "ResourceUsage", gaugeName, reason, divisor, numWorkers)) {
                logger.warn("Job stage {} ResourceUsage aggregate is missing gauge {}; skipping {} event", this.stage, gaugeName, reason);
            }
        }

        /**
         * Emits {@code gauge / divisor} for the given reason when the gauge exists.
         *
         * @return {@code true} if an event was emitted
         */
        private boolean emitIfPresent(Map<String, GaugeData> aggregates, String metricGroup, String gaugeName, StageScalingPolicy.ScalingReason reason, double divisor, int numWorkers) {
            Double value = findGauge(aggregates, metricGroup, gaugeName);
            if (value == null) {
                return false;
            }
            emit(reason, value.doubleValue() / divisor, numWorkers);
            return true;
        }

        /** Returns the named gauge of the named metric group, or {@code null} when either is absent. */
        private Double findGauge(Map<String, GaugeData> aggregates, String metricGroup, String gaugeName) {
            GaugeData groupData = aggregates.get(metricGroup);
            return groupData == null ? null : groupData.getGauges().get(gaugeName);
        }

        /** Pushes one event for this stage to the auto-scale observer. */
        private void emit(StageScalingPolicy.ScalingReason reason, double value, int numWorkers) {
            WorkerMetricHandler.this.jobAutoScaleObserver.onNext(new JobAutoScaler.Event(reason, this.stage, value, numWorkers, ""));
        }
    }

    /**
     * @param str                    id of the job whose workers are being monitored
     * @param observer               sink for the generated auto-scale events
     * @param mantisMasterClientApi  master client used for scheduling changes and worker resubmits
     * @param autoScaleMetricsConfig configuration of the metrics to aggregate
     */
    public WorkerMetricHandler(String str, Observer<JobAutoScaler.Event> observer, MantisMasterClientApi mantisMasterClientApi, AutoScaleMetricsConfig autoScaleMetricsConfig) {
        this.jobId = str;
        this.jobAutoScaleObserver = observer;
        this.masterClientApi = mantisMasterClientApi;
        this.autoScaleMetricsConfig = autoScaleMetricsConfig;
        this.metricAggregator = new MetricAggregator(autoScaleMetricsConfig);
    }

    /**
     * Wires up the internal subscriptions and returns the observer into which metric data is fed.
     * Each call wires the subscriptions again, so it is expected to be called once per handler.
     *
     * @return serialized observer wrapping the internal metric subject
     */
    public Observer<MetricData> initAndGetMetricDataObserver() {
        start();
        return new SerializedObserver<>(this.metricDataSubject);
    }

    /**
     * Collects the per-worker gauge data by metric group and delegates the combination of each
     * group's values to the {@link MetricAggregator}.
     *
     * @param list one map of metric group to gauge data per worker
     * @return aggregated gauge data per metric group
     */
    public Map<String, GaugeData> getAggregates(List<Map<String, GaugeData>> list) {
        Map<String, List<GaugeData>> gaugesByGroup = new HashMap<>();
        for (Map<String, GaugeData> workerData : list) {
            for (Map.Entry<String, GaugeData> entry : workerData.entrySet()) {
                gaugesByGroup.computeIfAbsent(entry.getKey(), group -> new ArrayList<>()).add(entry.getValue());
            }
        }
        return this.metricAggregator.getAggregates(gaugesByGroup);
    }

    /** Returns whether the given job id equals this handler's job id (null-safe, by value). */
    private boolean isOwnJob(String metricJobId) {
        return this.jobId == null ? metricJobId == null : this.jobId.equals(metricJobId);
    }

    /**
     * Subscribes to the master's scheduling changes to track worker counts per stage, and attaches
     * a {@link StageMetricDataOperator} to each stage's group of the metric stream.
     */
    private void start() {
        final AtomicReference<List<Subscription>> stageSubscriptions = new AtomicReference<>(new ArrayList<>());
        this.masterClientApi.schedulingChanges(this.jobId).doOnNext(jobSchedulingInfo -> {
            for (WorkerAssignments workerAssignments : jobSchedulingInfo.getWorkerAssignments().values()) {
                logger.debug("setting numWorkers={} for stage={}", workerAssignments.getNumWorkers(), workerAssignments.getStage());
                this.numWorkersByStage.put(workerAssignments.getStage(), workerAssignments.getNumWorkers());
            }
        }).subscribe();
        logger.info("Starting worker metric handler with autoscale config {}", this.autoScaleMetricsConfig);
        this.metricDataSubject
                .groupBy(metricData -> metricData.getStage())
                .lift(new DropOperator<>(WorkerMetricHandler.class.getName()))
                .doOnNext(groupedObservable -> {
                    Integer stage = groupedObservable.getKey();
                    Subscription subscription = groupedObservable
                            .lift(new StageMetricDataOperator(stage.intValue(), this.lookupNumWorkersByStage, this.autoScaleMetricsConfig))
                            .subscribe();
                    logger.info("adding subscription for stage {} StageMetricDataOperator", stage);
                    stageSubscriptions.get().add(subscription);
                })
                .doOnUnsubscribe(() -> {
                    // Tear down every per-stage subscription together with the grouped stream.
                    for (Subscription subscription : stageSubscriptions.get()) {
                        subscription.unsubscribe();
                    }
                })
                .subscribe();
    }
}
