package org.apache.iotdb.db.storageengine.load;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/storageengine/load/LoadTsFileManager.class */
public class LoadTsFileManager {
    /** {@link String#format} template used when a closed writer manager is asked to do work; {@code %s} is the task directory. */
    private static final String MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED = "%s TsFileWriterManager has been closed.";
    /** SLF4J template logged when a path could not be deleted. */
    private static final String MESSAGE_DELETE_FAIL = "failed to delete {}.";
    /** Writer managers keyed by load task uuid. */
    private final Map<String, TsFileWriterManager> uuid2WriterManager = new ConcurrentHashMap<>();
    /** Cleanup tasks keyed by load task uuid; also used as the monitor guarding {@code cleanupTaskQueue} updates. */
    private final Map<String, CleanupTask> uuid2CleanupTask = new ConcurrentHashMap<>();
    /** Pending cleanup tasks, ordered by {@link CleanupTask#compareTo(CleanupTask)}. */
    private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new PriorityBlockingQueue<>();
    private final ActiveLoadAgent activeLoadAgent = new ActiveLoadAgent();
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileManager.class);
    private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
    /** Last load base directories taken from {@code CONFIG}; compared by reference to detect replacement. */
    private static final AtomicReference<String[]> LOAD_BASE_DIRS = new AtomicReference<>(CONFIG.getLoadTsFileDirs());
    /** Lazily created folder manager; the reference object itself is also used as a lock. */
    private static final AtomicReference<FolderManager> FOLDER_MANAGER = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/storageengine/load/LoadTsFileManager$CleanupTask.class */
    /**
     * Deferred cleanup of the writer state belonging to one load task, identified by its uuid.
     *
     * <p>Tasks compare by {@code scheduledTime}, so a priority queue serves the earliest-due task
     * first. Running a task force-closes the writer manager registered under the uuid, unless the
     * task was canceled beforehand.
     *
     * <p>NOTE(review): {@link #resetScheduledTime()} changes the ordering key while the task may
     * already sit in the enclosing manager's queue; a priority queue does not re-order elements on
     * such a mutation, so the head of the queue is not guaranteed to be the earliest task.
     */
    public class CleanupTask implements Runnable, Comparable<CleanupTask> {
        private final String uuid;
        private final long delayInMs;
        /**
         * Epoch millis at which the task becomes due. Volatile like the two flags below because it is
         * rewritten by load threads (mark*Running) and read by the thread draining the queue.
         */
        private volatile long scheduledTime;
        private volatile boolean isLoadTaskRunning;
        private volatile boolean isCanceled;

        /**
         * @param str uuid of the load task this cleanup belongs to
         * @param j delay in milliseconds added to "now" each time the schedule is reset
         */
        private CleanupTask(String str, long j) {
            this.isLoadTaskRunning = false;
            this.isCanceled = false;
            this.uuid = str;
            this.delayInMs = j;
            resetScheduledTime();
        }

        /** Flags the load task as active and pushes the due time back by the configured delay. */
        public void markLoadTaskRunning() {
            this.isLoadTaskRunning = true;
            resetScheduledTime();
        }

        /** Flags the load task as idle and pushes the due time back by the configured delay. */
        public void markLoadTaskNotRunning() {
            this.isLoadTaskRunning = false;
            resetScheduledTime();
        }

        /** Sets the due time to the current time plus {@code delayInMs}. */
        public void resetScheduledTime() {
            this.scheduledTime = System.currentTimeMillis() + this.delayInMs;
        }

        /** Marks the task as canceled so that a later {@link #run()} only logs and returns. */
        public void cancel() {
            this.isCanceled = true;
        }

        /**
         * Force-closes the writer manager of this uuid unless the task was canceled. Any exception
         * raised while closing is logged at warn level and not propagated.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (this.isCanceled) {
                LoadTsFileManager.LOGGER.info("Load cleanup task {} is canceled.", this.uuid);
                return;
            }
            LoadTsFileManager.LOGGER.info("Load cleanup task {} starts.", this.uuid);
            try {
                LoadTsFileManager.this.forceCloseWriterManager(this.uuid);
            } catch (Exception e) {
                LoadTsFileManager.LOGGER.warn("Load cleanup task {} error.", this.uuid, e);
            }
        }

        /** Orders tasks by ascending due time. */
        @Override // java.lang.Comparable
        public int compareTo(CleanupTask cleanupTask) {
            return Long.compare(this.scheduledTime, cleanupTask.scheduledTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/storageengine/load/LoadTsFileManager$DataPartitionInfo.class */
    /**
     * Key identifying one (data region, time partition) pair. Equality and hashing use the data
     * region and the start time of the time partition slot.
     */
    public static class DataPartitionInfo {
        private final DataRegion dataRegion;
        private final TTimePartitionSlot timePartitionSlot;

        private DataPartitionInfo(DataRegion dataRegion, TTimePartitionSlot tTimePartitionSlot) {
            this.dataRegion = dataRegion;
            this.timePartitionSlot = tTimePartitionSlot;
        }

        /** @return the data region this partition belongs to */
        public DataRegion getDataRegion() {
            return this.dataRegion;
        }

        /** @return {@code <databaseName>-<dataRegionId>-<partitionStartTime>} */
        @Override
        public String toString() {
            return this.dataRegion.getDatabaseName()
                + "-"
                + this.dataRegion.getDataRegionId()
                + "-"
                + Long.toString(this.timePartitionSlot.getStartTime());
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            final DataPartitionInfo that = (DataPartitionInfo) obj;
            if (!Objects.equals(this.dataRegion, that.dataRegion)) {
                return false;
            }
            return that.timePartitionSlot.getStartTime() == this.timePartitionSlot.getStartTime();
        }

        @Override
        public int hashCode() {
            final long partitionStart = this.timePartitionSlot.getStartTime();
            return Objects.hash(this.dataRegion, partitionStart);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/storageengine/load/LoadTsFileManager$TsFileWriterManager.class */
    public static class TsFileWriterManager {
        /** Directory holding every temporary file of this load task. */
        private final File taskDir;
        // The three maps below are set to null by close(), hence not final.
        private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
        private Map<DataPartitionInfo, String> dataPartition2LastDevice;
        private Map<DataPartitionInfo, ModificationFile> dataPartition2ModificationFile;
        private boolean isClosed;

        /**
         * Creates a manager for the given task directory. Any existing content of the directory is
         * removed and the directory is re-created (see {@code clearDir}).
         *
         * @param file the task directory
         */
        private TsFileWriterManager(File file) {
            this.taskDir = file;
            this.dataPartition2Writer = new HashMap<>();
            this.dataPartition2LastDevice = new HashMap<>();
            this.dataPartition2ModificationFile = new HashMap<>();
            this.isClosed = false;
            clearDir(file);
        }

        /**
         * Removes {@code file} if it exists, then attempts to create it as a directory (including
         * missing parents). A successful creation is logged at info level.
         */
        private void clearDir(File file) {
            final boolean alreadyPresent = file.exists();
            if (alreadyPresent) {
                FileUtils.deleteFileOrDirectory(file);
            }
            final boolean created = file.mkdirs();
            if (!created) {
                return;
            }
            LoadTsFileManager.LOGGER.info("Load TsFile dir {} is created.", file.getPath());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Appends one chunk to the TsFile of the given data partition, creating the file and its
         * writer on first use. A new chunk group is started whenever the chunk's device differs from
         * the last device written to that partition.
         *
         * @param dataPartitionInfo partition whose TsFile receives the chunk
         * @param chunkData the chunk to write
         * @throws IOException if this manager is closed, if the partition's TsFile cannot be created,
         *     or if writing fails
         */
        public void write(DataPartitionInfo dataPartitionInfo, ChunkData chunkData) throws IOException {
            if (this.isClosed) {
                throw new IOException(String.format(LoadTsFileManager.MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, this.taskDir));
            }
            TsFileIOWriter tsFileIOWriter = this.dataPartition2Writer.get(dataPartitionInfo);
            if (tsFileIOWriter == null) {
                File file = SystemFileFactory.INSTANCE.getFile(this.taskDir, dataPartitionInfo.toString() + ".tsfile");
                if (!file.createNewFile()) {
                    LoadTsFileManager.LOGGER.error("Can not create TsFile {} for writing.", file.getPath());
                    // Returning here would silently drop the chunk while the caller reports success;
                    // surface the failure so the load can be failed and retried instead.
                    throw new IOException(String.format("Can not create TsFile %s for writing.", file.getPath()));
                }
                tsFileIOWriter = new TsFileIOWriter(file);
                this.dataPartition2Writer.put(dataPartitionInfo, tsFileIOWriter);
            }
            final String device = chunkData.getDevice();
            if (!device.equals(this.dataPartition2LastDevice.getOrDefault(dataPartitionInfo, SubStringFunctionColumnTransformer.EMPTY_STRING))) {
                // Close the previous device's chunk group (if any) before opening the next one.
                if (this.dataPartition2LastDevice.containsKey(dataPartitionInfo)) {
                    tsFileIOWriter.endChunkGroup();
                }
                tsFileIOWriter.startChunkGroup(new PlainDeviceID(device));
                this.dataPartition2LastDevice.put(dataPartitionInfo, device);
            }
            chunkData.writeToFileWriter(tsFileIOWriter);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Records a deletion against every TsFile currently being written for the given data region.
         * The modification file next to each TsFile is created on first use; the writer is flushed
         * and the TsFile's current length is passed along with the deletion.
         *
         * @param dataRegion region whose open partitions receive the deletion
         * @param deletionData the deletion to record
         * @throws IOException if this manager is closed, if a modification file cannot be created,
         *     or if writing fails
         */
        public void writeDeletion(DataRegion dataRegion, DeletionData deletionData) throws IOException {
            if (this.isClosed) {
                throw new IOException(String.format(LoadTsFileManager.MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, this.taskDir));
            }
            for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : this.dataPartition2Writer.entrySet()) {
                DataPartitionInfo key = entry.getKey();
                if (!key.getDataRegion().equals(dataRegion)) {
                    continue;
                }
                TsFileIOWriter value = entry.getValue();
                ModificationFile modificationFile = this.dataPartition2ModificationFile.get(key);
                if (modificationFile == null) {
                    File file = SystemFileFactory.INSTANCE.getFile(value.getFile().getAbsolutePath() + ModificationFile.FILE_SUFFIX);
                    if (!file.createNewFile()) {
                        LoadTsFileManager.LOGGER.error("Can not create ModificationFile {} for writing.", file.getPath());
                        // Returning here would silently skip the deletion for this and all remaining
                        // partitions; report the failure to the caller instead.
                        throw new IOException(String.format("Can not create ModificationFile %s for writing.", file.getPath()));
                    }
                    modificationFile = new ModificationFile(file.getAbsolutePath());
                    this.dataPartition2ModificationFile.put(key, modificationFile);
                }
                // Flush first so the file length passed below reflects everything written so far.
                value.flush();
                deletionData.writeToModificationFile(modificationFile, value.getFile().length());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Finishes every TsFile written by this manager and hands each one to its data region.
         *
         * <p>All modification files are closed first. Then, per partition, any open chunk group is
         * ended, the file is ended, a resource carrying {@code progressIndex} is generated and
         * loaded into the region, and write-point metrics are updated when the region reports a
         * non-system database name.
         *
         * @param z forwarded as the last argument of {@code DataRegion.loadNewTsFile}
         * @param progressIndex progress index stored on each generated resource
         * @throws IOException if this manager is closed or a file operation fails
         * @throws LoadFileException declared for failures of the region load
         */
        public void loadAll(boolean z, ProgressIndex progressIndex) throws IOException, LoadFileException {
            if (this.isClosed) {
                throw new IOException(String.format(LoadTsFileManager.MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, this.taskDir));
            }
            for (ModificationFile modificationFile : this.dataPartition2ModificationFile.values()) {
                modificationFile.close();
            }
            for (Map.Entry<DataPartitionInfo, TsFileIOWriter> partitionAndWriter : this.dataPartition2Writer.entrySet()) {
                final TsFileIOWriter writer = partitionAndWriter.getValue();
                if (writer.isWritingChunkGroup()) {
                    writer.endChunkGroup();
                }
                writer.endFile();
                final DataRegion targetRegion = partitionAndWriter.getKey().getDataRegion();
                final TsFileResource resource = generateResource(writer, progressIndex);
                targetRegion.loadNewTsFile(resource, true, z);
                targetRegion
                    .getNonSystemDatabaseName()
                    .ifPresent(
                        databaseName ->
                            LoadTsFileManager.updateWritePointCountMetrics(
                                targetRegion, databaseName, getTsFileWritePointCount(writer), false));
            }
        }

        /**
         * Builds the resource for a finished TsFile, stamps it with the given progress index and
         * serializes it.
         *
         * @param tsFileIOWriter writer of the finished TsFile
         * @param progressIndex progress index to store on the resource
         * @return the serialized resource
         * @throws IOException if generating or serializing the resource fails
         */
        private TsFileResource generateResource(TsFileIOWriter tsFileIOWriter, ProgressIndex progressIndex) throws IOException {
            final TsFileResource resource = TsFileResourceUtils.generateTsFileResource(tsFileIOWriter);
            resource.setProgressIndex(progressIndex);
            resource.serialize();
            return resource;
        }

        /**
         * Sums the statistics count of every chunk metadata in every chunk group recorded by the
         * writer.
         *
         * @param tsFileIOWriter writer whose chunk group metadata is inspected
         * @return the total count over all chunks
         */
        private long getTsFileWritePointCount(TsFileIOWriter tsFileIOWriter) {
            return tsFileIOWriter.getChunkGroupMetadataList().stream()
                .flatMap(group -> group.getChunkMetadataList().stream())
                .mapToLong(chunk -> chunk.getStatistics().getCount())
                .sum();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Releases everything owned by this manager; a second call is a no-op.
         *
         * <p>Each writer is closed if it is still writable and its TsFile is deleted; each
         * modification file is closed and deleted; finally the task directory itself is deleted. An
         * {@link IOException} on any individual file is logged and does not stop the remaining
         * cleanup. Afterwards the internal maps are dropped and the manager is marked closed.
         */
        public void close() {
            if (this.isClosed) {
                return;
            }

            if (this.dataPartition2Writer != null) {
                for (TsFileIOWriter writer : this.dataPartition2Writer.values()) {
                    try {
                        if (writer.canWrite()) {
                            writer.close();
                        }
                        final Path tsFilePath = writer.getFile().toPath();
                        if (Files.exists(tsFilePath)) {
                            Files.delete(tsFilePath);
                        }
                    } catch (IOException e) {
                        LoadTsFileManager.LOGGER.warn("Close TsFileIOWriter {} error.", writer.getFile().getPath(), e);
                    }
                }
            }

            if (this.dataPartition2ModificationFile != null) {
                for (ModificationFile modificationFile : this.dataPartition2ModificationFile.values()) {
                    try {
                        modificationFile.close();
                        final Path modsPath = new File(modificationFile.getFilePath()).toPath();
                        if (Files.exists(modsPath)) {
                            Files.delete(modsPath);
                        }
                    } catch (IOException e) {
                        LoadTsFileManager.LOGGER.warn("Close ModificationFile {} error.", modificationFile.getFilePath(), e);
                    }
                }
            }

            try {
                Files.delete(this.taskDir.toPath());
            } catch (DirectoryNotEmptyException notEmpty) {
                LoadTsFileManager.LOGGER.info("Task dir {} is not empty, skip deleting.", this.taskDir.getPath());
            } catch (IOException deleteFailure) {
                LoadTsFileManager.LOGGER.warn(LoadTsFileManager.MESSAGE_DELETE_FAIL, this.taskDir.getPath(), deleteFailure);
            }

            this.dataPartition2ModificationFile = null;
            this.dataPartition2LastDevice = null;
            this.dataPartition2Writer = null;
            this.isClosed = true;
        }
    }

    /**
     * Creates the manager: registers the periodic cleanup job, runs recovery over the configured
     * load directories, and then starts the active load agent.
     */
    public LoadTsFileManager() {
        registerCleanupTaskExecutor();
        recover();
        this.activeLoadAgent.start();
    }

    /**
     * Registers {@link #cleanupTasks()} with the pipe runtime agent as the periodical job
     * "LoadTsFileManager#cleanupTasks". The interval argument is the configured cleanup delay
     * shifted right by two bits, i.e. a quarter of that delay (seconds, going by the getter name).
     */
    private void registerCleanupTaskExecutor() {
        PipeDataNodeAgent.runtime().registerPeriodicalJob("LoadTsFileManager#cleanupTasks", this::cleanupTasks, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() >> 2);
    }

    /**
     * Drains cleanup tasks that are due, stopping at the first task whose due time is still in the
     * future.
     *
     * <p>Each iteration inspects the queue head while holding the {@code uuid2CleanupTask} monitor:
     * a head that is not yet due ends the drain; a due head whose load task is still running is
     * re-queued with a fresh due time; any other due head is run and then removed from both the
     * uuid map and the queue.
     */
    private void cleanupTasks() {
        while (!this.cleanupTaskQueue.isEmpty()) {
            synchronized (this.uuid2CleanupTask) {
                final CleanupTask head = this.cleanupTaskQueue.peek();
                if (head == null) {
                    // Queue was emptied before the lock was acquired; the loop condition ends the drain.
                    continue;
                }
                // Read the clock and the due time once so the comparison and the logged wait agree.
                final long now = System.currentTimeMillis();
                final long dueAt = head.scheduledTime;
                if (dueAt > now) {
                    final long waitMs = dueAt - now;
                    LOGGER.info("Next load cleanup task {} is not ready to run, wait for at least {} ms ({}s).", head.uuid, waitMs, waitMs / 1000.0d);
                    return;
                }
                if (head.isLoadTaskRunning) {
                    // Take the task out before changing its ordering key, then re-insert it.
                    this.cleanupTaskQueue.poll();
                    head.resetScheduledTime();
                    this.cleanupTaskQueue.add(head);
                } else {
                    head.run();
                    // Remove only this exact task: the uuid may meanwhile map to a newer task that
                    // overwrote this one, and that newer task must stay registered.
                    this.uuid2CleanupTask.remove(head.uuid, head);
                    this.cleanupTaskQueue.poll();
                }
            }
        }
    }

    /**
     * Cleans up task directories found under the configured load base directories.
     *
     * <p>The cached base directories are refreshed first if the configuration now returns a
     * different array instance. Every entry found directly under an existing base directory is then
     * treated as a task directory and processed in parallel by {@link #recoverTaskDir(File)}.
     */
    private void recover() {
        if (CONFIG.getLoadTsFileDirs() != LOAD_BASE_DIRS.get()) {
            synchronized (FOLDER_MANAGER) {
                if (CONFIG.getLoadTsFileDirs() != LOAD_BASE_DIRS.get()) {
                    LOAD_BASE_DIRS.set(CONFIG.getLoadTsFileDirs());
                }
            }
        }
        // Collect the task directories sequentially; only the per-directory work runs in parallel.
        final File[] taskDirs = Arrays.stream(LOAD_BASE_DIRS.get())
            .map(File::new)
            .filter(File::exists)
            .flatMap(baseDir -> {
                final File[] children = baseDir.listFiles();
                return children != null ? Arrays.stream(children) : Stream.<File>empty();
            })
            .toArray(File[]::new);
        Arrays.stream(taskDirs).parallel().forEach(this::recoverTaskDir);
    }

    /**
     * Registers a writer manager for one task directory, closes it immediately, and schedules a
     * cleanup task under the directory name (used as the uuid).
     */
    private void recoverTaskDir(File taskDir) {
        final String uuid = taskDir.getName();
        final TsFileWriterManager writerManager = new TsFileWriterManager(taskDir);
        this.uuid2WriterManager.put(uuid, writerManager);
        writerManager.close();
        synchronized (this.uuid2CleanupTask) {
            final CleanupTask cleanupTask = new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
            this.uuid2CleanupTask.put(uuid, cleanupTask);
            this.cleanupTaskQueue.add(cleanupTask);
        }
    }

    /**
     * Writes every piece of data carried by a load piece node into the temporary files of the load
     * task identified by {@code str}.
     *
     * <p>If no writer manager is registered for the uuid yet, a cleanup task is scheduled first. The
     * cleanup task (when present) is marked as running for the duration of the write and marked as
     * not running afterwards, whether or not the write succeeded.
     *
     * @param dataRegion target data region
     * @param loadTsFilePieceNode node carrying chunk and deletion data
     * @param str uuid of the load task
     * @throws IOException if the writer manager cannot be created (the
     *     {@link DiskSpaceInsufficientException}, if any, is attached as cause) or a write fails
     */
    public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode loadTsFilePieceNode, String str) throws IOException {
        if (!this.uuid2WriterManager.containsKey(str)) {
            synchronized (this.uuid2CleanupTask) {
                final CleanupTask newTask = new CleanupTask(str, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
                this.uuid2CleanupTask.put(str, newTask);
                this.cleanupTaskQueue.add(newTask);
            }
        }
        // ofNullable: the task may already have been removed by the cleanup job, and Optional.of
        // would throw NullPointerException in that case.
        final Optional<CleanupTask> cleanupTask = Optional.ofNullable(this.uuid2CleanupTask.get(str));
        cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
        try {
            // computeIfAbsent cannot propagate a checked exception, so it is carried out via this holder.
            final AtomicReference<DiskSpaceInsufficientException> diskFailure = new AtomicReference<>();
            final TsFileWriterManager writerManager = this.uuid2WriterManager.computeIfAbsent(str, uuid -> {
                try {
                    return new TsFileWriterManager(new File(getNextFolder(), uuid));
                } catch (DiskSpaceInsufficientException e) {
                    diskFailure.set(e);
                    return null;
                }
            });
            if (diskFailure.get() != null || writerManager == null) {
                throw new IOException("Failed to create TsFileWriterManager for uuid " + str + " because of insufficient disk space.", diskFailure.get());
            }
            for (TsFileData tsFileData : loadTsFilePieceNode.getAllTsFileData()) {
                if (tsFileData.isModification()) {
                    writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
                } else {
                    final ChunkData chunkData = (ChunkData) tsFileData;
                    writerManager.write(new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
                }
            }
        } finally {
            cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
        }
    }

    /**
     * Picks the folder for the next load task directory.
     *
     * <p>The shared folder manager is (re)built under the {@code FOLDER_MANAGER} lock in two cases:
     * when the configuration returns a different base-directory array than the cached one, and when
     * no folder manager has been created yet. Both cases re-check their condition after taking the
     * lock.
     *
     * @return the folder chosen by the folder manager
     * @throws DiskSpaceInsufficientException propagated from the folder manager
     */
    private String getNextFolder() throws DiskSpaceInsufficientException {
        final boolean configuredDirsReplaced = CONFIG.getLoadTsFileDirs() != LOAD_BASE_DIRS.get();
        if (configuredDirsReplaced) {
            synchronized (FOLDER_MANAGER) {
                if (CONFIG.getLoadTsFileDirs() != LOAD_BASE_DIRS.get()) {
                    LOAD_BASE_DIRS.set(CONFIG.getLoadTsFileDirs());
                    final FolderManager rebuilt =
                        new FolderManager(Arrays.asList(LOAD_BASE_DIRS.get()), DirectoryStrategyType.SEQUENCE_STRATEGY);
                    FOLDER_MANAGER.set(rebuilt);
                    return rebuilt.getNextFolder();
                }
            }
        }

        final boolean managerMissing = FOLDER_MANAGER.get() == null;
        if (managerMissing) {
            synchronized (FOLDER_MANAGER) {
                if (FOLDER_MANAGER.get() == null) {
                    final FolderManager created =
                        new FolderManager(Arrays.asList(LOAD_BASE_DIRS.get()), DirectoryStrategyType.SEQUENCE_STRATEGY);
                    FOLDER_MANAGER.set(created);
                    return created.getNextFolder();
                }
            }
        }

        return FOLDER_MANAGER.get().getNextFolder();
    }

    /**
     * Loads every TsFile written for the load task {@code str} into its data region and then cleans
     * up the task's temporary state.
     *
     * <p>The cleanup task (when present) is marked as running while loading and as not running
     * afterwards, whether or not loading succeeded. Temporary state is cleaned only on success.
     *
     * @param str uuid of the load task
     * @param z forwarded to the writer manager's {@code loadAll}
     * @param progressIndex progress index forwarded to the writer manager's {@code loadAll}
     * @return {@code false} if no writer manager is registered for the uuid, {@code true} otherwise
     * @throws IOException propagated from the writer manager
     * @throws LoadFileException propagated from the writer manager
     */
    public boolean loadAll(String str, boolean z, ProgressIndex progressIndex) throws IOException, LoadFileException {
        // Single lookup: containsKey followed by get could see the manager removed in between.
        final TsFileWriterManager writerManager = this.uuid2WriterManager.get(str);
        if (writerManager == null) {
            return false;
        }
        // ofNullable: Optional.of would throw NullPointerException when the task is already gone.
        final Optional<CleanupTask> cleanupTask = Optional.ofNullable(this.uuid2CleanupTask.get(str));
        cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
        try {
            writerManager.loadAll(z, progressIndex);
        } finally {
            cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
        }
        clean(str);
        return true;
    }

    /**
     * Discards the temporary state of the load task {@code str}.
     *
     * @param str uuid of the load task
     * @return {@code true} if a writer manager was registered for the uuid and cleanup was
     *     triggered, {@code false} otherwise
     */
    public boolean deleteAll(String str) {
        final boolean registered = this.uuid2WriterManager.containsKey(str);
        if (registered) {
            clean(str);
        }
        return registered;
    }

    /**
     * Cancels the cleanup task registered for {@code str} (if any) while holding the
     * {@code uuid2CleanupTask} monitor, then force-closes the writer manager of that uuid.
     */
    private void clean(String str) {
        synchronized (this.uuid2CleanupTask) {
            Optional.ofNullable(this.uuid2CleanupTask.get(str)).ifPresent(CleanupTask::cancel);
        }
        forceCloseWriterManager(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Unregisters the writer manager of {@code str} and closes it; does nothing when none is
     * registered.
     */
    public void forceCloseWriterManager(String str) {
        final TsFileWriterManager unregistered = this.uuid2WriterManager.remove(str);
        if (unregistered == null) {
            return;
        }
        unregistered.close();
    }

    /**
     * Reports {@code j} loaded points for a database/region pair.
     *
     * <p>The count is always recorded through the flush-points metric and the {@code QUANTITY}
     * counter. The {@code LEADER_QUANTITY} counter additionally receives {@code j} divided by the
     * region's replication number, unless that number is zero or {@code z} is set.
     *
     * @param dataRegion region the points were loaded into
     * @param str database name used as metric tag
     * @param j number of points
     * @param z when {@code true}, the leader counter is not updated
     */
    public static void updateWritePointCountMetrics(DataRegion dataRegion, String str, long j, boolean z) {
        MemTableFlushTask.recordFlushPointsMetricInternal(j, str, dataRegion.getDataRegionId());
        MetricService.getInstance().count(j, Metric.QUANTITY.toString(), MetricLevel.CORE, loadPointCountTags(dataRegion, str));

        final int regionId = Integer.parseInt(dataRegion.getDataRegionId());
        final ConsensusGroupId groupId = ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), regionId);
        final int replicationNum = DataRegionConsensusImpl.getInstance().getReplicationNum(groupId);
        if (replicationNum != 0 && !z) {
            MetricService.getInstance().count(j / replicationNum, Metric.LEADER_QUANTITY.toString(), MetricLevel.CORE, loadPointCountTags(dataRegion, str));
        }
    }

    /** Builds a fresh tag array (name, database, region, type) shared by both load point counters. */
    private static String[] loadPointCountTags(DataRegion dataRegion, String str) {
        return new String[]{Tag.NAME.toString(), Metric.POINTS_IN.toString(), Tag.DATABASE.toString(), str, Tag.REGION.toString(), dataRegion.getDataRegionId(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()};
    }
}
