package com.datarobot.mlops.channel;

import com.datarobot.mlops.common.config.MappedConfig;
import com.datarobot.mlops.common.constants.ConfigConstants;
import com.datarobot.mlops.common.enums.DataFormat;
import com.datarobot.mlops.common.exceptions.DRCommonException;
import com.datarobot.mlops.common.exceptions.DRInvalidValue;
import com.datarobot.mlops.common.records.Record;
import com.datarobot.mlops.stats.MLOpsStatsInternal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datarobot/mlops/channel/OutputChannelQueueAsync.class */
/**
 * Output channel queue that buffers submitted records into fixed-size lists and hands
 * the full lists to an {@link AsyncWorkerExecutor} running on its own thread.
 *
 * <p>Two bounded queues carry list buffers: {@code readyQueue} holds buffers available
 * for filling, and {@code workerQueue} holds filled buffers waiting for the worker.
 * The buffer currently being filled is {@code userActiveList}. A periodic task flushes
 * a partially filled buffer when no buffer has been offered for longer than the
 * configured timeout.
 *
 * <p>{@link #submit}, {@link #getLosses} and {@link #offerNonEmptyBuffer} synchronize on
 * this instance; {@link #shutdown} and {@link #pushRecords} do not.
 *
 * <p>NOTE(review): the worker is presumed to return processed buffers to
 * {@code readyQueue} and to call {@link #pushRecords}; that class is not visible here.
 */
public class OutputChannelQueueAsync extends OutputChannelQueue {
    private static final Logger logger = LoggerFactory.getLogger(OutputChannelQueueAsync.class);

    /** Period, in milliseconds, between runs of the flush-on-timeout task. */
    public static final int REPORT_TRIGGER_PERIOD_MSEC = 1000;

    private static final int DEFAULT_REPORT_QUEUE_MAX_SIZE_BYTES = 536870912;
    private static final int DEFAULT_REPORT_QUEUE_LIST_SIZE = 10000;
    private static final int DEFAULT_REPORT_QUEUE_NUM_LISTS = 10;
    private static final int DEFAULT_TIMEOUT_PROCESS_QUEUE_MS = 10000;
    /** How long to wait for a fresh buffer after handing one to the worker. */
    private static final int TIMEOUT_FETCH_ACTIVE_LIST_MSEC = 100;
    /** Initial delay, in milliseconds, before the first run of the flush-on-timeout task. */
    private static final int REPORT_TRIGGER_DELAY_MSEC = 1000;

    private final ArrayBlockingQueue<List<Record>> readyQueue;
    private final ArrayBlockingQueue<List<Record>> workerQueue;
    /** Buffer currently being filled; may be null when no ready buffer was available. */
    private List<Record> userActiveList;
    private final long queueMaxSizeBytes;
    /** Sum of the data lengths of records added by submit and not yet pushed or dropped. */
    private final AtomicLong currentQueueSize;
    private final int numMetricsPerList;
    private final int timeoutProcessQueueMs;
    /** Written under the instance monitor, read without it by the timer task, hence volatile. */
    private volatile long lastOfferTime;
    private long losses;
    private TimerTask reportTriggeringRepeatedTask;
    private ScheduledExecutorService reportTriggeringExec;
    private final AsyncWorkerExecutor asyncWorkerExecutor;
    private final Thread asyncWorkerExecutorThread;

