package org.apache.iotdb.db.pipe.extractor.dataregion.historical;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.class */
public class PipeHistoricalDataRegionTsFileAndDeletionExtractor implements PipeHistoricalDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileAndDeletionExtractor.class);
    // Data region id -> System.currentTimeMillis() of the last pipe-triggered flush of that region.
    // Shared by all instances; every access in this class happens inside synchronized (this map).
    private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap();
    // A region is flushed again by a non-consensus pipe only if at least this many ms passed since the last flush.
    private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
    // Same value as the "root." literal that detectModel() checks table names against.
    private static final String TREE_MODEL_EVENT_TABLE_NAME_PREFIX = "root.";
    // Runtime-environment values captured in customize().
    private String pipeName;
    private long creationTime;
    private PipeTaskMeta pipeTaskMeta;
    // Progress already reached by the pipe task; used to skip fully processed TsFiles and deletions.
    private ProgressIndex startIndex;
    private int dataRegionId;
    private TreePattern treePattern;
    private TablePattern tablePattern;
    // Only meaningful once isModelDetected is true.
    private boolean isTableModel;
    // Long.MIN_VALUE when historical extraction is enabled, otherwise the pipe creation time (see customize()).
    private long historicalDataExtractionTimeLowerBound;
    // When set, the produced events skip time / pattern parsing respectively (see supplyTsFileEvent()).
    private boolean sloppyTimeRange;
    private boolean sloppyPattern;
    // (listen to insertions, listen to deletions), parsed in validate().
    private Pair<Boolean, Boolean> listeningOptionPair;
    private boolean shouldExtractInsertion;
    private boolean shouldExtractDeletion;
    private boolean shouldTransferModFile;
    protected String userName;
    // Resources still to be supplied; null until start() ran successfully and again after close().
    private Queue<PersistentResource> pendingQueue;
    // Set in customize() when the database name is known, otherwise lazily by detectModel().
    private boolean isModelDetected = false;
    private boolean isDbNameCoveredByPattern = false;
    private boolean isHistoricalExtractorEnabled = false;
    // Inclusive time range for historical extraction; the defaults below mean "unbounded".
    private long historicalDataExtractionStartTime = Long.MIN_VALUE;
    // NOTE(review): presumably Long.MAX_VALUE that the decompiler resolved to this WALNode constant - confirm.
    private long historicalDataExtractionEndTime = WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
    protected boolean skipIfNoPrivileges = true;
    // Becomes true once supplyTerminateEvent() handed out a terminate event.
    private boolean isTerminateSignalSent = false;
    // Written in the synchronized start(); volatile so unsynchronized readers see the latest value.
    private volatile boolean hasBeenStarted = false;

    /**
     * Parses and validates the extractor options relevant to historical extraction and caches them
     * in this instance.
     *
     * <p>Three groups of options are read:
     * <ul>
     *   <li>the insertion/deletion listening options;</li>
     *   <li>the strictness switches: {@code mode.strict} takes precedence, otherwise
     *       {@code history.loose-range} is parsed as {@code all} or a comma separated subset of
     *       {@code time} and {@code path};</li>
     *   <li>the extraction time range: the {@code start-time}/{@code end-time} keys take precedence
     *       and implicitly enable historical extraction, otherwise the {@code history.*} keys are
     *       used.</li>
     * </ul>
     *
     * @param pipeParameterValidator validator giving access to the pipe parameters
     * @throws PipeParameterNotValidException if an option cannot be parsed, an unknown loose-range
     *     item is given, or the start time is greater than the end time
     */
    public void validate(PipeParameterValidator pipeParameterValidator) {
        PipeParameters parameters = pipeParameterValidator.getParameters();
        try {
            this.listeningOptionPair = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);

            if (parameters.hasAnyAttributes(new String[]{"extractor.mode.strict", "source.mode.strict"})) {
                boolean strict = parameters.getBooleanOrDefault(Arrays.asList("extractor.mode.strict", "source.mode.strict"), true);
                this.sloppyTimeRange = !strict;
                this.sloppyPattern = !strict;
            } else {
                String looseRange = parameters.getStringOrDefault(Arrays.asList("extractor.history.loose-range", "source.history.loose-range"), "").trim();
                if ("all".equalsIgnoreCase(looseRange)) {
                    this.sloppyTimeRange = true;
                    this.sloppyPattern = true;
                } else {
                    // Locale.ROOT keeps the comparison with "time"/"path" independent of the JVM's
                    // default locale (e.g. "TIME" lower-cases to a dotless i under a Turkish locale).
                    Set<String> looseItems = Arrays.stream(looseRange.split(","))
                            .map(String::trim)
                            .filter(item -> !item.isEmpty())
                            .map(item -> item.toLowerCase(java.util.Locale.ROOT))
                            .collect(Collectors.toSet());
                    this.sloppyTimeRange = looseItems.remove("time");
                    this.sloppyPattern = looseItems.remove("path");
                    if (!looseItems.isEmpty()) {
                        throw new PipeParameterNotValidException(String.format("Parameters in set %s are not allowed in 'history.loose-range'", looseItems));
                    }
                }
            }

            // The plain start/end-time keys win over the legacy history.* keys and force extraction on.
            boolean usesPlainTimeKeys = parameters.hasAnyAttributes(new String[]{"source.start-time", "extractor.start-time", "source.end-time", "extractor.end-time"});
            String[] startTimeKeys = usesPlainTimeKeys
                    ? new String[]{"source.start-time", "extractor.start-time"}
                    : new String[]{"extractor.history.start-time", "source.history.start-time"};
            String[] endTimeKeys = usesPlainTimeKeys
                    ? new String[]{"source.end-time", "extractor.end-time"}
                    : new String[]{"extractor.history.end-time", "source.history.end-time"};

            this.isHistoricalExtractorEnabled = usesPlainTimeKeys
                    || parameters.getBooleanOrDefault("__system.restart", false)
                    || parameters.getBooleanOrDefault(Arrays.asList("extractor.history.enable", "source.history.enable"), true);

            this.historicalDataExtractionStartTime = (this.isHistoricalExtractorEnabled && parameters.hasAnyAttributes(startTimeKeys))
                    ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(parameters.getStringByKeys(startTimeKeys))
                    : Long.MIN_VALUE;
            this.historicalDataExtractionEndTime = (this.isHistoricalExtractorEnabled && parameters.hasAnyAttributes(endTimeKeys))
                    ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(parameters.getStringByKeys(endTimeKeys))
                    : WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;

            if (this.historicalDataExtractionStartTime > this.historicalDataExtractionEndTime) {
                throw new PipeParameterNotValidException(String.format("%s (%s) [%s] should be less than or equal to %s (%s) [%s].", startTimeKeys[0], startTimeKeys[1], Long.valueOf(this.historicalDataExtractionStartTime), endTimeKeys[0], endTimeKeys[1], Long.valueOf(this.historicalDataExtractionEndTime)));
            }
        } catch (PipeParameterNotValidException e) {
            // Already the right type: rethrow as-is instead of re-wrapping and losing the stack trace.
            throw e;
        } catch (Exception e) {
            // Any other failure is reported through the validation framework's exception type.
            throw new PipeParameterNotValidException(e.getMessage());
        }
    }

    /**
     * Captures the runtime environment and the extraction options needed by {@link #start()}.
     *
     * <p>Nothing beyond the two listening flags is initialised when insertions are not extracted.
     * When historical extraction is disabled, the data region is flushed here (rate limited per
     * region by {@code PIPE_MIN_FLUSH_INTERVAL_IN_MS}) and the pipe creation time becomes the lower
     * bound for TsFile generation time.
     *
     * @param pipeParameters the pipe's source parameters
     * @param pipeExtractorRuntimeConfiguration provides the runtime environment of this extractor
     * @throws IllegalPathException declared for the pattern parsing performed here
     */
    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws IllegalPathException {
        this.shouldExtractInsertion = this.listeningOptionPair.getLeft();
        this.shouldExtractDeletion = this.listeningOptionPair.getRight();
        if (!this.shouldExtractInsertion) {
            return;
        }

        PipeTaskExtractorRuntimeEnvironment environment = pipeExtractorRuntimeConfiguration.getRuntimeEnvironment();
        this.pipeName = environment.getPipeName();
        this.creationTime = environment.getCreationTime();
        this.pipeTaskMeta = environment.getPipeTaskMeta();
        this.startIndex = environment.getPipeTaskMeta().getProgressIndex();
        this.dataRegionId = environment.getRegionId();
        final Integer regionKey = this.dataRegionId;
        synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(regionKey, 0L);
        }

        this.treePattern = TreePattern.parsePipePatternFromSourceParameters(pipeParameters);
        this.tablePattern = TablePattern.parsePipePatternFromSourceParameters(pipeParameters);

        // Detect the data model eagerly when the region and its database name are already known.
        DataRegion region = StorageEngine.getInstance().getDataRegion(new DataRegionId(environment.getRegionId()));
        if (Objects.nonNull(region)) {
            String database = region.getDatabaseName();
            if (Objects.nonNull(database)) {
                this.isTableModel = PathUtils.isTableModelDatabase(database);
                this.isModelDetected = true;
                this.isDbNameCoveredByPattern = this.isTableModel
                        ? this.tablePattern.coversDb(database)
                        : this.treePattern.coversDb(database);
            }
        }

        if (this.isHistoricalExtractorEnabled) {
            this.historicalDataExtractionTimeLowerBound = Long.MIN_VALUE;
        } else {
            this.historicalDataExtractionTimeLowerBound = environment.getCreationTime();
        }
        if (this.historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
            synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
                long lastFlushedTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(regionKey);
                if (System.currentTimeMillis() - lastFlushedTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
                    flushDataRegionAllTsFiles();
                    DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(regionKey, System.currentTimeMillis());
                }
            }
        }

        // "mods" keys take precedence over "mods.enable"; both default to the deletion listening flag.
        final List<String> modsKeys = pipeParameters.hasAnyAttributes(new String[]{"extractor.mods", "source.mods"})
                ? Arrays.asList("extractor.mods", "source.mods")
                : Arrays.asList("source.mods.enable", "extractor.mods.enable");
        this.shouldTransferModFile = pipeParameters.getBooleanOrDefault(modsKeys, this.listeningOptionPair.getRight().booleanValue());

        this.userName = pipeParameters.getStringByKeys(new String[]{"extractor.user", "source.user", "extractor.username", "source.username"});
        this.skipIfNoPrivileges = IoTDBExtractor.getSkipIfNoPrivileges(pipeParameters);

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                    "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, username: {}, skip if no privileges: {}",
                    this.pipeName,
                    this.dataRegionId,
                    DateTimeUtils.convertLongToDate(this.historicalDataExtractionStartTime),
                    this.historicalDataExtractionStartTime,
                    DateTimeUtils.convertLongToDate(this.historicalDataExtractionEndTime),
                    this.historicalDataExtractionEndTime,
                    this.sloppyPattern,
                    this.sloppyTimeRange,
                    this.shouldTransferModFile,
                    this.userName,
                    this.skipIfNoPrivileges);
        }
    }

    /**
     * Synchronously closes all working TsFile processors of this extractor's data region while
     * holding the region's write lock. Does nothing if the region cannot be found.
     */
    private void flushDataRegionAllTsFiles() {
        final DataRegion region = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (Objects.nonNull(region)) {
            region.writeLock("Pipe: create historical TsFile extractor");
            try {
                region.syncCloseAllWorkingTsFileProcessors();
            } finally {
                region.writeUnlock();
            }
        }
    }

    /**
     * Collects the historical resources (TsFiles and, if enabled, deletions) of the data region,
     * orders them and publishes them as the pending queue.
     *
     * <p>If insertions are not extracted the extractor is simply marked as started. If the storage
     * engine is not ready yet, nothing happens and the extractor stays un-started so the call can be
     * repeated later. If the data region does not exist, an empty queue is published.
     */
    public synchronized void start() {
        if (!this.shouldExtractInsertion) {
            this.hasBeenStarted = true;
            return;
        }
        if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
            LOGGER.info("Pipe {}@{}: failed to start to extract historical TsFile, storage engine is not ready. Will retry later.", this.pipeName, this.dataRegionId);
            return;
        }
        this.hasBeenStarted = true;

        final DataRegion region = StorageEngine.getInstance().getDataRegion(new DataRegionId(this.dataRegionId));
        if (Objects.isNull(region)) {
            this.pendingQueue = new ArrayDeque<>();
            return;
        }

        final long startTime = System.currentTimeMillis();
        region.writeLock("Pipe: start to extract historical TsFile and Deletion(if uses pipeConsensus)");
        try {
            final List<PersistentResource> resources = new ArrayList<>();
            if (this.shouldExtractInsertion) {
                flushTsFilesForExtraction(region, startTime);
                extractTsFiles(region, startTime, resources);
            }
            if (this.shouldExtractDeletion) {
                Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(this.dataRegionId)))
                        .ifPresent(manager -> extractDeletions(manager, resources));
            }

            final long sortStartTime = System.currentTimeMillis();
            LOGGER.info("Pipe {}@{}: start to sort all extracted resources", this.pipeName, this.dataRegionId);
            // Time-window progress orders by file start time; everything else by progress index.
            final boolean orderByStartTime = this.startIndex instanceof TimeWindowStateProgressIndex;
            resources.sort((left, right) -> {
                if (orderByStartTime) {
                    return Long.compare(left.getFileStartTime(), right.getFileStartTime());
                }
                return left.getProgressIndex().topologicalCompareTo(right.getProgressIndex());
            });
            this.pendingQueue = new ArrayDeque<>(resources);
            LOGGER.info("Pipe {}@{}: finish to sort all extracted resources, took {} ms", this.pipeName, this.dataRegionId, System.currentTimeMillis() - sortStartTime);
        } finally {
            region.writeUnlock();
        }
    }

    /**
     * Flushes the data region before extraction.
     *
     * <p>Pipes whose name starts with {@code "__consensus."} always flush. Other pipes flush only
     * if the region's last recorded pipe flush is at least {@code PIPE_MIN_FLUSH_INTERVAL_IN_MS}
     * old, and record the new flush time afterwards.
     *
     * @param dataRegion the region to flush
     * @param j start timestamp (ms) used for the "took ... ms" log output
     */
    private void flushTsFilesForExtraction(DataRegion dataRegion, long j) {
        LOGGER.info("Pipe {}@{}: start to flush data region", this.pipeName, this.dataRegionId);

        if (this.pipeName.startsWith("__consensus.")) {
            dataRegion.syncCloseAllWorkingTsFileProcessors();
            LOGGER.info("Pipe {}@{}: finish to flush data region, took {} ms", this.pipeName, this.dataRegionId, System.currentTimeMillis() - j);
            return;
        }

        final Integer regionKey = this.dataRegionId;
        synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
            final long lastFlushedTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(regionKey);
            final boolean flushedRecently = System.currentTimeMillis() - lastFlushedTime < PIPE_MIN_FLUSH_INTERVAL_IN_MS;
            if (flushedRecently) {
                LOGGER.info("Pipe {}@{}: skip to flush data region, last flushed time {} ms ago", this.pipeName, this.dataRegionId, System.currentTimeMillis() - lastFlushedTime);
            } else {
                dataRegion.syncCloseAllWorkingTsFileProcessors();
                DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(regionKey, System.currentTimeMillis());
                LOGGER.info("Pipe {}@{}: finish to flush data region, took {} ms", this.pipeName, this.dataRegionId, System.currentTimeMillis() - j);
            }
        }
    }

    /**
     * Selects the sequence and unsequence TsFiles to extract, appends them to {@code list} and pins
     * each of them; resources that fail to be pinned are removed from {@code list} again.
     *
     * <p>Runs under the TsFile manager's read lock.
     *
     * @param dataRegion region whose TsFiles are examined
     * @param j start timestamp (ms) used for the "took ... ms" log output
     * @param list receives the selected resources; expected to contain only TsFile resources when
     *     this method runs, because every element is cast to {@link TsFileResource} for pinning
     */
    private void extractTsFiles(DataRegion dataRegion, long j, List<PersistentResource> list) {
        final TsFileManager manager = dataRegion.getTsFileManager();
        manager.readLock();
        try {
            final int originalSeqCount = manager.size(true);
            final int originalUnseqCount = manager.size(false);
            LOGGER.info("Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, original unsequence file count {}, start progress index {}", this.pipeName, this.dataRegionId, originalSeqCount, originalUnseqCount, this.startIndex);

            final List<TsFileResource> seqResources = manager.getTsFileList(true).stream()
                    .filter(this::shouldExtractTsFile)
                    .collect(Collectors.toList());
            list.addAll(seqResources);

            final List<TsFileResource> unseqResources = manager.getTsFileList(false).stream()
                    .filter(this::shouldExtractTsFile)
                    .collect(Collectors.toList());
            list.addAll(unseqResources);

            // Pin what was selected; anything that cannot be pinned is dropped from the result.
            list.removeIf(resource -> {
                final TsFileResource tsFile = (TsFileResource) resource;
                try {
                    PipeDataNodeResourceManager.tsfile().pinTsFileResource(tsFile, this.shouldTransferModFile);
                } catch (IOException e) {
                    LOGGER.warn("Pipe: failed to pin TsFileResource {}", tsFile.getTsFilePath(), e);
                    return true;
                }
                return false;
            });

            LOGGER.info("Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, extracted unsequence file count {}/{}, extracted file count {}/{}, took {} ms", this.pipeName, this.dataRegionId, seqResources.size(), originalSeqCount, unseqResources.size(), originalUnseqCount, list.size(), originalSeqCount + originalUnseqCount, System.currentTimeMillis() - j);
        } finally {
            manager.readUnlock();
        }
    }

    /**
     * Selection rule shared by the sequence and unsequence lists: deleted resources are never taken,
     * unclosed resources are always taken, and closed resources must pass the progress, time range,
     * generation-time and pattern checks (evaluated in that order, short-circuiting).
     */
    private boolean shouldExtractTsFile(TsFileResource resource) {
        if (resource.isDeleted()) {
            return false;
        }
        if (!resource.isClosed()) {
            return true;
        }
        return mayTsFileContainUnprocessedData(resource)
                && isTsFileResourceOverlappedWithTimeRange(resource)
                && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                && mayTsFileResourceOverlappedWithPattern(resource);
    }

    /**
     * Tells whether the TsFile may still hold data beyond the pipe's start progress.
     *
     * <ul>
     *   <li>Time-window progress: the file's end time is not before the index's minimum time.</li>
     *   <li>State progress: the inner index is neither after nor equal to the file's max progress.</li>
     *   <li>Any other progress: the start index is not after the file's max progress.</li>
     * </ul>
     */
    private boolean mayTsFileContainUnprocessedData(TsFileResource tsFileResource) {
        if (this.startIndex instanceof TimeWindowStateProgressIndex) {
            final long minTime = ((TimeWindowStateProgressIndex) this.startIndex).getMinTime();
            return minTime <= tsFileResource.getFileEndTime();
        }
        if (this.startIndex instanceof StateProgressIndex) {
            final ProgressIndex inner = ((StateProgressIndex) this.startIndex).getInnerProgressIndex();
            return !inner.isAfter(tsFileResource.getMaxProgressIndexAfterClose())
                    && !inner.equals(tsFileResource.getMaxProgressIndexAfterClose());
        }
        return !this.startIndex.isAfter(tsFileResource.getMaxProgressIndexAfterClose());
    }

    /**
     * Checks whether at least one device of the TsFile may match the configured pattern.
     *
     * <p>Devices come from the pipe TsFile cache when available, otherwise from the resource itself.
     * The data model is detected from the first inspected device if it is not known yet. If the
     * devices cannot be read, the file is extracted anyway.
     *
     * @return {@code true} if some device may match, or if reading the devices failed
     */
    private boolean mayTsFileResourceOverlappedWithPattern(TsFileResource tsFileResource) {
        try {
            final Map<IDeviceID, Boolean> cachedDevices = PipeDataNodeResourceManager.tsfile().getDeviceIsAlignedMapFromCache(PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(tsFileResource.getTsFile()), false);
            final Collection<IDeviceID> devices = Objects.nonNull(cachedDevices) ? cachedDevices.keySet() : tsFileResource.getDevices();
            for (final IDeviceID device : devices) {
                if (!this.isModelDetected) {
                    detectModel(tsFileResource, device);
                    this.isModelDetected = true;
                }
                final boolean matched;
                if (this.isTableModel) {
                    matched = this.tablePattern.isTableModelDataAllowedToBeCaptured()
                            && this.tablePattern.matchesDatabase(tsFileResource.getDatabaseName())
                            && this.tablePattern.matchesTable(device.getTableName());
                } else {
                    matched = this.treePattern.isTreeModelDataAllowedToBeCaptured()
                            && this.treePattern.mayOverlapWithDevice(device);
                }
                if (matched) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            LOGGER.warn("Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway", this.pipeName, this.dataRegionId, tsFileResource.getTsFilePath(), e);
            return true;
        }
    }

    /**
     * Derives the data model from a device id and records whether the TsFile's database is covered
     * by the pattern of that model.
     *
     * <p>A device is treated as tree model when it is a {@link PlainDeviceID}, or when its table
     * name starts with {@code "root."} or equals {@code SqlConstant.ROOT}; otherwise as table model.
     */
    private void detectModel(TsFileResource tsFileResource, IDeviceID iDeviceID) {
        final boolean treeModel = (iDeviceID instanceof PlainDeviceID)
                || iDeviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
                || iDeviceID.getTableName().equals(SqlConstant.ROOT);
        this.isTableModel = !treeModel;

        final String database = tsFileResource.getDatabaseName();
        if (this.isTableModel) {
            this.isDbNameCoveredByPattern = this.tablePattern.isTableModelDataAllowedToBeCaptured() && this.tablePattern.coversDb(database);
        } else {
            this.isDbNameCoveredByPattern = this.treePattern.isTreeModelDataAllowedToBeCaptured() && this.treePattern.coversDb(database);
        }
    }

    /** Returns whether the file's [start, end] time span intersects the extraction time range (bounds inclusive). */
    private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource tsFileResource) {
        if (tsFileResource.getFileEndTime() < this.historicalDataExtractionStartTime) {
            // The file ends before the range begins.
            return false;
        }
        return tsFileResource.getFileStartTime() <= this.historicalDataExtractionEndTime;
    }

    /** Returns whether the file's [start, end] time span lies entirely inside the extraction time range (bounds inclusive). */
    private boolean isTsFileResourceCoveredByTimeRange(TsFileResource tsFileResource) {
        if (tsFileResource.getFileStartTime() < this.historicalDataExtractionStartTime) {
            // The file begins before the range does.
            return false;
        }
        return tsFileResource.getFileEndTime() <= this.historicalDataExtractionEndTime;
    }

    /**
     * Returns whether the time encoded in the TsFile's name is not earlier than the extraction
     * lower bound. If the file name cannot be parsed, the file is extracted anyway.
     */
    private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource tsFileResource) {
        final long generationTime;
        try {
            generationTime = TsFileNameGenerator.getTsFileName(tsFileResource.getTsFile().getName()).getTime();
        } catch (IOException e) {
            LOGGER.warn("Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway (historical data extraction time lower bound: {})", this.pipeName, this.dataRegionId, tsFileResource.getTsFilePath(), this.historicalDataExtractionTimeLowerBound, e);
            return true;
        }
        return generationTime >= this.historicalDataExtractionTimeLowerBound;
    }

    /**
     * Appends the deletion resources that the start progress index is not after to {@code list};
     * for every deletion the start index is already after, the reference is decreased instead.
     *
     * @param deletionResourceManager source of the region's deletion resources
     * @param list receives the deletions that still have to be transferred
     */
    private void extractDeletions(DeletionResourceManager deletionResourceManager, List<PersistentResource> list) {
        LOGGER.info("Pipe {}@{}: start to extract deletions", this.pipeName, this.dataRegionId);
        final long startTime = System.currentTimeMillis();

        final List<DeletionResource> allDeletions = deletionResourceManager.getAllDeletionResources();
        final int totalCount = allDeletions.size();

        // First pass: release deletions that were already processed.
        for (final DeletionResource deletion : allDeletions) {
            if (this.startIndex.isAfter(deletion.getProgressIndex())) {
                deletion.decreaseReference();
            }
        }

        // Second pass: keep the ones that still need to be sent.
        final List<DeletionResource> pendingDeletions = new ArrayList<>();
        for (final DeletionResource deletion : allDeletions) {
            if (!this.startIndex.isAfter(deletion.getProgressIndex())) {
                pendingDeletions.add(deletion);
            }
        }
        list.addAll(pendingDeletions);

        LOGGER.info("Pipe {}@{}: finish to extract deletions, extract deletions count {}/{}, took {} ms", this.pipeName, this.dataRegionId, pendingDeletions.size(), totalCount, System.currentTimeMillis() - startTime);
    }

    /**
     * Supplies the next historical event.
     *
     * <p>Starts the extractor lazily once the storage engine is ready. Returns {@code null} while no
     * pending queue exists; once the queue is drained a terminate event is supplied instead.
     *
     * @return the next event, or {@code null} if none can be supplied right now
     */
    public synchronized Event supply() {
        if (!this.hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
            start();
        }
        if (Objects.isNull(this.pendingQueue)) {
            return null;
        }

        final PersistentResource next = this.pendingQueue.poll();
        if (next == null) {
            return supplyTerminateEvent();
        }
        if (next instanceof TsFileResource) {
            return supplyTsFileEvent((TsFileResource) next);
        }
        return supplyDeletionEvent((DeletionResource) next);
    }

    /**
     * Creates the terminate event that marks the end of the historical data.
     *
     * @return the event, or {@code null} if its reference count could not be increased (the
     *     terminate flag then stays unset, so a later call creates a new event)
     */
    private Event supplyTerminateEvent() {
        final PipeTerminateEvent terminateEvent = new PipeTerminateEvent(this.pipeName, this.creationTime, this.pipeTaskMeta, this.dataRegionId);
        final boolean referenced = terminateEvent.increaseReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionExtractor.class.getName());
        if (!referenced) {
            LOGGER.warn("Pipe {}@{}: failed to increase reference count for terminate event, will resend it", this.pipeName, this.dataRegionId);
            return null;
        }
        this.isTerminateSignalSent = true;
        return terminateEvent;
    }

    /**
     * Wraps a pinned TsFile resource into an insertion event and releases the pin taken during
     * extraction.
     *
     * <p>The resource has already been removed from the pending queue, so this method is the last
     * place that can unpin it. The whole event creation therefore runs inside {@code try/finally},
     * which unpins the resource even if building or configuring the event fails.
     *
     * @param tsFileResource the pinned resource polled from the pending queue
     * @return the event, or {@code null} if its reference count could not be increased
     */
    private Event supplyTsFileEvent(TsFileResource tsFileResource) {
        try {
            final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent(this.isModelDetected ? Boolean.valueOf(this.isTableModel) : null, tsFileResource.getDatabaseName(), tsFileResource, this.shouldTransferModFile, false, true, this.pipeName, this.creationTime, this.pipeTaskMeta, this.treePattern, this.tablePattern, this.userName, this.skipIfNoPrivileges, this.historicalDataExtractionStartTime, this.historicalDataExtractionEndTime);

            // Under pipe consensus, events that must be replicated get a replicate index.
            if ((DataRegionConsensusImpl.getInstance() instanceof PipeConsensus) && PipeConsensusProcessor.isShouldReplicate(event)) {
                event.setReplicateIndexForIoTV2(ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(tsFileResource.getDataRegionId()));
                LOGGER.debug("[Region{}]Set {} for event {}", tsFileResource.getDataRegionId(), event.getReplicateIndexForIoTV2(), event);
            }

            if (this.sloppyPattern || this.isDbNameCoveredByPattern) {
                event.skipParsingPattern();
            }
            if (this.sloppyTimeRange || isTsFileResourceCoveredByTimeRange(tsFileResource)) {
                event.skipParsingTime();
            }

            if (event.increaseReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionExtractor.class.getName())) {
                return event;
            }
            LOGGER.warn("Pipe {}@{}: failed to increase reference count for historical tsfile event {}, will discard it", this.pipeName, this.dataRegionId, event);
            return null;
        } finally {
            try {
                PipeDataNodeResourceManager.tsfile().unpinTsFileResource(tsFileResource);
            } catch (IOException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                LOGGER.warn("Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", this.pipeName, this.dataRegionId, tsFileResource.getTsFilePath(), e);
            }
        }
    }

    /**
     * Wraps a deletion resource into a delete-data event.
     *
     * <p>On success the event is linked to the deletion resource that the region's
     * {@link DeletionResourceManager} (if any) returns for the event's delete node.
     *
     * @param deletionResource the deletion polled from the pending queue
     * @return the event, or {@code null} if its reference count could not be increased
     */
    private Event supplyDeletionEvent(DeletionResource deletionResource) {
        final PipeDeleteDataNodeEvent deletionEvent = new PipeDeleteDataNodeEvent(deletionResource.getDeleteDataNode(), this.pipeName, this.creationTime, this.pipeTaskMeta, this.treePattern, this.tablePattern, this.userName, this.skipIfNoPrivileges, false);
        if (this.sloppyPattern || this.isDbNameCoveredByPattern) {
            deletionEvent.skipParsingPattern();
        }
        if (this.sloppyTimeRange) {
            deletionEvent.skipParsingTime();
        }

        if (!deletionEvent.increaseReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionExtractor.class.getName())) {
            LOGGER.warn("Pipe {}@{}: failed to increase reference count for historical deletion event {}, will discard it", this.pipeName, this.dataRegionId, deletionEvent);
            return null;
        }

        Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(this.dataRegionId)))
                .ifPresent(manager -> deletionEvent.setDeletionResource(manager.getDeletionResource(deletionEvent.getDeleteDataNode())));
        return deletionEvent;
    }

    /**
     * Returns whether the historical extraction is finished: the extractor has been started and
     * either has no pending queue, or the queue is empty and the terminate event has been sent.
     */
    @Override
    public synchronized boolean hasConsumedAll() {
        if (!this.hasBeenStarted) {
            return false;
        }
        if (Objects.isNull(this.pendingQueue)) {
            return true;
        }
        return this.pendingQueue.isEmpty() && this.isTerminateSignalSent;
    }

    /**
     * Returns the number of resources still waiting to be supplied, or 0 if no queue exists.
     *
     * <p>This method is not synchronized, while {@link #close()} may set the queue to {@code null}
     * concurrently. The field is therefore read once into a local so that the null check and the
     * {@code size()} call see the same reference.
     */
    @Override
    public int getPendingQueueSize() {
        final Queue<PersistentResource> queue = this.pendingQueue;
        return Objects.nonNull(queue) ? queue.size() : 0;
    }

    /**
     * Releases the extractor: unpins every TsFile resource still waiting in the pending queue, then
     * clears and drops the queue. A failure to unpin one resource is logged (with its cause) and
     * does not stop the remaining resources from being released.
     */
    public synchronized void close() {
        if (Objects.isNull(this.pendingQueue)) {
            return;
        }
        for (final PersistentResource resource : this.pendingQueue) {
            if (!(resource instanceof TsFileResource)) {
                continue;
            }
            final TsFileResource tsFileResource = (TsFileResource) resource;
            try {
                PipeDataNodeResourceManager.tsfile().unpinTsFileResource(tsFileResource);
            } catch (IOException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                LOGGER.warn("Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}", this.pipeName, this.dataRegionId, tsFileResource.getTsFilePath(), e);
            }
        }
        this.pendingQueue.clear();
        this.pendingQueue = null;
    }
}
