package net.snowflake.ingest.streaming.internal;

import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.internal.com.codahale.metrics.Timer;
import net.snowflake.ingest.internal.com.google.common.annotations.VisibleForTesting;
import net.snowflake.ingest.streaming.internal.BlobBuilder;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/snowflake/ingest/streaming/internal/FlushService.class */
public class FlushService<T> {
    private static final Logging logger = new Logging(FlushService.class);
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;

    @VisibleForTesting
    ScheduledExecutorService flushWorker;

    @VisibleForTesting
    ExecutorService registerWorker;

    @VisibleForTesting
    ExecutorService buildUploadWorkers;
    private final ChannelCache<T> channelCache;
    private final IStorageManager storageManager;
    private final RegisterService<T> registerService;
    private final boolean isTestMode;
    private final Constants.BdecVersion bdecVersion;
    private volatile int numProcessors = Runtime.getRuntime().availableProcessors();

    @VisibleForTesting
    volatile boolean isNeedFlush = false;

    @VisibleForTesting
    volatile long lastFlushTime = System.currentTimeMillis();
    private final Map<String, Timer.Context> latencyTimerContextMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/snowflake/ingest/streaming/internal/FlushService$BlobData.class */
    public static class BlobData<T> {
        private final String path;
        private final List<List<ChannelData<T>>> data;

