package org.apache.iotdb.confignode.manager.pipe.agent.task;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.class */
/**
 * {@link PipeTaskAgent} specialization that runs on a ConfigNode.
 *
 * <p>Most hooks simply guard the inherited behaviour behind
 * {@code PipeConfigNodeAgent.runtime().isLeaderReady()} and/or {@link #isShutdown()}, and hand the
 * parent class deep copies of the incoming {@link PipeMeta} objects instead of the originals.
 */
public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigNodeTaskAgent.class);

    /**
     * Consensus group id under which this agent creates and looks up its own task.
     * NOTE(review): presumably the pseudo region id that stands for the config region - confirm.
     */
    private static final int CONFIG_REGION_ID = Integer.MIN_VALUE;

    /**
     * Start value for the "smallest index still needed" fold; a result equal to this value means
     * that no task contributed and the listening queue must be left untouched.
     * NOTE(review): replaces a reference to the test-only
     * {@code CreateManyDatabasesProcedure.SLEEP_FOREVER}, assumed to equal {@code Long.MAX_VALUE}.
     */
    private static final long NOTHING_TO_REMOVE = Long.MAX_VALUE;

    /** @return whether the ConfigNode pipe runtime agent reports that it is shut down */
    protected boolean isShutdown() {
        return PipeConfigNodeAgent.runtime().isShutdown();
    }

    /**
     * Forwards a "thaw rate" request to the remaining-time metrics, keyed by {@code str + "_" + j}.
     *
     * @param str first key component (presumably the pipe name - confirm against callers)
     * @param j second key component (presumably the pipe creation time - confirm against callers)
     */
    protected void thawRate(String str, long j) {
        PipeConfigNodeRemainingTimeMetrics.getInstance().thawRate(str + "_" + j);
    }

    /**
     * Forwards a "freeze rate" request to the remaining-time metrics, keyed by
     * {@code str + "_" + j}.
     *
     * @param str first key component (presumably the pipe name - confirm against callers)
     * @param j second key component (presumably the pipe creation time - confirm against callers)
     */
    protected void freezeRate(String str, long j) {
        PipeConfigNodeRemainingTimeMetrics.getInstance().freezeRate(str + "_" + j);
    }

    /**
     * Builds the tasks of a pipe by delegating to {@link PipeConfigNodeTaskBuilder}.
     *
     * @param pipeMeta meta of the pipe whose tasks are built
     * @return the builder's result, keyed by consensus group id
     * @throws IllegalPathException propagated from the builder
     */
    protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMeta) throws IllegalPathException {
        return new PipeConfigNodeTaskBuilder(pipeMeta).build();
    }

    /**
     * Creates and registers a {@link PipeConfigNodeTask} when all of the following hold: the
     * consensus group id is {@code Integer.MIN_VALUE}, the task's leader node id equals this
     * ConfigNode's id, and the extractor parameters select at least one plan type to listen to.
     * Regardless of that decision, the task meta is stored in the runtime meta kept for the pipe.
     *
     * @param i consensus group id of the task
     * @param pipeStaticMeta static meta of the owning pipe
     * @param pipeTaskMeta runtime meta of the task
     * @throws IllegalPathException propagated from parsing the extractor parameters
     */
    protected void createPipeTask(int i, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) throws IllegalPathException {
        final boolean shouldCreateTask =
                i == CONFIG_REGION_ID
                        && pipeTaskMeta.getLeaderNodeId() == ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()
                        && !ConfigRegionListeningFilter.parseListeningPlanTypeSet(pipeStaticMeta.getExtractorParameters()).isEmpty();
        if (shouldCreateTask) {
            final PipeConfigNodeTaskStage stage =
                    new PipeConfigNodeTaskStage(
                            pipeStaticMeta.getPipeName(),
                            pipeStaticMeta.getCreationTime(),
                            pipeStaticMeta.getExtractorParameters().getAttribute(),
                            pipeStaticMeta.getProcessorParameters().getAttribute(),
                            pipeStaticMeta.getConnectorParameters().getAttribute(),
                            pipeTaskMeta);
            final PipeConfigNodeTask task = new PipeConfigNodeTask(stage);
            task.create();
            this.pipeTaskManager.addPipeTask(pipeStaticMeta, i, task);
        }
        // The task meta is recorded even when no task was created on this node.
        this.pipeMetaKeeper
                .getPipeMeta(pipeStaticMeta.getPipeName())
                .getRuntimeMeta()
                .getConsensusGroupId2TaskMetaMap()
                .put(i, pipeTaskMeta);
    }

    /** Invokes the parent's single-argument variant with this ConfigNode's id. */
    public void stopAllPipesWithCriticalException() {
        super.stopAllPipesWithCriticalException(ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId());
    }

    /**
     * Applies the change of one pipe through the parent implementation, using a deep copy of
     * {@code pipeMeta}.
     *
     * @param pipeMeta pipe meta to apply
     * @return the parent's result; {@code null} when the leader is not ready; or a message built
     *     from the pipe name, the exception message and the current time when anything throws
     */
    protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(PipeMeta pipeMeta) {
        try {
            if (!PipeConfigNodeAgent.runtime().isLeaderReady()) {
                return null;
            }
            return super.handleSinglePipeMetaChangesInternal(pipeMeta.deepCopy4TaskAgent());
        } catch (Exception e) {
            return new TPushPipeMetaRespExceptionMessage(
                    pipeMeta.getStaticMeta().getPipeName(), e.getMessage(), System.currentTimeMillis());
        }
    }

    /**
     * Drops a pipe through the parent implementation.
     *
     * @param str argument forwarded unchanged to the parent (presumably the pipe name - confirm)
     * @return the parent's result, or {@code null} when the leader is not ready
     */
    protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String str) {
        if (!PipeConfigNodeAgent.runtime().isLeaderReady()) {
            return null;
        }
        return super.handleDropPipeInternal(str);
    }

    /**
     * Applies a batch of pipe metas through the parent implementation using deep copies, then
     * trims the config region listening queue based on the original (non-copied) metas.
     *
     * @param list pipe metas to apply
     * @return the parent's exception messages, or an empty list when this agent is shut down or
     *     the leader is not ready
     * @throws PipeException wrapping any exception raised while copying, applying or trimming
     */
    protected List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(List<PipeMeta> list) {
        if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
            return Collections.emptyList();
        }
        try {
            final List<PipeMeta> copiedPipeMetas =
                    list.stream()
                            .map(PipeConfigNodeTaskAgent::deepCopyForTaskAgent)
                            .collect(Collectors.toList());
            final List<TPushPipeMetaRespExceptionMessage> exceptionMessages =
                    super.handlePipeMetaChangesInternal(copiedPipeMetas);
            clearConfigRegionListeningQueueIfNecessary(list);
            return exceptionMessages;
        } catch (Exception e) {
            throw new PipeException("failed to handle pipe meta changes", e);
        }
    }

    /** Deep-copies one pipe meta, converting any failure into an unchecked {@link PipeException}. */
    private static PipeMeta deepCopyForTaskAgent(PipeMeta pipeMeta) {
        try {
            return pipeMeta.deepCopy4TaskAgent();
        } catch (Exception e) {
            throw new PipeException("failed to deep copy pipeMeta", e);
        }
    }

    /**
     * Computes the smallest "progress index + 1" over the {@code Integer.MIN_VALUE} task of every
     * given pipe and asks the listener to remove everything before it. If no pipe has such a task,
     * nothing is removed.
     *
     * @param list pipe metas whose task progress is inspected
     */
    private void clearConfigRegionListeningQueueIfNecessary(List<PipeMeta> list) {
        long newFirstIndex = NOTHING_TO_REMOVE;
        for (final PipeMeta pipeMeta : list) {
            final Map<Integer, PipeTaskMeta> taskMetaByGroupId =
                    pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
            // Only one key is of interest, so look it up directly instead of scanning all keys.
            final PipeTaskMeta taskMeta = taskMetaByGroupId.get(CONFIG_REGION_ID);
            if (taskMeta == null) {
                continue;
            }
            // Held as Object so that the instanceof test below is a real runtime check.
            final Object progressIndex = taskMeta.getProgressIndex();
            if (progressIndex instanceof MetaProgressIndex) {
                final long firstIndexStillNeeded = ((MetaProgressIndex) progressIndex).getIndex() + 1;
                if (firstIndexStillNeeded < newFirstIndex) {
                    newFirstIndex = firstIndexStillNeeded;
                }
            } else {
                // Any other kind of progress index pins the bound to 0.
                // NOTE(review): presumably so that a task that has not reported meta progress yet
                // does not lose queued entries - confirm.
                newFirstIndex = 0L;
            }
        }
        if (newFirstIndex < NOTHING_TO_REMOVE) {
            PipeConfigNodeAgent.runtime().listener().removeBefore(newFirstIndex);
        }
    }

    /**
     * Fills the heartbeat response with, for every pipe known to the meta keeper, its serialized
     * meta, its remaining event count and its estimated remaining time. Does nothing when this
     * agent is shut down or the leader is not ready.
     *
     * @param tPipeHeartbeatReq heartbeat request; only its {@code heartbeatId} is read, for logging
     * @param tPipeHeartbeatResp heartbeat response that receives the three lists
     * @throws TException wrapping an {@link IOException} raised while serializing a pipe meta
     */
    protected void collectPipeMetaListInternal(TPipeHeartbeatReq tPipeHeartbeatReq, TPipeHeartbeatResp tPipeHeartbeatResp) throws TException {
        if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config coordinator.", tPipeHeartbeatReq.heartbeatId);

        final List<ByteBuffer> serializedPipeMetas = new ArrayList<>();
        final List<Long> remainingEventCounts = new ArrayList<>();
        final List<Double> remainingTimes = new ArrayList<>();
        try {
            // The log manager decides whether per-pipe reports are logged in this round.
            final Optional<Logger> reportLogger =
                    PipeConfigNodeResourceManager.log()
                            .schedule(
                                    PipeConfigNodeTaskAgent.class,
                                    PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
                                    PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
                                    this.pipeMetaKeeper.getPipeMetaCount());
            for (final PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                serializedPipeMetas.add(pipeMeta.serialize());

                final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
                final long remainingEventCount =
                        PipeConfigRegionExtractorMetrics.getInstance()
                                .getRemainingEventCount(staticMeta.getPipeName(), staticMeta.getCreationTime());
                final double remainingTime =
                        PipeConfigNodeRemainingTimeMetrics.getInstance()
                                .getRemainingTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
                remainingEventCounts.add(remainingEventCount);
                remainingTimes.add(remainingTime);

                reportLogger.ifPresent(
                        logger ->
                                logger.info(
                                        "Reporting pipe meta: {}, remainingEventCount: {}, estimatedRemainingTime: {}",
                                        pipeMeta.coreReportMessage(),
                                        remainingEventCount,
                                        remainingTime));
            }
            LOGGER.info("Reported {} pipe metas.", serializedPipeMetas.size());
            tPipeHeartbeatResp.setPipeMetaList(serializedPipeMetas);
            tPipeHeartbeatResp.setPipeRemainingEventCountList(remainingEventCounts);
            tPipeHeartbeatResp.setPipeRemainingTimeList(remainingTimes);
        } catch (IOException e) {
            throw new TException(e);
        }
    }
}
