package com.datarobot.mlops.channel;

import com.datarobot.mlops.MLOpsStatsInternal;
import com.datarobot.mlops.common.config.Config;
import com.datarobot.mlops.common.constants.ConfigConstants;
import com.datarobot.mlops.common.enums.DataFormat;
import com.datarobot.mlops.common.exceptions.DRCommonException;
import com.datarobot.mlops.common.exceptions.DRInvalidValue;
import com.datarobot.mlops.common.records.Record;
import com.datarobot.mlops.metrics.Metric;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datarobot/mlops/channel/OutputChannelQueueAsync.class */
/**
 * Output channel queue that buffers submitted metrics into fixed-capacity lists and hands full
 * (or stale) lists to a background worker thread, which pushes every metric through
 * {@code pushMetric} of the parent {@link OutputChannelQueue}.
 *
 * <p>A fixed pool of {@code numLists + 1} buffers circulates between three places: the
 * {@code readyQueue} (empty buffers), the {@code userActiveList} (the buffer currently being
 * filled by {@link #submit}), and the {@code workerQueue} (filled buffers awaiting the worker).
 * When no buffer can be handed over or obtained, metrics are dropped and counted in
 * {@link #getLosses()}.
 *
 * <p>{@link #submit}, {@link #offerNonEmptyBuffer()} and {@link #getLosses()} are
 * {@code synchronized} on this instance; the worker and the periodic flush task run on their own
 * threads.
 */
public class OutputChannelQueueAsync extends OutputChannelQueue {
    private static final Logger logger = LoggerFactory.getLogger(OutputChannelQueueAsync.class);

    /** Default capacity of each buffer list when the config value is absent. */
    private static final int DEFAULT_REPORT_QUEUE_LIST_SIZE = 100000;
    /** Default number of buffer lists in the worker queue when the config value is absent. */
    private static final int DEFAULT_REPORT_QUEUE_NUM_LISTS = 10;
    /** Default staleness threshold (ms) after which a partially filled buffer is flushed. */
    private static final int DEFAULT_TIMEOUT_PROCESS_QUEUE_MS = 1000;
    /** How long (ms) to wait for an empty buffer after handing one to the worker. */
    private static final int TIMEOUT_FETCH_ACTIVE_LIST_MSEC = 100;
    /** Initial delay (ms) before the periodic flush task first runs. */
    private static final int REPORT_TRIGGER_DELAY_MSEC = 1000;
    /** Period (ms) between runs of the periodic flush task. */
    private static final int REPORT_TRIGGER_PERIOD_MSEC = 1000;

    private final Worker worker;
    private final Thread workerTh;
    private final ArrayBlockingQueue<ArrayList<Metric>> readyQueue;
    private final ArrayBlockingQueue<ArrayList<Metric>> workerQueue;
    /** Buffer currently being filled; may be null when no empty buffer was available. */
    private ArrayList<Metric> userActiveList;
    private final int numMetricsPerList;
    private final int timeoutProcessQueueMs;
    /** Written under the instance lock, read lock-free by the flush task; hence volatile. */
    private volatile long lastOfferTime;
    private long losses;
    private TimerTask reportTriggeringRepeatedTask;
    private ScheduledExecutorService reportTriggeringExec;

    /**
     * Background consumer: takes filled buffers from the worker queue, pushes each metric to the
     * owning queue, and returns the emptied buffer to the ready queue.
     */
    private static final class Worker implements Runnable {
        private static final Logger workerLogger = LoggerFactory.getLogger(Worker.class);
        /** Maximum time (ms) a single blocking poll waits before re-checking the stop flag. */
        private static final long POLL_TIMEOUT_MS = 1000L;

        private final OutputChannelQueue outputChannelQueue;
        private final ArrayBlockingQueue<ArrayList<Metric>> workerQueue;
        private final ArrayBlockingQueue<ArrayList<Metric>> readyQueue;
        /** Set by another thread via {@link #stop()}; volatile so the worker thread sees it. */
        private volatile boolean stop = false;

        Worker(OutputChannelQueue outputChannelQueue, ArrayBlockingQueue<ArrayList<Metric>> workerQueue, ArrayBlockingQueue<ArrayList<Metric>> readyQueue) {
            this.outputChannelQueue = outputChannelQueue;
            this.workerQueue = workerQueue;
            this.readyQueue = readyQueue;
        }