    /**
     * Reads the queue configuration, allocates the buffers, starts the worker thread and
     * schedules the periodic flush task.
     *
     * @param mappedConfig   configuration source for queue sizes and the flush timeout
     * @param dataFormat     passed through to the superclass
     * @param iOutputChannel passed through to the superclass
     * @throws DRCommonException if a configured value is out of range (as
     *                           {@link DRInvalidValue}) or the thread is interrupted while
     *                           the buffers are being prepared
     */
    public OutputChannelQueueAsync(MappedConfig mappedConfig, DataFormat dataFormat, IOutputChannel<Record> iOutputChannel) throws DRCommonException {
        super(dataFormat, iOutputChannel);
        this.lastOfferTime = 0L;
        this.losses = 0L;
        this.queueMaxSizeBytes = mappedConfig.getValueWithDefault(ConfigConstants.REPORT_QUEUE_MAX_SIZE_STR, DEFAULT_REPORT_QUEUE_MAX_SIZE_BYTES);
        this.currentQueueSize = new AtomicLong(0L);
        long maxMemory = Runtime.getRuntime().maxMemory();
        // The messages below say "greater than 1", but the checks accept 1.
        if (this.queueMaxSizeBytes < 1 || this.queueMaxSizeBytes > maxMemory) {
            throw new DRInvalidValue(String.format("Invalid queue max size %d, values should be > 1 and less than %d", this.queueMaxSizeBytes, maxMemory));
        }
        this.numMetricsPerList = mappedConfig.getValueWithDefault(ConfigConstants.REPORT_QUEUE_LIST_SIZE_STR, DEFAULT_REPORT_QUEUE_LIST_SIZE);
        if (this.numMetricsPerList < 1) {
            throw new DRInvalidValue("List size in report queue should be greater than 1");
        }
        int numLists = mappedConfig.getValueWithDefault(ConfigConstants.REPORT_QUEUE_NUM_LISTS_STR, DEFAULT_REPORT_QUEUE_NUM_LISTS);
        if (numLists < 1) {
            throw new DRInvalidValue("Number of lists in report queue should be greater than 1");
        }
        this.timeoutProcessQueueMs = mappedConfig.getValueWithDefault(ConfigConstants.TIMEOUT_PROCESS_QUEUE_MS_STR, DEFAULT_TIMEOUT_PROCESS_QUEUE_MS);
        this.workerQueue = new ArrayBlockingQueue<>(numLists);
        // One buffer more than the worker queue can hold: that extra one is the active list.
        int numBuffers = numLists + 1;
        this.readyQueue = new ArrayBlockingQueue<>(numBuffers);
        try {
            for (int i = 0; i < numBuffers; i++) {
                this.readyQueue.put(new ArrayList<>(this.numMetricsPerList));
            }
            this.userActiveList = this.readyQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DRCommonException(e.getMessage());
        }
        this.asyncWorkerExecutor = new AsyncWorkerExecutor(1 + numLists, 1, this.workerQueue, this.readyQueue, this);
        this.asyncWorkerExecutorThread = new Thread(this.asyncWorkerExecutor);
        this.asyncWorkerExecutorThread.start();
        logger.debug("Executor thread started");
        setupReportTriggeringRepeatedTask();
    }

    /**
     * Returns the loss counter maintained by this queue.
     *
     * @return the current value of the loss counter
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public synchronized long getLosses() {
        return this.losses;
    }

    /**
     * Appends the given records to the active buffer and hands the buffer to the worker
     * once it holds at least {@code numMetricsPerList} records.
     *
     * <p>If the worker thread is not alive, or there is no active buffer and none can be
     * taken from the ready queue without waiting, the call is rejected: the loss counter
     * and the buffer-drop statistic are each incremented by one and nothing is stored.
     *
     * @param collection records to enqueue
     * @throws DRCommonException if handing the buffer to the worker fails
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public synchronized void submit(Collection<Record> collection) throws DRCommonException {
        boolean workerAlive = this.asyncWorkerExecutorThread.isAlive();
        if (workerAlive && this.userActiveList == null) {
            // Non-blocking attempt to obtain a buffer released since the last hand-off.
            this.userActiveList = this.readyQueue.poll();
        }
        if (!workerAlive || this.userActiveList == null) {
            this.losses++;
            MLOpsStatsInternal.getInstance().incTotalBufferDrops();
            return;
        }
        for (Record record : collection) {
            this.userActiveList.add(record);
            this.enqueued++;
            this.currentQueueSize.addAndGet(record.getHeader().getDataLen());
        }
        // ">=" rather than "==": a multi-record collection can step over the exact threshold.
        if (this.userActiveList.size() >= this.numMetricsPerList) {
            offerBuffer();
        }
    }

    /**
     * Reports whether the tracked byte count exceeds the configured maximum.
     *
     * @return true when {@code currentQueueSize} is greater than {@code queueMaxSizeBytes}
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public boolean isMetricQueueFull() {
        return this.currentQueueSize.get() > this.queueMaxSizeBytes;
    }

    /**
     * Hands the active buffer to the worker if it contains at least one record.
     *
     * @return true if a buffer was offered, false if there was no buffer or it was empty
     * @throws DRCommonException if the thread is interrupted while fetching the next buffer
     */
    public synchronized boolean offerNonEmptyBuffer() throws DRCommonException {
        if (this.userActiveList == null || this.userActiveList.isEmpty()) {
            return false;
        }
        offerBuffer();
        return true;
    }

