package org.apache.iotdb.confignode.manager.load.service;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/load/service/HeartbeatService.class */
/**
 * Periodically sends heartbeat requests to the registered ConfigNodes, DataNodes and AINodes.
 *
 * <p>The loop body only sends requests while {@link ConsensusManager#isLeader()} returns
 * {@code true}. Starting and stopping the schedule is guarded by an internal monitor, so
 * {@link #startHeartbeatService()} and {@link #stopHeartbeatService()} may be called repeatedly.
 */
public class HeartbeatService {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);

    /** Delay between two runs of the heartbeat loop, in milliseconds, read from the ConfigNode configuration. */
    private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();

    /** Load sampling is requested whenever the heartbeat counter is a multiple of this value. */
    private static final int LOAD_SAMPLING_INTERVAL = 10;

    /**
     * The ConfigNode endpoint set is attached to DataNode heartbeats whenever the heartbeat
     * counter is a multiple of this value, even if the DataNode's confirmed set already matches.
     */
    private static final int CONFIG_NODE_LIST_PERIODICALLY_SYNC_INTERVAL = 100;

    // Assigned through setConfigManager(), which the constructor calls.
    protected IManager configManager;
    private final LoadCache loadCache;

    /** Handle of the scheduled heartbeat task; {@code null} while the service is stopped. */
    private Future<?> currentHeartbeatFuture;

    /** Guards reads and writes of {@link #currentHeartbeatFuture}. */
    private final Object heartbeatScheduleMonitor = new Object();
    private final ScheduledExecutorService heartBeatExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());

    /** Incremented once per generated DataNode heartbeat request; drives the periodic flags. */
    private final AtomicLong heartbeatCounter = new AtomicLong(0);

    /**
     * Creates the service without starting it.
     *
     * @param iManager manager used to reach the consensus, node, quota, schema and pipe managers
     * @param loadCache cache queried for in-flight heartbeat state and confirmed ConfigNode endpoints
     */
    public HeartbeatService(IManager iManager, LoadCache loadCache) {
        setConfigManager(iManager);
        this.loadCache = loadCache;
    }

    /**
     * Stores the manager used by this service.
     *
     * @param iManager the manager to use
     */
    protected void setConfigManager(IManager iManager) {
        this.configManager = iManager;
    }

    /**
     * Schedules the heartbeat loop with a fixed delay of {@link #HEARTBEAT_INTERVAL} milliseconds
     * and no initial delay. Does nothing if the loop is already scheduled.
     */
    public void startHeartbeatService() {
        synchronized (this.heartbeatScheduleMonitor) {
            if (this.currentHeartbeatFuture != null) {
                return;
            }
            this.currentHeartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.heartBeatExecutor, this::heartbeatLoopBody, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
            LOGGER.info("Heartbeat service is started successfully.");
        }
    }

    /**
     * Cancels the scheduled heartbeat loop without interrupting a run that is already in
     * progress. Does nothing if the loop is not scheduled.
     */
    public void stopHeartbeatService() {
        synchronized (this.heartbeatScheduleMonitor) {
            if (this.currentHeartbeatFuture == null) {
                return;
            }
            this.currentHeartbeatFuture.cancel(false);
            this.currentHeartbeatFuture = null;
            LOGGER.info("Heartbeat service is stopped successfully.");
        }
    }

    /**
     * One iteration of the heartbeat loop: when a consensus manager is available and reports
     * leadership, pings ConfigNodes, then DataNodes, then AINodes.
     */
    private void heartbeatLoopBody() {
        // Read the consensus manager once so the null check and isLeader() see the same instance.
        ConsensusManager consensusManager = getConsensusManager();
        if (consensusManager == null || !consensusManager.isLeader()) {
            return;
        }
        pingRegisteredConfigNodes(genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes());
        pingRegisteredDataNodes(genHeartbeatReq(), getNodeManager().getRegisteredDataNodes());
        pingRegisteredAINodes(genMLHeartbeatReq(), getNodeManager().getRegisteredAINodes());
    }

    /**
     * Builds the heartbeat request shared by all DataNodes in the current loop iteration and
     * increments the heartbeat counter afterwards.
     *
     * @return the populated request
     */
    private TDataNodeHeartbeatReq genHeartbeatReq() {
        TDataNodeHeartbeatReq request = new TDataNodeHeartbeatReq();
        request.setHeartbeatTimestamp(System.nanoTime());
        request.setNeedJudgeLeader(true);
        request.setNeedSamplingLoad(this.heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0);

        Pair<Long, Long> schemaQuotaRemain = this.configManager.getClusterSchemaManager().getSchemaQuotaRemain();
        request.setTimeSeriesQuotaRemain(schemaQuotaRemain.left);
        request.setDeviceQuotaRemain(schemaQuotaRemain.right);

        // Pipe meta is only piggybacked here when the separated pipe heartbeat is disabled.
        request.setNeedPipeMetaList(
                !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled()
                        && this.heartbeatCounter.get() % PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() == 0);

        // NOTE(review): region ids and usage are attached when hasSpaceQuotaLimit() returns false;
        // confirm this matches the semantics of that method.
        if (!this.configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
            request.setSchemaRegionIds(this.configManager.getClusterQuotaManager().getSchemaRegionIds());
            request.setDataRegionIds(this.configManager.getClusterQuotaManager().getDataRegionIds());
            request.setSpaceQuotaUsage(this.configManager.getClusterQuotaManager().getSpaceQuotaUsage());
        }

        this.heartbeatCounter.getAndIncrement();
        return request;
    }

    /**
     * Attaches the currently registered ConfigNode endpoints to the request when they differ from
     * the set the given DataNode has confirmed, or on every
     * {@link #CONFIG_NODE_LIST_PERIODICALLY_SYNC_INTERVAL}-th counter value.
     *
     * @param i id of the DataNode the request is about to be sent to
     * @param tDataNodeHeartbeatReq request to update in place
     */
    private void addConfigNodeLocationsToReq(int i, TDataNodeHeartbeatReq tDataNodeHeartbeatReq) {
        Set<TEndPoint> confirmedEndPoints = this.loadCache.getConfirmedConfigNodeEndPoints(i);
        Set<TEndPoint> registeredEndPoints = getNodeManager().getRegisteredConfigNodes().stream()
                .map(TConfigNodeLocation::getInternalEndPoint)
                .collect(Collectors.toSet());
        boolean outOfSync = !registeredEndPoints.equals(confirmedEndPoints);
        if (outOfSync || this.heartbeatCounter.get() % CONFIG_NODE_LIST_PERIODICALLY_SYNC_INTERVAL == 0) {
            tDataNodeHeartbeatReq.setConfigNodeEndPoints(registeredEndPoints);
        }
    }

    /**
     * Builds the heartbeat request sent to ConfigNodes, stamped with {@link System#nanoTime()}.
     *
     * @return the populated request
     */
    private TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
        TConfigNodeHeartbeatReq request = new TConfigNodeHeartbeatReq();
        request.setTimestamp(System.nanoTime());
        return request;
    }

    /**
     * Builds the heartbeat request sent to AINodes. The counter is read without being modified;
     * within the loop body this happens after {@link #genHeartbeatReq()} has incremented it.
     *
     * @return the populated request
     */
    private TAIHeartbeatReq genMLHeartbeatReq() {
        TAIHeartbeatReq request = new TAIHeartbeatReq();
        request.setHeartbeatTimestamp(System.nanoTime());
        request.setNeedSamplingLoad(this.heartbeatCounter.get() % LOAD_SAMPLING_INTERVAL == 0);
        return request;
    }

    /**
     * Sends the request asynchronously to every listed ConfigNode except the current node and
     * those for which {@code checkAndSetHeartbeatProcessing} returns {@code true}.
     *
     * @param tConfigNodeHeartbeatReq request sent to each target
     * @param list registered ConfigNode locations
     */
    private void pingRegisteredConfigNodes(TConfigNodeHeartbeatReq tConfigNodeHeartbeatReq, List<TConfigNodeLocation> list) {
        for (TConfigNodeLocation location : list) {
            int configNodeId = location.getConfigNodeId();
            if (configNodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
                continue;
            }
            // Presumably true means a previous heartbeat to this node is still being processed.
            if (this.loadCache.checkAndSetHeartbeatProcessing(configNodeId)) {
                continue;
            }
            AsyncConfigNodeHeartbeatClientPool.getInstance().getConfigNodeHeartBeat(location.getInternalEndPoint(), tConfigNodeHeartbeatReq, getConfigNodeHeartbeatHandler(configNodeId));
        }
    }

    /**
     * Creates the response handler for a ConfigNode heartbeat.
     *
     * @param i id of the ConfigNode being pinged
     * @return a new handler bound to the load manager
     */
    protected ConfigNodeHeartbeatHandler getConfigNodeHeartbeatHandler(int i) {
        return new ConfigNodeHeartbeatHandler(i, this.configManager.getLoadManager());
    }

    /**
     * Sends the request asynchronously to every listed DataNode for which
     * {@code checkAndSetHeartbeatProcessing} returns {@code false}.
     *
     * <p>The same request object is reused for all DataNodes, so ConfigNode endpoints attached for
     * one DataNode remain set for the DataNodes that follow it in the list.
     *
     * @param tDataNodeHeartbeatReq request shared by all targets; may be updated in place
     * @param list registered DataNode configurations
     */
    private void pingRegisteredDataNodes(TDataNodeHeartbeatReq tDataNodeHeartbeatReq, List<TDataNodeConfiguration> list) {
        for (TDataNodeConfiguration dataNode : list) {
            int dataNodeId = dataNode.getLocation().getDataNodeId();
            if (this.loadCache.checkAndSetHeartbeatProcessing(dataNodeId)) {
                continue;
            }
            DataNodeHeartbeatHandler handler = new DataNodeHeartbeatHandler(
                    dataNodeId,
                    this.configManager.getLoadManager(),
                    this.configManager.getClusterQuotaManager().getDeviceNum(),
                    this.configManager.getClusterQuotaManager().getTimeSeriesNum(),
                    this.configManager.getClusterQuotaManager().getRegionDisk(),
                    this.configManager.getClusterSchemaManager()::updateTimeSeriesUsage,
                    this.configManager.getClusterSchemaManager()::updateDeviceUsage,
                    this.configManager.getPipeManager().getPipeRuntimeCoordinator());
            this.configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
            addConfigNodeLocationsToReq(dataNodeId, tDataNodeHeartbeatReq);
            AsyncDataNodeHeartbeatClientPool.getInstance().getDataNodeHeartBeat(dataNode.getLocation().getInternalEndPoint(), tDataNodeHeartbeatReq, handler);
        }
    }

    /**
     * Sends the request asynchronously to every listed AINode.
     *
     * @param tAIHeartbeatReq request sent to each target
     * @param list registered AINode configurations
     */
    private void pingRegisteredAINodes(TAIHeartbeatReq tAIHeartbeatReq, List<TAINodeConfiguration> list) {
        for (TAINodeConfiguration aiNode : list) {
            AINodeHeartbeatHandler handler = new AINodeHeartbeatHandler(aiNode.getLocation().getAiNodeId(), this.configManager.getLoadManager());
            AsyncAINodeHeartbeatClientPool.getInstance().getAINodeHeartBeat(aiNode.getLocation().getInternalEndPoint(), tAIHeartbeatReq, handler);
        }
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }

    private NodeManager getNodeManager() {
        return this.configManager.getNodeManager();
    }
}
