package org.apache.iotdb.db.pipe.agent.task;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.class */
public class PipeDataNodeTaskAgent extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
    protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final AtomicLong LAST_FORCED_RESTART_TIME = new AtomicLong(System.currentTimeMillis());
    private static final Map<String, AtomicLong> PIPE_NAME_TO_LAST_RESTART_TIME_MAP = new ConcurrentHashMap();

    protected boolean isShutdown() {
        return PipeDataNodeAgent.runtime().isShutdown();
    }

    protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMeta) throws IllegalPathException {
        return new PipeDataNodeBuilder(pipeMeta).build();
    }

    protected void startPipe(String str, long j) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(str);
        PipeStatus pipeStatus = (PipeStatus) pipeMeta.getRuntimeMeta().getStatus().get();
        if (PipeStatus.STOPPED.equals(pipeStatus) || pipeStatus == null) {
            restartPipeToReloadResourceIfNeeded(pipeMeta);
        }
        super.startPipe(str, j);
    }

    private void restartPipeToReloadResourceIfNeeded(PipeMeta pipeMeta) {
        if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
            return;
        }
        AtomicLong atomicLong = PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
        if (atomicLong != null && System.currentTimeMillis() - atomicLong.get() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
            LOGGER.info("Skipping reload resource for stopped pipe {} before starting it because reloading resource is too frequent.", pipeMeta.getStaticMeta().getPipeName());
            return;
        }
        if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
            LOGGER.info("Flushing storage engine before restarting pipe {}.", pipeMeta.getStaticMeta().getPipeName());
            long currentTimeMillis = System.currentTimeMillis();
            StorageEngine.getInstance().syncCloseAllProcessor();
            WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
            LOGGER.info("Finished flushing storage engine, time cost: {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }
        restartStuckPipe(pipeMeta);
        LOGGER.info("Reloaded resource for stopped pipe {} before starting it.", pipeMeta.getStaticMeta().getPipeName());
    }

    protected void createPipeTask(int i, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) throws IllegalPathException {
        if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
            PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
            DataRegionId dataRegionId = new DataRegionId(i);
            boolean z = StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId) && DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters, dataRegionId);
            boolean z2 = SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new SchemaRegionId(i)) && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters).isEmpty();
            if (z || z2) {
                PipeDataNodeTask build = new PipeDataNodeTaskBuilder(pipeStaticMeta, i, pipeTaskMeta).build();
                build.create();
                this.pipeTaskManager.addPipeTask(pipeStaticMeta, i, build);
            }
        }
        this.pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName()).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().put(Integer.valueOf(i), pipeTaskMeta);
    }

    public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(List<PipeMeta> list) {
        if (isShutdown()) {
            return Collections.emptyList();
        }
        List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal = super.handlePipeMetaChangesInternal(list);
        try {
            closeSchemaRegionListeningQueueIfNecessary(clearSchemaRegionListeningQueueIfNecessary(list), handlePipeMetaChangesInternal);
        } catch (Exception e) {
            LOGGER.warn("Failed to clear/close the schema region listening queue, because {}. Will wait until success or the region's state machine is stopped.", e.getMessage());
            handlePipeMetaChangesInternal.add(new TPushPipeMetaRespExceptionMessage("", e.getMessage(), System.currentTimeMillis()));
        }
        return handlePipeMetaChangesInternal;
    }

    private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(List<PipeMeta> list) throws IllegalPathException {
        HashMap hashMap = new HashMap();
        for (PipeMeta pipeMeta : list) {
            if (!SchemaRegionListeningFilter.parseListeningPlanTypeSet(pipeMeta.getStaticMeta().getExtractorParameters()).isEmpty()) {
                ConcurrentMap consensusGroupId2TaskMetaMap = pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
                Iterator<SchemaRegionId> it = SchemaEngine.getInstance().getAllSchemaRegionIds().iterator();
                while (it.hasNext()) {
                    int id = it.next().getId();
                    PipeTaskMeta pipeTaskMeta = (PipeTaskMeta) consensusGroupId2TaskMetaMap.get(Integer.valueOf(id));
                    if (pipeTaskMeta != null) {
                        MetaProgressIndex progressIndex = pipeTaskMeta.getProgressIndex();
                        if (!(progressIndex instanceof MetaProgressIndex)) {
                            hashMap.put(Integer.valueOf(id), 0L);
                        } else if (progressIndex.getIndex() + 1 < ((Long) hashMap.getOrDefault(Integer.valueOf(id), Long.valueOf(WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX))).longValue()) {
                            hashMap.put(Integer.valueOf(id), Long.valueOf(progressIndex.getIndex() + 1));
                        }
                    }
                }
            }
        }
        hashMap.forEach((num, l) -> {
            PipeDataNodeAgent.runtime().schemaListener(new SchemaRegionId(num.intValue())).removeBefore(l.longValue());
        });
        return hashMap.keySet();
    }

    private void closeSchemaRegionListeningQueueIfNecessary(Set<Integer> set, List<TPushPipeMetaRespExceptionMessage> list) {
        if (list.isEmpty()) {
            PipeDataNodeAgent.runtime().listeningSchemaRegionIds().stream().filter(schemaRegionId -> {
                return !set.contains(Integer.valueOf(schemaRegionId.getId())) && PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId);
            }).forEach(schemaRegionId2 -> {
                try {
                    SchemaRegionConsensusImpl.getInstance().write(schemaRegionId2, new PipeOperateSchemaQueueNode(new PlanNodeId(""), false));
                } catch (ConsensusException e) {
                    throw new PipeException("Failed to close listening queue for SchemaRegion " + schemaRegionId2 + ", because " + e.getMessage(), e);
                }
            });
        }
    }

    protected void thawRate(String str, long j) {
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(str + "_" + j);
    }

    protected void freezeRate(String str, long j) {
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().freezeRate(str + "_" + j);
    }

    protected boolean dropPipe(String str, long j) {
        if (!super.dropPipe(str, j)) {
            return false;
        }
        String str2 = str + "_" + j;
        PipeTsFileToTabletsMetrics.getInstance().deregister(str2);
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(str2);
        if (!str.startsWith("__consensus.")) {
            return true;
        }
        PipeDataNodeAgent.receiver().pipeConsensus().handleDropPipeConsensusTask(new ConsensusPipeName(str));
        return true;
    }

    protected boolean dropPipe(String str) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(str);
        if (!super.dropPipe(str)) {
            return false;
        }
        if (Objects.nonNull(pipeMeta)) {
            String str2 = str + "_" + pipeMeta.getStaticMeta().getCreationTime();
            PipeTsFileToTabletsMetrics.getInstance().deregister(str2);
            PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(str2);
        }
        if (!str.startsWith("__consensus.")) {
            return true;
        }
        PipeDataNodeAgent.receiver().pipeConsensus().handleDropPipeConsensusTask(new ConsensusPipeName(str));
        return true;
    }

    public void stopAllPipesWithCriticalException() {
        super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
    }

    public void collectPipeMetaList(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (tryReadLockWithTimeOut(10L)) {
            try {
                collectPipeMetaListInternal(tDataNodeHeartbeatResp);
            } finally {
                releaseReadLock();
            }
        }
    }

    private void collectPipeMetaListInternal(TDataNodeHeartbeatResp tDataNodeHeartbeatResp) throws TException {
        if (PipeDataNodeAgent.runtime().isShutdown()) {
            return;
        }
        Set set = (Set) StorageEngine.getInstance().getAllDataRegionIds().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        try {
            Optional schedule = PipeDataNodeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
                Map pipeTasks = this.pipeTaskManager.getPipeTasks(staticMeta);
                boolean z = pipeTasks == null || pipeTasks.entrySet().stream().filter(entry -> {
                    return set.contains(entry.getKey());
                }).allMatch(entry2 -> {
                    return ((PipeDataNodeTask) entry2.getValue()).isCompleted();
                });
                String stringOrDefault = pipeMeta.getStaticMeta().getExtractorParameters().getStringOrDefault(Arrays.asList("extractor.mode", "source.mode"), "live");
                boolean z2 = z && (((Boolean) DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeMeta.getStaticMeta().getExtractorParameters()).getLeft()).booleanValue() && (stringOrDefault.equalsIgnoreCase("query") || stringOrDefault.equalsIgnoreCase("snapshot")));
                Pair<Long, Double> remainingEventAndTime = PipeDataNodeRemainingEventAndTimeMetrics.getInstance().getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
                arrayList2.add(Boolean.valueOf(z2));
                arrayList3.add((Long) remainingEventAndTime.getLeft());
                arrayList4.add((Double) remainingEventAndTime.getRight());
                schedule.ifPresent(logger -> {
                    logger.info("Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", new Object[]{pipeMeta.coreReportMessage(), Boolean.valueOf(z2), remainingEventAndTime.getLeft(), remainingEventAndTime.getRight()});
                });
            }
            LOGGER.info("Reported {} pipe metas.", Integer.valueOf(arrayList.size()));
            tDataNodeHeartbeatResp.setPipeMetaList(arrayList);
            tDataNodeHeartbeatResp.setPipeCompletedList(arrayList2);
            tDataNodeHeartbeatResp.setPipeRemainingEventCountList(arrayList3);
            tDataNodeHeartbeatResp.setPipeRemainingTimeList(arrayList4);
            PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
        } catch (IOException | IllegalPathException e) {
            throw new TException(e);
        }
    }

    protected void collectPipeMetaListInternal(TPipeHeartbeatReq tPipeHeartbeatReq, TPipeHeartbeatResp tPipeHeartbeatResp) throws TException {
        if (PipeDataNodeAgent.runtime().isShutdown()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config node.", Long.valueOf(tPipeHeartbeatReq.heartbeatId));
        Set set = (Set) StorageEngine.getInstance().getAllDataRegionIds().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        try {
            Optional schedule = PipeDataNodeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                arrayList.add(pipeMeta.serialize());
                PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
                Map pipeTasks = this.pipeTaskManager.getPipeTasks(staticMeta);
                boolean z = (pipeTasks == null || pipeTasks.entrySet().stream().filter(entry -> {
                    return set.contains(entry.getKey());
                }).allMatch(entry2 -> {
                    return ((PipeDataNodeTask) entry2.getValue()).isCompleted();
                })) && (((Boolean) DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeMeta.getStaticMeta().getExtractorParameters()).getLeft()).booleanValue() && isSnapshotMode(pipeMeta.getStaticMeta().getExtractorParameters()));
                Pair<Long, Double> remainingEventAndTime = PipeDataNodeRemainingEventAndTimeMetrics.getInstance().getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
                arrayList2.add(Boolean.valueOf(z));
                arrayList3.add((Long) remainingEventAndTime.getLeft());
                arrayList4.add((Double) remainingEventAndTime.getRight());
                schedule.ifPresent(logger -> {
                    logger.info("Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", new Object[]{pipeMeta.coreReportMessage(), Boolean.valueOf(z), remainingEventAndTime.getLeft(), remainingEventAndTime.getRight()});
                });
            }
            LOGGER.info("Reported {} pipe metas.", Integer.valueOf(arrayList.size()));
            tPipeHeartbeatResp.setPipeMetaList(arrayList);
            tPipeHeartbeatResp.setPipeCompletedList(arrayList2);
            tPipeHeartbeatResp.setPipeRemainingEventCountList(arrayList3);
            tPipeHeartbeatResp.setPipeRemainingTimeList(arrayList4);
            PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
        } catch (IOException | IllegalPathException e) {
            throw new TException(e);
        }
    }

    public void restartAllStuckPipes() {
        List<String> removeOutdatedPipeInfoFromLastRestartTimeMap = removeOutdatedPipeInfoFromLastRestartTimeMap();
        if (!removeOutdatedPipeInfoFromLastRestartTimeMap.isEmpty()) {
            long currentTimeMillis = System.currentTimeMillis();
            LOGGER.info("Pipes {} now can dynamically adjust their extraction strategies. Start to flush storage engine to trigger the adjustment.", removeOutdatedPipeInfoFromLastRestartTimeMap);
            StorageEngine.getInstance().syncCloseAllProcessor();
            WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
            LOGGER.info("Finished flushing storage engine, time cost: {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            LOGGER.info("Skipping restarting pipes this round because of the dynamic flushing.");
            return;
        }
        if (tryWriteLockWithTimeOut(5L)) {
            try {
                Set<PipeMeta> findAllStuckPipes = findAllStuckPipes();
                releaseWriteLock();
                findAllStuckPipes.removeIf(pipeMeta -> {
                    AtomicLong atomicLong = PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
                    return atomicLong != null && System.currentTimeMillis() - atomicLong.get() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs();
                });
                findAllStuckPipes.forEach(this::restartStuckPipe);
            } catch (Throwable th) {
                releaseWriteLock();
                throw th;
            }
        }
    }

    private List<String> removeOutdatedPipeInfoFromLastRestartTimeMap() {
        ArrayList arrayList = new ArrayList();
        PIPE_NAME_TO_LAST_RESTART_TIME_MAP.entrySet().removeIf(entry -> {
            AtomicLong atomicLong = (AtomicLong) entry.getValue();
            boolean z = atomicLong == null || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs() <= System.currentTimeMillis() - atomicLong.get();
            if (z) {
                arrayList.add((String) entry.getKey());
            }
            return z;
        });
        return arrayList;
    }

    private Set<PipeMeta> findAllStuckPipes() {
        HashSet hashSet = new HashSet();
        if (System.currentTimeMillis() - LAST_FORCED_RESTART_TIME.get() > PipeConfig.getInstance().getPipeSubtaskExecutorForcedRestartIntervalMs()) {
            LAST_FORCED_RESTART_TIME.set(System.currentTimeMillis());
            Iterator it = this.pipeMetaKeeper.getPipeMetaList().iterator();
            while (it.hasNext()) {
                hashSet.add((PipeMeta) it.next());
            }
            if (!hashSet.isEmpty()) {
                LOGGER.warn("All {} pipe(s) will be restarted because of forced restart policy.", Integer.valueOf(hashSet.size()));
            }
            return hashSet;
        }
        long totalLinkedButDeletedTsFileResourceRamSize = PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsFileResourceRamSize();
        long allFloatingMemoryUsageInByte = getAllFloatingMemoryUsageInByte();
        long totalFloatingMemorySizeInBytes = PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
        if (allFloatingMemoryUsageInByte + totalLinkedButDeletedTsFileResourceRamSize >= totalFloatingMemorySizeInBytes) {
            Iterator it2 = this.pipeMetaKeeper.getPipeMetaList().iterator();
            while (it2.hasNext()) {
                hashSet.add((PipeMeta) it2.next());
            }
            if (!hashSet.isEmpty()) {
                LOGGER.warn("All {} pipe(s) will be restarted because linked but deleted tsFiles' resource size {} and all insertNode's size {} exceeds limit {}.", new Object[]{Integer.valueOf(hashSet.size()), Long.valueOf(totalLinkedButDeletedTsFileResourceRamSize), Long.valueOf(allFloatingMemoryUsageInByte), Long.valueOf(totalFloatingMemorySizeInBytes)});
            }
            return hashSet;
        }
        Map<String, IoTDBDataRegionExtractor> extractorMap = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap();
        for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            String pipeName = pipeMeta.getStaticMeta().getPipeName();
            List list = (List) extractorMap.values().stream().filter(ioTDBDataRegionExtractor -> {
                return ioTDBDataRegionExtractor.getPipeName().equals(pipeName) && ioTDBDataRegionExtractor.shouldExtractInsertion();
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                if ((CONFIG.isEnableSeqSpaceCompaction() || CONFIG.isEnableUnseqSpaceCompaction() || CONFIG.isEnableCrossSpaceCompaction()) && mayDeletedTsFileSizeReachDangerousThreshold()) {
                    LOGGER.warn("Pipe {} needs to restart because too many TsFiles are out-of-date.", pipeMeta.getStaticMeta());
                    hashSet.add(pipeMeta);
                } else if (((IoTDBDataRegionExtractor) list.get(0)).isStreamMode() && list.stream().anyMatch((v0) -> {
                    return v0.hasConsumedAllHistoricalTsFiles();
                }) && (mayMemTablePinnedCountReachDangerousThreshold() || mayWalSizeReachThrottleThreshold())) {
                    LOGGER.warn("Pipe {} needs to restart because too many memtables are pinned. mayMemTablePinnedCountReachDangerousThreshold: {}, mayWalSizeReachThrottleThreshold: {}", new Object[]{pipeMeta.getStaticMeta(), Boolean.valueOf(mayMemTablePinnedCountReachDangerousThreshold()), Boolean.valueOf(mayWalSizeReachThrottleThreshold())});
                    hashSet.add(pipeMeta);
                }
            }
        }
        return hashSet;
    }

    private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
        try {
            long totalLinkedButDeletedTsfileSize = PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
            double value = MetricService.getInstance().getAutoGauge(SystemMetric.SYS_DISK_TOTAL_SPACE.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).getValue();
            if (totalLinkedButDeletedTsfileSize > 0 && value > 0.0d) {
                if (totalLinkedButDeletedTsfileSize > PipeConfig.getInstance().getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage() * value) {
                    return true;
                }
            }
            return false;
        } catch (Exception e) {
            LOGGER.warn("Failed to judge if deleted TsFile size reaches dangerous threshold.", e);
            return false;
        }
    }

    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeDataNodeResourceManager.wal().getPinnedWalCount() >= (5 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()) * StorageEngine.getInstance().getDataRegionNumber();
    }

    private boolean mayWalSizeReachThrottleThreshold() {
        return 3 * WALManager.getInstance().getTotalDiskUsage() > 2 * CONFIG.getThrottleThreshold();
    }

    private void restartStuckPipe(PipeMeta pipeMeta) {
        LOGGER.warn("Pipe {} will be restarted because it is stuck or has encountered issues such as data backlog or being stopped for too long.", pipeMeta.getStaticMeta());
        acquireWriteLock();
        try {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                PipeMeta deepCopy4TaskAgent = pipeMeta.deepCopy4TaskAgent();
                handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
                long currentTimeMillis2 = System.currentTimeMillis();
                PIPE_NAME_TO_LAST_RESTART_TIME_MAP.computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), str -> {
                    return new AtomicLong(currentTimeMillis2);
                }).set(currentTimeMillis2);
                handleSinglePipeMetaChanges(deepCopy4TaskAgent);
                LOGGER.warn("Pipe {} was restarted because of stuck or data backlog, time cost: {} ms.", deepCopy4TaskAgent.getStaticMeta(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                releaseWriteLock();
            } catch (Exception e) {
                LOGGER.warn("Failed to restart stuck pipe {}.", pipeMeta.getStaticMeta(), e);
                releaseWriteLock();
            }
        } catch (Throwable th) {
            releaseWriteLock();
            throw th;
        }
    }

    public boolean isPipeTaskCurrentlyRestarted(String str) {
        return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(str);
    }

    public void markCompleted(String str, int i) {
        acquireWriteLock();
        try {
            if (this.pipeMetaKeeper.containsPipeMeta(str)) {
                PipeDataNodeTask pipeDataNodeTask = (PipeDataNodeTask) this.pipeTaskManager.getPipeTask(this.pipeMetaKeeper.getPipeMeta(str).getStaticMeta(), i);
                if (Objects.nonNull(pipeDataNodeTask)) {
                    pipeDataNodeTask.markCompleted();
                }
            }
        } finally {
            releaseWriteLock();
        }
    }

    public Set<Integer> getPipeTaskRegionIdSet(String str, long j) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(str);
        return (pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != j) ? Collections.emptySet() : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
    }

    public boolean hasPipeReleaseRegionRelatedResource(int i) {
        if (!tryReadLockWithTimeOut(10L)) {
            LOGGER.warn("Failed to check if pipe has release region related resource with consensus group id: {}.", Integer.valueOf(i));
            return false;
        }
        try {
            return !this.pipeTaskManager.hasPipeTaskInConsensusGroup(i);
        } finally {
            releaseReadLock();
        }
    }

    private boolean isSnapshotMode(PipeParameters pipeParameters) {
        boolean z;
        if (pipeParameters.hasAnyAttributes(new String[]{"extractor.mode.snapshot", "source.mode.snapshot"})) {
            z = pipeParameters.getBooleanOrDefault(Arrays.asList("extractor.mode.snapshot", "source.mode.snapshot"), false);
        } else {
            String stringOrDefault = pipeParameters.getStringOrDefault(Arrays.asList("extractor.mode", "source.mode"), "live");
            z = stringOrDefault.equalsIgnoreCase("snapshot") || stringOrDefault.equalsIgnoreCase("query");
        }
        return z;
    }

    public ProgressIndex getPipeTaskProgressIndex(String str, int i) {
        if (!tryReadLockWithTimeOut(10L)) {
            throw new PipeException(String.format("Failed to get pipe task progress index with pipe name: %s, consensus group id %s.", str, Integer.valueOf(i)));
        }
        try {
            if (!this.pipeMetaKeeper.containsPipeMeta(str)) {
                throw new PipeException("Pipe meta not found: " + str);
            }
            ProgressIndex progressIndex = ((PipeTaskMeta) this.pipeMetaKeeper.getPipeMeta(str).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().get(Integer.valueOf(i))).getProgressIndex();
            releaseReadLock();
            return progressIndex;
        } catch (Throwable th) {
            releaseReadLock();
            throw th;
        }
    }

    public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
        if (!tryReadLockWithTimeOut(10L)) {
            throw new PipeException("Failed to get all consensus pipe.");
        }
        try {
            return (Map) StreamSupport.stream(this.pipeMetaKeeper.getPipeMetaList().spliterator(), false).filter(pipeMeta -> {
                return PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType());
            }).collect(ImmutableMap.toImmutableMap(pipeMeta2 -> {
                return new ConsensusPipeName(pipeMeta2.getStaticMeta().getPipeName());
            }, pipeMeta3 -> {
                return (PipeStatus) pipeMeta3.getRuntimeMeta().getStatus().get();
            }));
        } finally {
            releaseReadLock();
        }
    }
}