        /** Requests the worker loop to finish after draining whatever is already queued. */
        void stop() {
            this.stop = true;
        }

        /**
         * Runs until {@link #stop()} is called, an interrupt arrives, or pushing a metric fails.
         * On a push failure the worker logs and terminates (the failing buffer is not recycled).
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                while (!this.stop) {
                    drain(this.workerQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
                }
                // A buffer may have been offered between the last timed poll and the stop flag
                // becoming visible; drain once more so it is not left behind.
                drain(this.workerQueue.poll());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                workerLogger.error(e.getMessage(), e);
            } catch (DRCommonException e) {
                workerLogger.error(e.getMessage(), e);
            }
        }

        /**
         * Processes {@code batch} and then every further buffer that is immediately available.
         *
         * @param batch first buffer to process, or null if there is nothing to do
         * @throws DRCommonException if pushing a metric to the owning queue fails
         */
        private void drain(ArrayList<Metric> batch) throws DRCommonException {
            while (batch != null) {
                for (Metric metric : batch) {
                    this.outputChannelQueue.pushMetric(metric);
                }
                batch.clear();
                // Never overflows: readyQueue capacity equals the total number of buffers.
                this.readyQueue.add(batch);
                batch = this.workerQueue.poll();
            }
        }
    }

    /**
     * Creates the queue, pre-allocates the buffer pool, starts the worker thread and schedules
     * the periodic flush task. Arguments are forwarded unchanged to the parent constructor.
     *
     * @throws DRInvalidValue if the configured list size or number of lists is less than 1
     * @throws DRCommonException if the calling thread is interrupted while preparing the buffers
     */
    public OutputChannelQueueAsync(String str, DataFormat dataFormat, IOutputChannel<Record> iOutputChannel) throws DRCommonException {
        super(str, dataFormat, iOutputChannel);
        this.userActiveList = null;
        this.lastOfferTime = 0L;
        this.losses = 0L;
        this.numMetricsPerList = Config.getInstance().getIntValueWithDefault(ConfigConstants.REPORT_QUEUE_LIST_SIZE, DEFAULT_REPORT_QUEUE_LIST_SIZE);
        if (this.numMetricsPerList < 1) {
            // NOTE(review): the check accepts 1, so the message should read "at least 1";
            // text kept unchanged in case callers/tests match on it.
            throw new DRInvalidValue("List size in report queue should be greater then 1");
        }
        int numLists = Config.getInstance().getIntValueWithDefault(ConfigConstants.REPORT_QUEUE_NUM_LISTS, DEFAULT_REPORT_QUEUE_NUM_LISTS);
        if (numLists < 1) {
            // NOTE(review): same wording issue as above; text kept unchanged.
            throw new DRInvalidValue("Number of lists in report queue should be greater then 1");
        }
        this.timeoutProcessQueueMs = Config.getInstance().getIntValueWithDefault(ConfigConstants.TIMEOUT_PROCESS_QUEUE_MS, DEFAULT_TIMEOUT_PROCESS_QUEUE_MS);
        this.workerQueue = new ArrayBlockingQueue<>(numLists);
        // One extra buffer so the producer can keep filling while the worker queue is full.
        int totalBuffers = numLists + 1;
        this.readyQueue = new ArrayBlockingQueue<>(totalBuffers);
        try {
            for (int i = 0; i < totalBuffers; i++) {
                this.readyQueue.put(new ArrayList<Metric>(this.numMetricsPerList));
            }
            this.userActiveList = this.readyQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DRCommonException(e.getMessage());
        }
        this.worker = new Worker(this, this.workerQueue, this.readyQueue);
        this.workerTh = new Thread(this.worker);
        this.workerTh.start();
        setupReportTriggeringRepeatedTask();
    }

    /** Returns the number of metrics dropped so far by this queue. */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public synchronized long getLosses() {
        return this.losses;
    }

