package org.apache.iotdb.db.storageengine.load.active;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.class */
/**
 * Loads TsFiles that have been submitted through {@link #tryTriggerTsFileLoad(String, boolean)}.
 *
 * <p>Submitted files are placed into an {@link ActiveLoadPendingQueue} and consumed by worker
 * tasks running on a lazily created thread pool. Each file is loaded by executing a
 * {@link LoadTsFileStatement}. When a load fails for a reason that is not considered retryable
 * (see {@link #logIfRetryable(Pair, String)}), the TsFile and its companion resource and
 * modification files are moved into the configured fail directory.
 */
public class ActiveLoadTsFileLoader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadTsFileLoader.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();

    /** Upper bound used to compute how many more files may be enqueued. */
    private static final int MAX_PENDING_SIZE = 1000;

    /** Failure messages containing this text are treated as memory rejections and retried later. */
    private static final String MEMORY_KEYWORD = "memory";

    /** Failure messages containing this text are treated as read-only rejections and retried later. */
    private static final String READ_ONLY_KEYWORD = "read only";

    /** The load statement is built programmatically, so no SQL text is passed to the coordinator. */
    private static final String EMPTY_SQL = "";

    private final ActiveLoadPendingQueue pendingQueue = new ActiveLoadPendingQueue();
    private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor = new AtomicReference<>();
    private final AtomicReference<String> failDir = new AtomicReference<>();

    /**
     * Returns how many more files may currently be enqueued.
     *
     * @return {@code MAX_PENDING_SIZE} minus the current queue size, never negative
     */
    public int getCurrentAllowedPendingSize() {
        // Clamp at zero: a negative "allowed" count is meaningless to callers that use it as a limit.
        return Math.max(0, MAX_PENDING_SIZE - pendingQueue.size());
    }

    /**
     * Enqueues a TsFile for loading and makes sure workers are running to consume it.
     *
     * <p>Nothing happens when the system is read-only or when the queue refuses the file.
     *
     * @param tsFilePath path of the TsFile to load
     * @param isGeneratedByPipe whether the file was generated by pipe; such files are loaded
     *     through a {@link PipeEnrichedStatement}
     */
    public void tryTriggerTsFileLoad(String tsFilePath, boolean isGeneratedByPipe) {
        if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
            return;
        }
        if (!pendingQueue.enqueue(tsFilePath, isGeneratedByPipe)) {
            return;
        }
        initFailDirIfNecessary();
        adjustExecutorIfNecessary();
    }

    /**
     * Creates the fail directory and publishes it to the metrics sets whenever the configured
     * fail directory differs from the one currently recorded.
     */
    private void initFailDirIfNecessary() {
        // Cheap unsynchronized check for the common case where nothing changed.
        if (Objects.equals(failDir.get(), IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
            return;
        }
        synchronized (failDir) {
            // Read the configuration once so that the directory that is created, recorded and
            // reported to metrics is guaranteed to be the same value.
            final String configuredFailDir = IOTDB_CONFIG.getLoadActiveListeningFailDir();
            if (Objects.equals(failDir.get(), configuredFailDir)) {
                return;
            }
            final File failDirFile = new File(configuredFailDir);
            try {
                FileUtils.forceMkdir(failDirFile);
            } catch (IOException e) {
                LOGGER.warn("Error occurred during creating fail directory {} for active load.", failDirFile.getAbsoluteFile(), e);
            }
            // Recorded even if creation failed, so the creation attempt is not repeated on every call.
            failDir.set(configuredFailDir);
            ActiveLoadingFilesSizeMetricsSet.getInstance().updateFailedDir(configuredFailDir);
            ActiveLoadingFilesNumberMetricsSet.getInstance().updateFailedDir(configuredFailDir);
        }
    }

    /**
     * Resizes the worker pool to match the pending workload (capped by the configured maximum
     * thread number) and submits additional worker tasks when fewer are active than desired.
     */
    private void adjustExecutorIfNecessary() {
        final WrappedThreadPoolExecutor executor = getOrCreateExecutor();
        final int targetCorePoolSize = Math.min(pendingQueue.size(), IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum());
        if (executor.getCorePoolSize() != targetCorePoolSize) {
            // The pool's maximum was fixed when it was created. If the configured thread number
            // has grown since then, raise the maximum first: since JDK 9, setCorePoolSize rejects
            // a core size larger than the maximum with an IllegalArgumentException.
            if (targetCorePoolSize > executor.getMaximumPoolSize()) {
                executor.setMaximumPoolSize(targetCorePoolSize);
            }
            executor.setCorePoolSize(targetCorePoolSize);
        }
        final int workersToSubmit = Math.max(targetCorePoolSize - executor.getActiveCount(), 0);
        for (int i = 0; i < workersToSubmit; i++) {
            executor.execute(this::tryLoadPendingTsFiles);
        }
    }

    /** Returns the worker pool, creating it on first use. */
    private WrappedThreadPoolExecutor getOrCreateExecutor() {
        WrappedThreadPoolExecutor executor = activeLoadExecutor.get();
        if (executor != null) {
            return executor;
        }
        synchronized (activeLoadExecutor) {
            executor = activeLoadExecutor.get();
            if (executor == null) {
                final int maxThreadNum = IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum();
                executor = new WrappedThreadPoolExecutor(maxThreadNum, maxThreadNum, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new IoTThreadFactory(ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name()), ThreadName.ACTIVE_LOAD_TSFILE_LOADER.name());
                activeLoadExecutor.set(executor);
            }
            return executor;
        }
    }

    /**
     * Worker loop: keeps taking files from the pending queue and loading them until no file
     * becomes available within the polling window of {@link #tryGetNextPendingFile()}.
     */
    private void tryLoadPendingTsFiles() {
        while (true) {
            final Optional<Pair<String, Boolean>> nextFile = tryGetNextPendingFile();
            if (!nextFile.isPresent()) {
                return;
            }
            final Pair<String, Boolean> filePair = nextFile.get();
            try {
                final TSStatus status = loadTsFile(filePair);
                if (isLoadSuccessful(status)) {
                    LOGGER.info("Successfully auto load tsfile {} (isGeneratedByPipe = {})", filePair.getLeft(), filePair.getRight());
                } else {
                    handleLoadFailure(filePair, status);
                }
            } catch (FileNotFoundException e) {
                handleFileNotFoundException(filePair);
            } catch (Exception e) {
                handleOtherException(filePair, e);
            } finally {
                // Always release the file from the loading set, whatever the outcome.
                pendingQueue.removeFromLoading(filePair.getLeft());
            }
        }
    }

    /** Returns whether the status code is either success or a redirection recommendation. */
    private static boolean isLoadSuccessful(TSStatus status) {
        final int code = status.getCode();
        return code == TSStatusCode.SUCCESS_STATUS.getStatusCode() || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
    }

    /**
     * Polls the pending queue for the next file, parking for one second after each empty poll.
     *
     * <p>The number of polls is bounded by twice the configured check interval (at least one),
     * plus the initial poll.
     *
     * @return the next pending file, or empty if none showed up within the polling window
     */
    private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
        final long maxRetryTimes = Math.max(1L, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() << 1);
        // Inclusive upper bound: one initial attempt plus maxRetryTimes retries.
        for (long attempt = 0; attempt <= maxRetryTimes; attempt++) {
            final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
            if (Objects.nonNull(filePair)) {
                return Optional.of(filePair);
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        return Optional.empty();
    }

    /**
     * Builds and executes the load statement for one file.
     *
     * <p>The statement deletes the file after loading, verifies the schema and does not
     * auto-create databases.
     *
     * @param filePair the file path and its generated-by-pipe flag
     * @return the execution status
     * @throws FileNotFoundException as declared by {@link LoadTsFileStatement} construction
     */
    private TSStatus loadTsFile(Pair<String, Boolean> filePair) throws FileNotFoundException {
        final LoadTsFileStatement loadStatement = new LoadTsFileStatement(filePair.getLeft());
        loadStatement.setDeleteAfterLoad(true);
        loadStatement.setVerifySchema(true);
        loadStatement.setAutoCreateDatabase(false);
        if (filePair.getRight()) {
            return executeStatement(new PipeEnrichedStatement(loadStatement));
        }
        return executeStatement(loadStatement);
    }

    /** Executes the statement through the coordinator as the super user and returns its status. */
    private TSStatus executeStatement(Statement statement) {
        return Coordinator.getInstance().executeForTreeModel(statement, SessionManager.getInstance().requestQueryId(), new SessionInfo(0L, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), EMPTY_SQL, ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), IOTDB_CONFIG.getQueryTimeoutThreshold()).status;
    }

    /**
     * Handles a load that returned a non-successful status: retryable rejections are only logged,
     * anything else moves the file to the fail directory.
     */
    private void handleLoadFailure(Pair<String, Boolean> filePair, TSStatus status) {
        if (logIfRetryable(filePair, status.getMessage())) {
            return;
        }
        LOGGER.warn("Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.", filePair.getLeft(), filePair.getRight(), status);
        removeFileAndResourceAndModsToFailDir(filePair.getLeft());
    }

    /** Handles a load that failed because the file could not be found. */
    private void handleFileNotFoundException(Pair<String, Boolean> filePair) {
        LOGGER.warn("Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file not found, will skip this file.", filePair.getLeft(), filePair.getRight());
        // The TsFile itself may be gone, but companion files that still exist are moved away.
        removeFileAndResourceAndModsToFailDir(filePair.getLeft());
    }

    /**
     * Handles a load that threw an unexpected exception: retryable rejections are only logged,
     * anything else moves the file to the fail directory.
     */
    private void handleOtherException(Pair<String, Boolean> filePair, Exception exception) {
        if (logIfRetryable(filePair, exception.getMessage())) {
            return;
        }
        // The trailing exception has no placeholder, so SLF4J logs it with its stack trace.
        LOGGER.warn("Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of an unexpected exception. File will be moved to fail directory.", filePair.getLeft(), filePair.getRight(), exception);
        removeFileAndResourceAndModsToFailDir(filePair.getLeft());
    }

    /**
     * Logs and reports whether a failure is one that leaves the file in place for a later retry:
     * a message mentioning memory, a read-only system, or a message mentioning read only.
     *
     * @param filePair the file path and its generated-by-pipe flag
     * @param failureMessage the status or exception message, may be {@code null}
     * @return {@code true} if the failure was classified as retryable (and logged)
     */
    private boolean logIfRetryable(Pair<String, Boolean> filePair, String failureMessage) {
        if (failureMessage != null && failureMessage.contains(MEMORY_KEYWORD)) {
            LOGGER.info("Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to memory constraints, will retry later.", filePair.getLeft(), filePair.getRight());
            return true;
        }
        if (CommonDescriptor.getInstance().getConfig().isReadOnly() || (failureMessage != null && failureMessage.contains(READ_ONLY_KEYWORD))) {
            LOGGER.info("Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the system is read only, will retry later.", filePair.getLeft(), filePair.getRight());
            return true;
        }
        return false;
    }

    /** Moves the TsFile and its resource and modification companions to the fail directory. */
    private void removeFileAndResourceAndModsToFailDir(String tsFilePath) {
        removeToFailDir(tsFilePath);
        removeToFailDir(tsFilePath + TsFileResource.RESOURCE_SUFFIX);
        removeToFailDir(tsFilePath + ModificationFile.FILE_SUFFIX);
    }

    /** Moves a single file to the fail directory if it exists; move errors are logged only. */
    private void removeToFailDir(String filePath) {
        final File sourceFile = new File(filePath);
        if (!sourceFile.exists()) {
            return;
        }
        try {
            org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile, new File(failDir.get()));
        } catch (IOException e) {
            LOGGER.warn("Error occurred during moving file {} to fail directory.", filePath, e);
        }
    }

    /**
     * Reports whether the given file is currently tracked by the pending queue.
     *
     * @param file the file to look up by its absolute path
     * @return the queue's answer for the file's absolute path
     */
    public boolean isFilePendingOrLoading(File file) {
        return pendingQueue.isFilePendingOrLoading(file.getAbsolutePath());
    }

    /**
     * Walks the fail directory, counts the files in it and their total size, and reports both
     * figures to the metrics sets.
     *
     * <p>If the walk fails with an {@link IOException}, the metrics are not updated and the count
     * gathered so far is returned.
     *
     * @return the number of files visited in the fail directory
     */
    public long countAndReportFailedFileNumber() {
        // Single-element arrays act as mutable counters captured by the anonymous visitor.
        final long[] fileCount = {0};
        final long[] totalBytes = {0};
        try {
            initFailDirIfNecessary();
            Files.walkFileTree(new File(failDir.get()).toPath(), new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) {
                    fileCount[0]++;
                    try {
                        totalBytes[0] += path.toFile().length();
                    } catch (Exception e) {
                        LOGGER.debug("Failed to count failed files in fail directory.", e);
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
            ActiveLoadingFilesNumberMetricsSet.getInstance().updateTotalFailedFileCounter(fileCount[0]);
            ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalFailedFileCounter(totalBytes[0]);
        } catch (IOException e) {
            LOGGER.debug("Failed to count failed files in fail directory.", e);
        }
        return fileCount[0];
    }
}
