package org.apache.iotdb.confignode.manager.load.service;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/load/service/HeartbeatService.class */
/**
 * Periodically sends heartbeat requests to the registered ConfigNodes and DataNodes.
 *
 * <p>The loop body runs on a dedicated single-thread scheduled executor. Each round only sends
 * heartbeats when a {@link ConsensusManager} is available and its {@code isLeader()} returns
 * {@code true}; otherwise the round is a no-op.
 */
public class HeartbeatService {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);

    /** Delay, in milliseconds, between two consecutive executions of the heartbeat loop body. */
    private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();

    /** The load of DataNodes is sampled once every this many heartbeat rounds. */
    private static final long LOAD_SAMPLING_ROUNDS = 10L;

    private final IManager configManager;
    private final LoadCache loadCache;

    /** Handle of the scheduled heartbeat task; {@code null} while the service is stopped. */
    private Future<?> currentHeartbeatFuture;

    /** Guards every read and write of {@link #currentHeartbeatFuture}. */
    private final Object heartbeatScheduleMonitor = new Object();

    private final ScheduledExecutorService heartBeatExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());

    /** Number of heartbeat requests generated so far; drives the periodic sampling flags. */
    private final AtomicLong heartbeatCounter = new AtomicLong(0);

    /**
     * Creates a heartbeat service that is not yet started.
     *
     * @param iManager manager used to reach the consensus, node, quota, schema and pipe managers
     * @param loadCache cache handed to the heartbeat response handlers
     */
    public HeartbeatService(IManager iManager, LoadCache loadCache) {
        this.configManager = iManager;
        this.loadCache = loadCache;
    }

    /**
     * Schedules the heartbeat loop with a fixed delay of {@link #HEARTBEAT_INTERVAL} milliseconds
     * and no initial delay. Does nothing when the loop is already scheduled.
     */
    public void startHeartbeatService() {
        synchronized (this.heartbeatScheduleMonitor) {
            if (this.currentHeartbeatFuture == null) {
                this.currentHeartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.heartBeatExecutor, this::heartbeatLoopBody, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
                LOGGER.info("Heartbeat service is started successfully.");
            }
        }
    }

    /**
     * Cancels the scheduled heartbeat loop without interrupting a round that is already running.
     * Does nothing when the loop is not scheduled. The executor itself is left alive, so the
     * service can be started again afterwards.
     */
    public void stopHeartbeatService() {
        synchronized (this.heartbeatScheduleMonitor) {
            if (this.currentHeartbeatFuture != null) {
                this.currentHeartbeatFuture.cancel(false);
                this.currentHeartbeatFuture = null;
                LOGGER.info("Heartbeat service is stopped successfully.");
            }
        }
    }

    /** One heartbeat round: builds a single request and sends it to every registered node. */
    private void heartbeatLoopBody() {
        // Fetch the ConsensusManager exactly once and ask that same instance about leadership.
        // Re-fetching it for the isLeader() call would bypass the null guard and could throw a
        // NullPointerException if the manager were cleared between the two reads.
        Optional.ofNullable(getConsensusManager())
            .filter(consensusManager -> consensusManager.isLeader())
            .ifPresent(leader -> {
                THeartbeatReq heartbeatReq = genHeartbeatReq();
                pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
                pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
            });
    }

    /**
     * Builds the heartbeat request for the current round and advances the round counter.
     *
     * @return the request shared by all heartbeats sent in this round
     */
    private THeartbeatReq genHeartbeatReq() {
        // Snapshot the counter once so that both periodic flags are derived from the same round.
        final long round = this.heartbeatCounter.get();

        THeartbeatReq heartbeatReq = new THeartbeatReq();
        heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
        heartbeatReq.setNeedJudgeLeader(true);
        // Load sampling is requested only on every LOAD_SAMPLING_ROUNDS-th round.
        heartbeatReq.setNeedSamplingLoad(round % LOAD_SAMPLING_ROUNDS == 0);
        heartbeatReq.setSchemaQuotaCount(this.configManager.getClusterSchemaManager().getSchemaQuotaCount());

        // Pipe meta is requested through this heartbeat only when the separated pipe heartbeat is
        // disabled, and then only on rounds that are a multiple of the configured interval. The
        // configured value is applied to the round counter, i.e. it is counted in heartbeat rounds.
        final long pipeMetaRounds = PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
        heartbeatReq.setNeedPipeMetaList(!PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() && round % pipeMetaRounds == 0);

        // NOTE(review): the quota fields are attached when hasSpaceQuotaLimit() returns false,
        // which reads inverted relative to the method name - confirm against ClusterQuotaManager.
        if (!this.configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
            heartbeatReq.setSchemaRegionIds(this.configManager.getClusterQuotaManager().getSchemaRegionIds());
            heartbeatReq.setDataRegionIds(this.configManager.getClusterQuotaManager().getDataRegionIds());
            heartbeatReq.setSpaceQuotaUsage(this.configManager.getClusterQuotaManager().getSpaceQuotaUsage());
        }

        this.heartbeatCounter.getAndIncrement();
        return heartbeatReq;
    }

    /**
     * Sends an asynchronous heartbeat to every ConfigNode in {@code list} except the one whose id
     * equals {@link ConfigNodeHeartbeatCache#CURRENT_NODE_ID}.
     *
     * @param tHeartbeatReq request of the current round; only its timestamp is sent to ConfigNodes
     * @param list the registered ConfigNodes
     */
    private void pingRegisteredConfigNodes(THeartbeatReq tHeartbeatReq, List<TConfigNodeLocation> list) {
        for (TConfigNodeLocation configNode : list) {
            if (configNode.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
                continue;
            }
            ConfigNodeHeartbeatHandler handler = new ConfigNodeHeartbeatHandler(configNode.getConfigNodeId(), this.loadCache);
            AsyncConfigNodeHeartbeatClientPool.getInstance().getConfigNodeHeartBeat(configNode.getInternalEndPoint(), tHeartbeatReq.getHeartbeatTimestamp(), handler);
        }
    }

    /**
     * Sends an asynchronous heartbeat to every DataNode in {@code list}. A fresh response handler
     * is created per DataNode, and the space quota usage is updated before each request is sent.
     *
     * @param tHeartbeatReq request of the current round
     * @param list the registered DataNodes
     */
    private void pingRegisteredDataNodes(THeartbeatReq tHeartbeatReq, List<TDataNodeConfiguration> list) {
        for (TDataNodeConfiguration dataNode : list) {
            int dataNodeId = dataNode.getLocation().getDataNodeId();
            Map<Integer, Long> deviceNum = this.configManager.getClusterQuotaManager().getDeviceNum();
            Map<Integer, Long> timeSeriesNum = this.configManager.getClusterQuotaManager().getTimeSeriesNum();
            Map<Integer, Long> regionDisk = this.configManager.getClusterQuotaManager().getRegionDisk();
            ClusterSchemaManager schemaManager = Objects.requireNonNull(this.configManager.getClusterSchemaManager(), "clusterSchemaManager");
            DataNodeHeartbeatHandler handler = new DataNodeHeartbeatHandler(dataNodeId, this.loadCache, deviceNum, timeSeriesNum, regionDisk, schemaManager::updateSchemaQuota, this.configManager.getPipeManager().getPipeRuntimeCoordinator());
            this.configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
            AsyncDataNodeHeartbeatClientPool.getInstance().getDataNodeHeartBeat(dataNode.getLocation().getInternalEndPoint(), tHeartbeatReq, handler);
        }
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }

    private NodeManager getNodeManager() {
        return this.configManager.getNodeManager();
    }
}