    /**
     * Adds a metric to the active buffer, handing the buffer to the worker first if it is full.
     * If no buffer is available the metric is dropped and counted as a loss.
     *
     * @param metric metric to enqueue
     * @throws DRCommonException if interrupted while waiting for a replacement buffer
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public synchronized void submit(Metric metric) throws DRCommonException {
        if (this.userActiveList == null) {
            this.userActiveList = this.readyQueue.poll();
            if (this.userActiveList == null) {
                dropSingleMetric();
                return;
            }
        }
        if (this.userActiveList.size() == this.numMetricsPerList) {
            offerBuffer();
            if (this.userActiveList == null) {
                // The full buffer was handed over but no empty one came back within the
                // timeout; drop this metric rather than dereferencing null.
                dropSingleMetric();
                return;
            }
        }
        this.userActiveList.add(metric);
        this.enqueued++;
    }

    /**
     * Hands the active buffer to the worker if it holds at least one metric.
     *
     * @return true if a hand-off was attempted, false if there was no active buffer or it was empty
     * @throws DRCommonException if interrupted while waiting for a replacement buffer
     */
    public synchronized boolean offerNonEmptyBuffer() throws DRCommonException {
        // userActiveList is null when a previous hand-off could not obtain a replacement buffer.
        if (this.userActiveList == null || this.userActiveList.isEmpty()) {
            return false;
        }
        offerBuffer();
        return true;
    }

    /**
     * Flushes the active buffer, stops the periodic flush task and the worker, and waits for the
     * worker thread to finish.
     *
     * @throws DRCommonException if interrupted while flushing or while joining the worker thread
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public void shutdown() throws DRCommonException {
        offerNonEmptyBuffer();
        this.reportTriggeringExec.shutdown();
        this.reportTriggeringRepeatedTask.cancel();
        this.worker.stop();
        try {
            this.workerTh.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DRCommonException(e.getMessage());
        }
    }

    /**
     * Schedules a fixed-rate task that flushes the active buffer when more than
     * {@code timeoutProcessQueueMs} has elapsed since the last hand-off.
     */
    private void setupReportTriggeringRepeatedTask() {
        this.reportTriggeringRepeatedTask = new TimerTask() { // from class: com.datarobot.mlops.channel.OutputChannelQueueAsync.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                long idleMs = System.currentTimeMillis() - OutputChannelQueueAsync.this.lastOfferTime;
                if (idleMs <= OutputChannelQueueAsync.this.timeoutProcessQueueMs) {
                    return;
                }
                try {
                    OutputChannelQueueAsync.this.offerNonEmptyBuffer();
                } catch (DRCommonException e) {
                    OutputChannelQueueAsync.logger.error(e.getMessage(), e);
                } catch (RuntimeException e) {
                    // An exception escaping a scheduleAtFixedRate task cancels all later runs,
                    // so log instead of letting the periodic flush die silently.
                    OutputChannelQueueAsync.logger.error(e.getMessage(), e);
                }
            }
        };
        this.reportTriggeringExec = Executors.newSingleThreadScheduledExecutor();
        this.reportTriggeringExec.scheduleAtFixedRate(this.reportTriggeringRepeatedTask, REPORT_TRIGGER_DELAY_MSEC, REPORT_TRIGGER_PERIOD_MSEC, TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to hand the active buffer to the worker queue. On success, waits briefly for an empty
     * replacement (which may leave {@code userActiveList} null on timeout). On failure, the
     * buffer's contents are discarded and counted as losses, and the buffer is reused.
     * Callers must hold the instance lock and ensure {@code userActiveList} is non-null.
     *
     * @throws DRCommonException if interrupted while waiting for a replacement buffer
     */
    private void offerBuffer() throws DRCommonException {
        if (this.workerQueue.offer(this.userActiveList)) {
            try {
                this.userActiveList = this.readyQueue.poll(TIMEOUT_FETCH_ACTIVE_LIST_MSEC, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new DRCommonException(e.getMessage());
            }
        } else {
            int dropped = this.userActiveList.size();
            this.losses += dropped;
            MLOpsStatsInternal.getInstance().addTotalBufferDrops(dropped);
            this.userActiveList.clear();
        }
        this.lastOfferTime = System.currentTimeMillis();
    }

    /** Records the loss of one metric in both the local counter and the global stats. */
    private void dropSingleMetric() {
        this.losses++;
        MLOpsStatsInternal.getInstance().incTotalBufferDrops();
    }
}
