package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.class */
public class PipeHeartbeatScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatScheduler.class);
    private static final boolean IS_SEPERATED_PIPE_HEARTBEAT_ENABLED = PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
    private static final long HEARTBEAT_INTERVAL_SECONDS = PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
    private static final ScheduledExecutorService HEARTBEAT_EXECUTOR = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.PIPE_RUNTIME_HEARTBEAT.getName());
    private final ConfigManager configManager;
    private final PipeHeartbeatParser pipeHeartbeatParser;
    private Future<?> heartbeatFuture;

    public PipeHeartbeatScheduler(ConfigManager configManager) {
        this.configManager = configManager;
        this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
    }

    public synchronized void start() {
        if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && this.heartbeatFuture == null) {
            this.heartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(HEARTBEAT_EXECUTOR, this::heartbeat, HEARTBEAT_INTERVAL_SECONDS, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS);
            LOGGER.info("PipeHeartbeat is started successfully.");
        }
    }

    private synchronized void heartbeat() {
        if (this.configManager.getPipeManager().getPipeTaskCoordinator().hasAnyPipe()) {
            if (this.configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
                LOGGER.warn("PipeTaskCoordinatorLock is held by another thread, skip this round of heartbeat to avoid procedure and rpc accumulation as much as possible");
                return;
            }
            Map<Integer, TDataNodeLocation> registeredDataNodeLocations = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
            TPipeHeartbeatReq tPipeHeartbeatReq = new TPipeHeartbeatReq(System.currentTimeMillis());
            LOGGER.info("Collecting pipe heartbeat {} from data nodes", Long.valueOf(tPipeHeartbeatReq.heartbeatId));
            DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.PIPE_HEARTBEAT, tPipeHeartbeatReq, registeredDataNodeLocations);
            CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestToNodeWithRetryAndTimeoutInMs(dataNodeAsyncRequestContext, ((PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() * 1000) * 2) / 3);
            dataNodeAsyncRequestContext.getResponseMap().forEach((num, tPipeHeartbeatResp) -> {
                this.pipeHeartbeatParser.parseHeartbeat(num.intValue(), new PipeHeartbeat(tPipeHeartbeatResp.getPipeMetaList(), tPipeHeartbeatResp.getPipeCompletedList(), tPipeHeartbeatResp.getPipeRemainingEventCountList(), tPipeHeartbeatResp.getPipeRemainingTimeList()));
            });
            try {
                TPipeHeartbeatResp tPipeHeartbeatResp2 = new TPipeHeartbeatResp();
                PipeConfigNodeAgent.task().collectPipeMetaList(tPipeHeartbeatReq, tPipeHeartbeatResp2);
                this.pipeHeartbeatParser.parseHeartbeat(ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), new PipeHeartbeat(tPipeHeartbeatResp2.getPipeMetaList(), null, tPipeHeartbeatResp2.getPipeRemainingEventCountList(), tPipeHeartbeatResp2.getPipeRemainingTimeList()));
            } catch (Exception e) {
                LOGGER.warn("Failed to collect pipe meta list from config node task agent", e);
            }
        }
    }

    public synchronized void stop() {
        if (!IS_SEPERATED_PIPE_HEARTBEAT_ENABLED || this.heartbeatFuture == null) {
            return;
        }
        this.heartbeatFuture.cancel(false);
        this.heartbeatFuture = null;
        LOGGER.info("PipeHeartbeat is stopped successfully.");
    }

    public void parseHeartbeat(int i, PipeHeartbeat pipeHeartbeat) {
        this.pipeHeartbeatParser.parseHeartbeat(i, pipeHeartbeat);
    }
}