        BlobData(String str, List<List<ChannelData<T>>> list) {
            this.path = str;
            this.data = list;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getPath() {
            return this.path;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public List<List<ChannelData<T>>> getData() {
            return this.data;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlushService(SnowflakeStreamingIngestClientInternal<T> snowflakeStreamingIngestClientInternal, ChannelCache<T> channelCache, IStorageManager iStorageManager, boolean z) {
        this.owningClient = snowflakeStreamingIngestClientInternal;
        this.channelCache = channelCache;
        this.storageManager = iStorageManager;
        this.registerService = new RegisterService<>(snowflakeStreamingIngestClientInternal, z);
        this.isTestMode = z;
        this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
        createWorkers();
    }

    private CompletableFuture<Void> statsFuture() {
        return CompletableFuture.runAsync(() -> {
            if (this.owningClient.cpuHistogram != null) {
                this.owningClient.cpuHistogram.update((long) (ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class).getProcessCpuLoad() * 100.0d));
            }
        }, this.flushWorker);
    }

    private CompletableFuture<Void> distributeFlush(boolean z, Set<String> set, Long l) {
        return CompletableFuture.runAsync(() -> {
            logFlushTask(z, set, l.longValue());
            distributeFlushTasks(set);
            long currentTimeMillis = System.currentTimeMillis();
            this.lastFlushTime = currentTimeMillis;
            this.isNeedFlush = false;
            set.forEach(str -> {
                this.channelCache.setLastFlushTime(str, Long.valueOf(currentTimeMillis));
                this.channelCache.setNeedFlush(str, false);
            });
        }, this.flushWorker);
    }

    private void logFlushTask(boolean z, Set<String> set, long j) {
        boolean z2;
        if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
            Stream<String> stream = set.stream();
            ChannelCache<T> channelCache = this.channelCache;
            Objects.requireNonNull(channelCache);
            z2 = stream.anyMatch(channelCache::getNeedFlush);
        } else {
            z2 = this.isNeedFlush;
        }
        boolean z3 = z2;
        long currentTimeMillis = System.currentTimeMillis();
        String format = String.format("Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s", this.owningClient.getName(), Boolean.valueOf(z), this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1 ? String.format("Tables=[%s]", set.stream().map(str -> {
            return String.format("(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", str, Boolean.valueOf(this.channelCache.getNeedFlush(str)), Long.valueOf(j - this.channelCache.getLastFlushTime(str).longValue()), Long.valueOf(currentTimeMillis - this.channelCache.getLastFlushTime(str).longValue()));
        }).collect(Collectors.joining(", "))) : String.format("isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", Boolean.valueOf(z3), Long.valueOf(j - this.lastFlushTime), Long.valueOf(currentTimeMillis - this.lastFlushTime)));
        if (logger.isTraceEnabled()) {
            logger.logTrace(format);
        }
        if (logger.isTraceEnabled()) {
            return;
        }
        if (z3 || z) {
            logger.logDebug(format);
        }
    }

    private CompletableFuture<Void> registerFuture() {
        return CompletableFuture.runAsync(() -> {
            this.registerService.registerBlobs(this.latencyTimerContextMap);
        }, this.registerWorker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> flush(boolean z) {
        long currentTimeMillis = System.currentTimeMillis();
        long cachedMaxClientLagInMs = this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();
        Set<String> keySet = this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1 ? (Set) this.channelCache.keySet().stream().filter(str -> {
            return z || currentTimeMillis - this.channelCache.getLastFlushTime(str).longValue() >= cachedMaxClientLagInMs || this.channelCache.getNeedFlush(str);
        }).collect(Collectors.toSet()) : (z || (!isTestMode() && (this.isNeedFlush || currentTimeMillis - this.lastFlushTime >= cachedMaxClientLagInMs))) ? this.channelCache.keySet() : null;
        if (!z && (isTestMode() || keySet == null || keySet.isEmpty())) {
            return statsFuture();
        }
        Set<String> set = keySet;
        return statsFuture().thenCompose(r11 -> {
            return distributeFlush(z, set, Long.valueOf(currentTimeMillis));
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r3 -> {
            return registerFuture();
        });
    }

    private void createWorkers() {
        this.flushWorker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ingest-flush-thread").build());
        this.flushWorker.scheduleWithFixedDelay(() -> {
            try {
                flush(false).exceptionally(th -> {
                    String format = String.format("Background flush task failed, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), th.getCause(), th.getCause().getMessage(), Utils.getStackTrace(th.getCause()));
                    logger.logError(format);
                    if (this.owningClient.getTelemetryService() == null) {
                        return null;
                    }
                    this.owningClient.getTelemetryService().reportClientFailure(getClass().getSimpleName(), format);
                    return null;
                });
            } catch (Exception e) {
                String format = String.format("Failed to schedule a flush task, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), e.getClass().getName(), e.getMessage(), Utils.getStackTrace(e));
                logger.logError(format);
                if (this.owningClient.getTelemetryService() != null) {
                    this.owningClient.getTelemetryService().reportClientFailure(getClass().getSimpleName(), format);
                }
                throw e;
            }
        }, 0L, this.owningClient.getParameterProvider().getBufferFlushCheckIntervalInMs(), TimeUnit.MILLISECONDS);
        this.registerWorker = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("ingest-register-thread").build());
        ThreadFactory build = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build();
        int min = Math.min(Runtime.getRuntime().availableProcessors() * (1 + this.owningClient.getParameterProvider().getIOTimeCpuRatio()), Integer.MAX_VALUE);
        this.buildUploadWorkers = Executors.newFixedThreadPool(min, build);
        logger.logInfo("Create {} threads for build/upload blobs for client={}, total available processors={}", Integer.valueOf(min), this.owningClient.getName(), Integer.valueOf(Runtime.getRuntime().availableProcessors()));
    }

    void distributeFlushTasks(Set<String> set) {
        Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>> it = this.channelCache.entrySet().stream().filter(entry -> {
            return set.contains(entry.getKey());
        }).iterator();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        this.numProcessors = Runtime.getRuntime().availableProcessors();
        while (true) {
            if (!it.hasNext() && arrayList2.isEmpty()) {
                this.registerService.addBlobs(arrayList);
                return;
            }
            ArrayList arrayList3 = new ArrayList();
            float f = 0.0f;
            while (true) {
                if (!it.hasNext() && arrayList2.isEmpty()) {
                    break;
                }
                List synchronizedList = Collections.synchronizedList(new ArrayList());
                if (!arrayList2.isEmpty()) {
                    synchronizedList.addAll(arrayList2);
                    arrayList2.clear();
                } else {
                    if (arrayList3.size() >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
                        logger.logInfo("Max allowed number of chunks in the current blob reached. chunkCount={} maxChunkCount={}", Integer.valueOf(arrayList3.size()), Integer.valueOf(this.owningClient.getParameterProvider().getMaxChunksInBlob()));
                        break;
                    }
                    it.next().getValue().values().parallelStream().forEach(snowflakeStreamingIngestChannelInternal -> {
                        ChannelData<T> data;
                        if (!snowflakeStreamingIngestChannelInternal.isValid() || (data = snowflakeStreamingIngestChannelInternal.getData()) == null) {
                            return;
                        }
                        synchronizedList.add(data);
                    });
                }
                if (!synchronizedList.isEmpty()) {
                    int i = 0;
                    float f2 = 0.0f;
                    while (true) {
                        if (i >= synchronizedList.size()) {
                            break;
                        }
                        ChannelData<T> channelData = (ChannelData) synchronizedList.get(i);
                        if (i > 0 && shouldStopProcessing(f, f2, channelData, (ChannelData) synchronizedList.get(i - 1))) {
                            arrayList2.addAll(synchronizedList.subList(i, synchronizedList.size()));
                            logger.logInfo("Creation of another blob is needed because of blob/chunk size limit or different encryption ids or different schema, client={}, table={}, blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={}, encryptionId2={}, schema1={}, schema2={}", this.owningClient.getName(), channelData.getChannelContext().getTableName(), Float.valueOf(f), Float.valueOf(f2), Float.valueOf(channelData.getBufferSize()), channelData.getChannelContext().getEncryptionKeyId(), ((ChannelData) synchronizedList.get(i - 1)).getChannelContext().getEncryptionKeyId(), channelData.getColumnEps().keySet(), ((ChannelData) synchronizedList.get(i - 1)).getColumnEps().keySet());
                            break;
                        } else {
                            f += channelData.getBufferSize();
                            f2 += channelData.getBufferSize();
                            i++;
                        }
                    }
                    arrayList3.add(synchronizedList.subList(0, i));
                    if (i != synchronizedList.size()) {
                        break;
                    }
                }
            }
            if (!arrayList3.isEmpty()) {
                String fullyQualifiedTableName = ((ChannelData) ((List) arrayList3.get(0)).get(0)).getChannelContext().getFullyQualifiedTableName();
                BlobPath generateBlobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName);
                long currentTimeMillis = System.currentTimeMillis();
                if (this.owningClient.flushLatency != null) {
                    this.latencyTimerContextMap.putIfAbsent(generateBlobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
                }
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                this.owningClient.getEncryptionKeysPerTable().forEach((fullyQualifiedTableName2, encryptionKey) -> {
                    concurrentHashMap.put(fullyQualifiedTableName2, new EncryptionKey(encryptionKey));
                });
                arrayList.add(new Pair(new BlobData(generateBlobPath.fileRegistrationPath, arrayList3), CompletableFuture.supplyAsync(() -> {
                    try {
                        BlobMetadata buildAndUpload = buildAndUpload(generateBlobPath, arrayList3, fullyQualifiedTableName, concurrentHashMap);
                        buildAndUpload.getBlobStats().setFlushStartMs(currentTimeMillis);
                        return buildAndUpload;
                    } catch (Throwable th) {
                        Throwable cause = th.getCause() == null ? th : th.getCause();
                        String format = String.format("Building blob failed, client=%s, blob=%s, exception=%s, detail=%s, trace=%s, all channels in the blob will be invalidated", this.owningClient.getName(), generateBlobPath.fileRegistrationPath, cause, cause.getMessage(), Utils.getStackTrace(cause));
                        logger.logError(format);
                        if (this.owningClient.getTelemetryService() != null) {
                            this.owningClient.getTelemetryService().reportClientFailure(getClass().getSimpleName(), format);
                        }
                        if (th instanceof IOException) {
                            invalidateAllChannelsInBlob(arrayList3, format);
                            return null;
                        }
                        if (th instanceof NoSuchAlgorithmException) {
                            throw new SFException(th, ErrorCode.MD5_HASHING_NOT_AVAILABLE, new Object[0]);
                        }
                        if (((th instanceof InvalidAlgorithmParameterException) | (th instanceof NoSuchPaddingException) | (th instanceof IllegalBlockSizeException) | (th instanceof BadPaddingException)) || (th instanceof InvalidKeyException)) {
                            throw new SFException(th, ErrorCode.ENCRYPTION_FAILURE, new Object[0]);
                        }
                        throw new SFException(th, ErrorCode.INTERNAL_ERROR, th.getMessage());
                    }
                }, this.buildUploadWorkers)));
                logger.logInfo("buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", this.owningClient.getName(), generateBlobPath, this.buildUploadWorkers.toString());
            }
        }
    }

    private boolean shouldStopProcessing(float f, float f2, ChannelData<T> channelData, ChannelData<T> channelData2) {
        return f + channelData.getBufferSize() > 1.0737418E9f || f2 + channelData.getBufferSize() > ((float) this.owningClient.getParameterProvider().getMaxChunkSizeInBytes()) || !Objects.equals(channelData.getChannelContext().getEncryptionKeyId(), channelData2.getChannelContext().getEncryptionKeyId()) || !channelData.getColumnEps().keySet().equals(channelData2.getColumnEps().keySet());
    }

    BlobMetadata buildAndUpload(BlobPath blobPath, List<List<ChannelData<T>>> list, String str, Map<FullyQualifiedTableName, EncryptionKey> map) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
        Timer.Context createTimerContext = Utils.createTimerContext(this.owningClient.buildLatency);
        BlobBuilder.Blob constructBlobAndMetadata = BlobBuilder.constructBlobAndMetadata(blobPath.fileRegistrationPath, list, this.bdecVersion, this.owningClient.getInternalParameterProvider(), map);
        constructBlobAndMetadata.blobStats.setBuildDurationMs(createTimerContext);
        return upload(this.storageManager.getStorage(str), blobPath, constructBlobAndMetadata.blobBytes, constructBlobAndMetadata.chunksMetadataList, constructBlobAndMetadata.blobStats);
    }

    BlobMetadata upload(IStorage iStorage, BlobPath blobPath, byte[] bArr, List<ChunkMetadata> list, BlobStats blobStats) throws NoSuchAlgorithmException {
        logger.logInfo("Start uploading blob={}, size={}", blobPath.fileRegistrationPath, Integer.valueOf(bArr.length));
        long currentTimeMillis = System.currentTimeMillis();
        Timer.Context createTimerContext = Utils.createTimerContext(this.owningClient.uploadLatency);
        iStorage.put(blobPath, bArr);
        if (createTimerContext != null) {
            blobStats.setUploadDurationMs(createTimerContext);
            this.owningClient.uploadThroughput.mark(bArr.length);
            this.owningClient.blobSizeHistogram.update(bArr.length);
            this.owningClient.blobRowCountHistogram.update(list.stream().mapToLong(chunkMetadata -> {
                return chunkMetadata.getEpInfo().getRowCount();
            }).sum());
        }
        logger.logInfo("Finish uploading blob={}, size={}, timeInMillis={}", blobPath.fileRegistrationPath, Integer.valueOf(bArr.length), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return BlobMetadata.createBlobMetadata(blobPath.fileRegistrationPath, BlobBuilder.computeMD5(bArr), this.bdecVersion, list, blobStats, list == null ? false : list.size() > 1);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() throws InterruptedException {
        this.flushWorker.shutdown();
        this.registerWorker.shutdown();
        this.buildUploadWorkers.shutdown();
        if (this.flushWorker.awaitTermination(300L, TimeUnit.SECONDS) && this.registerWorker.awaitTermination(300L, TimeUnit.SECONDS) && this.buildUploadWorkers.awaitTermination(300L, TimeUnit.SECONDS)) {
            return;
        }
        logger.logWarn("Tasks can't be terminated within the timeout, force shutdown now.");
        this.flushWorker.shutdownNow();
        this.registerWorker.shutdownNow();
        this.buildUploadWorkers.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setNeedFlush(String str) {
        this.isNeedFlush = true;
        if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
            this.channelCache.setNeedFlush(str, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> list, String str) {
        list.forEach(list2 -> {
            list2.forEach(channelData -> {
                this.owningClient.getChannelCache().invalidateChannelIfSequencersMatch(channelData.getChannelContext().getDbName(), channelData.getChannelContext().getSchemaName(), channelData.getChannelContext().getTableName(), channelData.getChannelContext().getName(), channelData.getChannelContext().getChannelSequencer(), str);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean throttleDueToQueuedFlushTasks() {
        boolean z = ((ThreadPoolExecutor) this.buildUploadWorkers).getQueue().size() > this.numProcessors;
        if (z) {
            logger.logWarn("Throttled due too many queue flush tasks (probably because of slow uploading speed), client={}, buildUploadWorkers stats={}", this.owningClient.getName(), this.buildUploadWorkers.toString());
        }
        return z;
    }

    boolean isTestMode() {
        return this.isTestMode;
    }
}