    /**
     * Queues any remaining records for the worker, stops the periodic flush task, asks the
     * worker to shut down and waits for its thread to finish.
     *
     * @throws DRCommonException if the calling thread is interrupted while shutting down
     */
    @Override // com.datarobot.mlops.channel.OutputChannelQueue
    public void shutdown() throws DRCommonException {
        try {
            if (this.userActiveList != null && !this.userActiveList.isEmpty()) {
                // Blocking put: waits for space in the worker queue rather than dropping.
                this.workerQueue.put(this.userActiveList);
            }
            this.reportTriggeringExec.shutdown();
            this.reportTriggeringRepeatedTask.cancel();
            this.asyncWorkerExecutor.shutdown();
            this.asyncWorkerExecutorThread.join();
        } catch (InterruptedException e) {
            this.reportTriggeringExec.shutdownNow();
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new DRCommonException("Failed to shut down MLOps, " + e.getMessage());
        }
    }

    /**
     * Creates the periodic task that flushes a non-empty active buffer when more than
     * {@code timeoutProcessQueueMs} milliseconds have passed since the last offer, and
     * schedules it on a single-threaded scheduled executor.
     */
    private void setupReportTriggeringRepeatedTask() {
        this.reportTriggeringRepeatedTask = new TimerTask() { // from class: com.datarobot.mlops.channel.OutputChannelQueueAsync.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                long sinceLastOffer = System.currentTimeMillis() - OutputChannelQueueAsync.this.lastOfferTime;
                if (sinceLastOffer <= OutputChannelQueueAsync.this.timeoutProcessQueueMs) {
                    return;
                }
                try {
                    OutputChannelQueueAsync.this.offerNonEmptyBuffer();
                } catch (DRCommonException e) {
                    OutputChannelQueueAsync.logger.error(e.getMessage(), e);
                }
            }
        };
        this.reportTriggeringExec = Executors.newSingleThreadScheduledExecutor();
        this.reportTriggeringExec.scheduleAtFixedRate(this.reportTriggeringRepeatedTask, REPORT_TRIGGER_DELAY_MSEC, REPORT_TRIGGER_PERIOD_MSEC, TimeUnit.MILLISECONDS);
    }

    /**
     * Offers the active buffer to the worker queue without blocking.
     *
     * <p>On success the active buffer is replaced by one taken from the ready queue,
     * waiting up to {@code TIMEOUT_FETCH_ACTIVE_LIST_MSEC}; it becomes null if none arrives
     * in time. If the worker queue is full, the buffered records are dropped: they are
     * counted as losses, their bytes are removed from the tracked queue size, and the
     * buffer is cleared for reuse.
     *
     * <p>Callers must hold the instance monitor and ensure the active buffer is non-null.
     *
     * @throws DRCommonException if interrupted while waiting for a replacement buffer
     */
    private void offerBuffer() throws DRCommonException {
        List<Record> buffer = this.userActiveList;
        if (this.workerQueue.offer(buffer)) {
            // The worker owns this buffer now; drop our reference before waiting so an
            // interrupt cannot leave us writing into a list the worker is reading.
            this.userActiveList = null;
            try {
                this.userActiveList = this.readyQueue.poll(TIMEOUT_FETCH_ACTIVE_LIST_MSEC, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new DRCommonException(e.getMessage());
            }
        } else {
            int droppedCount = buffer.size();
            long droppedBytes = 0L;
            for (Record dropped : buffer) {
                droppedBytes += dropped.getHeader().getDataLen();
            }
            // Dropped records never reach pushRecords, so release their bytes here.
            this.currentQueueSize.addAndGet(-droppedBytes);
            this.losses += droppedCount;
            MLOpsStatsInternal.getInstance().addTotalBufferDrops(droppedCount);
            buffer.clear();
        }
        this.lastOfferTime = System.currentTimeMillis();
    }

    /**
     * Releases the given records' bytes from the tracked queue size, forwards the records
     * to the underlying output channel and adds their count to {@code totalWritten}.
     *
     * @param collection records to write to the output channel
     * @throws DRCommonException if the output channel rejects the records
     */
    public void pushRecords(Collection<Record> collection) throws DRCommonException {
        for (Record record : collection) {
            this.currentQueueSize.addAndGet(-record.getHeader().getDataLen());
        }
        this.outputChannel.submit(collection);
        this.totalWritten += collection.size();
    }
}
