package net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.pipes.async;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.log.TikaLogger;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.log.TikaLoggerFactory;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.pipes.emitter.EmitData;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.pipes.emitter.Emitter;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.pipes.emitter.EmitterManager;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.pipes.emitter.TikaEmitterException;
import net.snowflake.ingest.internal.net.snowflake.client.jdbc.internal.apache.tika.utils.ExceptionUtils;

/* loaded from: input_file:net/snowflake/ingest/internal/net/snowflake/client/jdbc/internal/apache/tika/pipes/async/AsyncEmitter.class */
/**
 * Queue consumer that batches {@link EmitData} items and hands them to emitters.
 *
 * <p>{@link #call()} polls the supplied queue, accumulates the polled items in an
 * {@link EmitDataCache} grouped by emitter name, and flushes that cache when
 * <ul>
 *   <li>adding an item would push the estimated byte total above
 *       {@code asyncConfig.getEmitMaxEstimatedBytes()},</li>
 *   <li>more than {@code asyncConfig.getEmitWithinMillis()} milliseconds have passed since
 *       the last flush, or</li>
 *   <li>the {@link #EMIT_DATA_STOP_SEMAPHORE} sentinel is taken from the queue.</li>
 * </ul>
 */
public class AsyncEmitter implements Callable<Integer> {
    /** Value returned by {@link #call()} after the stop sentinel has been handled. */
    static final int EMITTER_FUTURE_CODE = 2;
    private final AsyncConfig asyncConfig;
    private final EmitterManager emitterManager;
    private final ArrayBlockingQueue<EmitData> emitDataQueue;
    /** Time of the most recent flush; starts at construction time and is reset by every flush. */
    Instant lastEmitted = Instant.now();
    /** Sentinel item; {@link #call()} compares against it by identity to know when to stop. */
    static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null);
    private static final TikaLogger LOG = TikaLoggerFactory.getLogger(AsyncEmitter.class);

    /**
     * In-memory batch of pending {@link EmitData}, grouped by emitter name.
     *
     * <p>The cache uses a plain {@link HashMap} and unsynchronized counters; in this file it
     * is only touched from {@link AsyncEmitter#call()}.
     */
    public class EmitDataCache {
        /** Estimated-bytes threshold above which the cache is flushed before adding more. */
        private final long maxBytes;
        /** Sum of the estimated sizes of the items currently cached. */
        long estimatedSize = 0;
        /** Number of items currently cached. */
        int size = 0;
        /** Pending items keyed by the emitter name taken from each item's emit key. */
        Map<String, List<EmitData>> map = new HashMap<>();

        /**
         * @param j maximum estimated bytes to hold before a flush is forced
         */
        public EmitDataCache(long j) {
            this.maxBytes = j;
        }

        /**
         * Adds {@code j} to the running estimated size.
         *
         * @param j number of estimated bytes to add
         */
        void updateEstimatedSize(long j) {
            this.estimatedSize += j;
        }

        /**
         * Caches one item, flushing the existing contents first if the item would push the
         * estimated total above {@code maxBytes}.
         *
         * @param emitData item to cache; its emit key supplies the emitter name it is grouped under
         */
        void add(EmitData emitData) {
            long incomingBytes = emitData.getEstimatedSizeBytes();
            if (this.estimatedSize + incomingBytes > this.maxBytes) {
                AsyncEmitter.LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
                        this.estimatedSize + incomingBytes, this.maxBytes);
                emitAll();
            }
            String emitterName = emitData.getEmitKey().getEmitterName();
            this.map.computeIfAbsent(emitterName, name -> new ArrayList<>()).add(emitData);
            updateEstimatedSize(incomingBytes);
            // Count the item only once it is actually stored: emitAll() above resets the
            // counter, so incrementing before the flush would lose this item from the count
            // and over-report the number of files in the flushed batch.
            this.size++;
        }

        /**
         * Sends every cached batch to the emitter registered under its name, then empties the
         * cache and records the flush time in {@link AsyncEmitter#lastEmitted}.
         *
         * <p>Emitter failures handled by {@code tryToEmit} do not stop the flush; the affected
         * batch is dropped from the cache like any other, but is not included in the
         * "emitted" count that is logged.
         */
        public void emitAll() {
            int emittedCount = 0;
            AsyncEmitter.LOG.debug("about to emit {} files, {} estimated bytes", this.size, this.estimatedSize);
            for (Map.Entry<String, List<EmitData>> entry : this.map.entrySet()) {
                List<EmitData> batch = entry.getValue();
                Emitter emitter = AsyncEmitter.this.emitterManager.getEmitter(entry.getKey());
                if (tryToEmit(emitter, batch)) {
                    emittedCount += batch.size();
                }
            }
            AsyncEmitter.LOG.debug("emitted: {} files", emittedCount);
            this.estimatedSize = 0L;
            this.size = 0;
            this.map.clear();
            AsyncEmitter.this.lastEmitted = Instant.now();
        }

        /**
         * Emits one batch, logging (rather than propagating) I/O and emitter exceptions.
         *
         * @param emitter emitter to send the batch to
         * @param list batch of items to emit
         * @return {@code true} if {@code emit} returned normally, {@code false} if it threw an
         *     {@link IOException} or {@link TikaEmitterException}
         */
        private boolean tryToEmit(Emitter emitter, List<EmitData> list) {
            try {
                emitter.emit(list);
                return true;
            } catch (IOException | TikaEmitterException e) {
                AsyncEmitter.LOG.warn("emitter class ({}): {}", emitter.getClass(), ExceptionUtils.getStackTrace(e));
                return false;
            }
        }
    }

    /**
     * @param asyncConfig supplies the byte and time flush thresholds
     * @param arrayBlockingQueue queue from which items to emit are polled
     * @param emitterManager used to look up an emitter by name at flush time
     */
    public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitData> arrayBlockingQueue, EmitterManager emitterManager) {
        this.asyncConfig = asyncConfig;
        this.emitDataQueue = arrayBlockingQueue;
        this.emitterManager = emitterManager;
    }

    /**
     * Polls the queue until the stop sentinel arrives, caching items and flushing on the size
     * and time thresholds.
     *
     * @return {@link #EMITTER_FUTURE_CODE} after the final flush triggered by the stop sentinel
     * @throws Exception declared by {@link Callable}; the queue poll can throw
     *     {@link InterruptedException}, which is propagated without flushing the cache
     */
    @Override // java.util.concurrent.Callable
    public Integer call() throws Exception {
        EmitDataCache emitDataCache = new EmitDataCache(this.asyncConfig.getEmitMaxEstimatedBytes());
        while (true) {
            // Bounded wait so the time-based flush below is evaluated even when the queue is idle.
            EmitData polled = this.emitDataQueue.poll(500L, TimeUnit.MILLISECONDS);
            if (polled == EMIT_DATA_STOP_SEMAPHORE) {
                emitDataCache.emitAll();
                return EMITTER_FUTURE_CODE;
            }
            if (polled != null) {
                emitDataCache.add(polled);
            } else {
                LOG.trace("Nothing on the async queue");
            }
            LOG.debug("cache size: ({}) bytes and extract count: {}", emitDataCache.estimatedSize, emitDataCache.size);
            long elapsedMillis = ChronoUnit.MILLIS.between(this.lastEmitted, Instant.now());
            if (elapsedMillis > this.asyncConfig.getEmitWithinMillis()) {
                LOG.debug("{} elapsed > {}, going to emitAll", elapsedMillis, this.asyncConfig.getEmitWithinMillis());
                emitDataCache.emitAll();
            }
        }
    }
}
