package org.apache.iotdb.confignode.manager.load.service;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
import org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/load/service/EventService.class */
/**
 * Periodically compares the statistics currently held by the {@link LoadCache} with the last
 * statistics this service observed and publishes the differences on an asynchronous
 * {@link EventBus}.
 *
 * <p>Three kinds of statistics are tracked independently: per-node, per-RegionGroup and
 * per-ConsensusGroup. For each kind, a change event carries a map from key to a
 * {@code Pair(previous, current)}; {@code previous} is {@code null} for a newly seen key and
 * {@code current} is {@code null} for a key that is no longer present in the cache.
 */
public class EventService {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class);

    /** Delay between two consecutive checks, in milliseconds, read from the ConfigNode config. */
    private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();

    /** Handle of the scheduled periodic task; {@code null} while the service is stopped. */
    private Future<?> currentEventServiceFuture;

    private final LoadCache loadCache;

    /** Guards {@link #currentEventServiceFuture} during start/stop. */
    private final Object eventServiceMonitor = new Object();

    private final ScheduledExecutorService eventServiceExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.CONFIG_NODE_EVENT_SERVICE.getName());

    // Last statistics that were observed (and, if changed, broadcast) for each key.
    private final Map<Integer, NodeStatistics> previousNodeStatisticsMap = new TreeMap<>();
    private final Map<TConsensusGroupId, RegionGroupStatistics> previousRegionGroupStatisticsMap = new TreeMap<>();
    private final Map<TConsensusGroupId, ConsensusGroupStatistics> previousConsensusGroupStatisticsMap = new TreeMap<>();

    /** Asynchronous bus backed by a fixed pool of 5 threads; subscribers are registered in the constructor. */
    private final EventBus eventPublisher = new AsyncEventBus(ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName(), IoTDBThreadPoolFactory.newFixedThreadPool(5, ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName()));

    /**
     * Creates the service and registers the event subscribers.
     *
     * @param iManager manager whose pipe runtime coordinator is registered as a subscriber
     * @param loadCache source of the current statistics
     * @param routeBalancer registered as a subscriber
     */
    public EventService(IManager iManager, LoadCache loadCache, RouteBalancer routeBalancer) {
        this.loadCache = loadCache;
        this.eventPublisher.register(iManager.getPipeManager().getPipeRuntimeCoordinator());
        this.eventPublisher.register(routeBalancer);
    }

    /** Schedules the periodic change check if it is not already scheduled. */
    public void startEventService() {
        synchronized (this.eventServiceMonitor) {
            if (this.currentEventServiceFuture == null) {
                this.currentEventServiceFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.eventServiceExecutor, this::broadcastChangeEventIfNecessary, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
                LOGGER.info("Event service is started successfully.");
            }
        }
    }

    /** Cancels the periodic change check (without interrupting a running check) if it is scheduled. */
    public void stopEventService() {
        synchronized (this.eventServiceMonitor) {
            if (this.currentEventServiceFuture != null) {
                this.currentEventServiceFuture.cancel(false);
                this.currentEventServiceFuture = null;
                LOGGER.info("Event service is stopped successfully.");
            }
        }
    }

    /** Body of the periodic task: runs the three checks in a fixed order. */
    private void broadcastChangeEventIfNecessary() {
        checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
        checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
        checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
    }

    /**
     * Diffs the current node statistics against the previously observed ones and, when anything
     * differs, posts a {@link NodeStatisticsChangeEvent} and logs the differences.
     */
    public synchronized void checkAndBroadcastNodeStatisticsChangeEventIfNecessary() {
        Map<Integer, NodeStatistics> currentNodeStatisticsMap = this.loadCache.getCurrentNodeStatisticsMap();
        Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = new TreeMap<>();
        // New or updated entries: remember the new value as the baseline for the next round.
        currentNodeStatisticsMap.forEach((nodeId, currentNodeStatistics) -> {
            NodeStatistics previousNodeStatistics = this.previousNodeStatisticsMap.get(nodeId);
            if (previousNodeStatistics == null
                    || (currentNodeStatistics.isNewerThan(previousNodeStatistics)
                        && !currentNodeStatistics.equals(previousNodeStatistics))) {
                differentNodeStatisticsMap.put(nodeId, new Pair<>(previousNodeStatistics, currentNodeStatistics));
                this.previousNodeStatisticsMap.put(nodeId, currentNodeStatistics);
            }
        });
        // Entries that disappeared from the cache are reported as (previous -> null).
        this.previousNodeStatisticsMap.forEach((nodeId, previousNodeStatistics) -> {
            if (!currentNodeStatisticsMap.containsKey(nodeId)) {
                differentNodeStatisticsMap.put(nodeId, new Pair<>(previousNodeStatistics, null));
            }
        });
        this.previousNodeStatisticsMap.keySet().retainAll(currentNodeStatisticsMap.keySet());
        if (differentNodeStatisticsMap.isEmpty()) {
            return;
        }
        this.eventPublisher.post(new NodeStatisticsChangeEvent(differentNodeStatisticsMap));
        recordNodeStatistics(differentNodeStatisticsMap);
    }

    /**
     * Logs every node statistics change as {@code previous -> current}.
     *
     * @param map node id to (previous, current) statistics
     */
    private void recordNodeStatistics(Map<Integer, Pair<NodeStatistics, NodeStatistics>> map) {
        LOGGER.info("[NodeStatistics] NodeStatisticsMap: ");
        for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry : map.entrySet()) {
            LOGGER.info(
                "[NodeStatistics]\t {}: {} -> {}",
                "nodeId{" + entry.getKey() + "}",
                entry.getValue().getLeft(),
                entry.getValue().getRight());
        }
    }

    /**
     * Diffs the current RegionGroup statistics against the previously observed ones and, when
     * anything differs, posts a {@link RegionGroupStatisticsChangeEvent} and logs the differences.
     */
    public synchronized void checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary() {
        Map<TConsensusGroupId, RegionGroupStatistics> currentRegionGroupStatisticsMap = this.loadCache.getCurrentRegionGroupStatisticsMap();
        Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> differentRegionGroupStatisticsMap = new TreeMap<>();
        // New or updated entries: remember the new value as the baseline for the next round.
        currentRegionGroupStatisticsMap.forEach((regionGroupId, currentRegionGroupStatistics) -> {
            RegionGroupStatistics previousRegionGroupStatistics = this.previousRegionGroupStatisticsMap.get(regionGroupId);
            if (previousRegionGroupStatistics == null
                    || (currentRegionGroupStatistics.isNewerThan(previousRegionGroupStatistics)
                        && !currentRegionGroupStatistics.equals(previousRegionGroupStatistics))) {
                differentRegionGroupStatisticsMap.put(regionGroupId, new Pair<>(previousRegionGroupStatistics, currentRegionGroupStatistics));
                this.previousRegionGroupStatisticsMap.put(regionGroupId, currentRegionGroupStatistics);
            }
        });
        // Entries that disappeared from the cache are reported as (previous -> null).
        this.previousRegionGroupStatisticsMap.forEach((regionGroupId, previousRegionGroupStatistics) -> {
            if (!currentRegionGroupStatisticsMap.containsKey(regionGroupId)) {
                differentRegionGroupStatisticsMap.put(regionGroupId, new Pair<>(previousRegionGroupStatistics, null));
            }
        });
        this.previousRegionGroupStatisticsMap.keySet().retainAll(currentRegionGroupStatisticsMap.keySet());
        if (differentRegionGroupStatisticsMap.isEmpty()) {
            return;
        }
        this.eventPublisher.post(new RegionGroupStatisticsChangeEvent(differentRegionGroupStatisticsMap));
        recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
    }

    /**
     * Logs every RegionGroup status change, followed by the status change of each individual
     * Region id reported by the previous and/or current statistics.
     *
     * @param map RegionGroup id to (previous, current) statistics; either side may be {@code null}
     */
    private void recordRegionGroupStatistics(Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> map) {
        LOGGER.info("[RegionGroupStatistics] RegionGroupStatisticsMap: ");
        for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> entry : map.entrySet()) {
            RegionGroupStatistics previousStatistics = entry.getValue().getLeft();
            RegionGroupStatistics currentStatistics = entry.getValue().getRight();
            LOGGER.info(
                "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
                entry.getKey(),
                previousStatistics == null ? null : previousStatistics.getRegionGroupStatus(),
                currentStatistics == null ? null : currentStatistics.getRegionGroupStatus());

            // A null side contributes no ids, so the loops below never dereference it.
            List<Integer> previousRegionIds = previousStatistics == null ? Collections.emptyList() : previousStatistics.getRegionIds();
            List<Integer> currentRegionIds = currentStatistics == null ? Collections.emptyList() : currentStatistics.getRegionIds();
            for (Integer regionId : previousRegionIds) {
                if (currentRegionIds.contains(regionId)) {
                    LOGGER.info(
                        "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
                        regionId,
                        previousStatistics.getRegionStatus(regionId.intValue()),
                        currentStatistics.getRegionStatus(regionId.intValue()));
                } else {
                    LOGGER.info("[RegionGroupStatistics]\t Region in DataNode {}: {} -> null", regionId, previousStatistics.getRegionStatus(regionId.intValue()));
                }
            }
            for (Integer regionId : currentRegionIds) {
                if (!previousRegionIds.contains(regionId)) {
                    LOGGER.info("[RegionGroupStatistics]\t Region in DataNode {}: null -> {}", regionId, currentStatistics.getRegionStatus(regionId.intValue()));
                }
            }
        }
    }

    /**
     * Diffs the current ConsensusGroup statistics against the previously observed ones and, when
     * anything differs, posts a {@link ConsensusGroupStatisticsChangeEvent} and logs the
     * differences.
     */
    public synchronized void checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary() {
        Map<TConsensusGroupId, ConsensusGroupStatistics> currentConsensusGroupStatisticsMap = this.loadCache.getCurrentConsensusGroupStatisticsMap();
        Map<TConsensusGroupId, Pair<ConsensusGroupStatistics, ConsensusGroupStatistics>> differentConsensusGroupStatisticsMap = new TreeMap<>();
        // New or updated entries: remember the new value as the baseline for the next round.
        currentConsensusGroupStatisticsMap.forEach((consensusGroupId, currentConsensusGroupStatistics) -> {
            ConsensusGroupStatistics previousConsensusGroupStatistics = this.previousConsensusGroupStatisticsMap.get(consensusGroupId);
            if (previousConsensusGroupStatistics == null
                    || (currentConsensusGroupStatistics.isNewerThan(previousConsensusGroupStatistics)
                        && !currentConsensusGroupStatistics.equals(previousConsensusGroupStatistics))) {
                differentConsensusGroupStatisticsMap.put(consensusGroupId, new Pair<>(previousConsensusGroupStatistics, currentConsensusGroupStatistics));
                this.previousConsensusGroupStatisticsMap.put(consensusGroupId, currentConsensusGroupStatistics);
            }
        });
        // Entries that disappeared from the cache are reported as (previous -> null).
        this.previousConsensusGroupStatisticsMap.forEach((consensusGroupId, previousConsensusGroupStatistics) -> {
            if (!currentConsensusGroupStatisticsMap.containsKey(consensusGroupId)) {
                differentConsensusGroupStatisticsMap.put(consensusGroupId, new Pair<>(previousConsensusGroupStatistics, null));
            }
        });
        this.previousConsensusGroupStatisticsMap.keySet().retainAll(currentConsensusGroupStatisticsMap.keySet());
        if (differentConsensusGroupStatisticsMap.isEmpty()) {
            return;
        }
        this.eventPublisher.post(new ConsensusGroupStatisticsChangeEvent(differentConsensusGroupStatisticsMap));
        recordConsensusGroupStatistics(differentConsensusGroupStatisticsMap);
    }

    /**
     * Logs every ConsensusGroup statistics change whose previous and current values are not equal.
     *
     * @param map ConsensusGroup id to (previous, current) statistics
     */
    private void recordConsensusGroupStatistics(Map<TConsensusGroupId, Pair<ConsensusGroupStatistics, ConsensusGroupStatistics>> map) {
        LOGGER.info("[ConsensusGroupStatistics] ConsensusGroupStatisticsMap: ");
        for (Map.Entry<TConsensusGroupId, Pair<ConsensusGroupStatistics, ConsensusGroupStatistics>> entry : map.entrySet()) {
            Pair<ConsensusGroupStatistics, ConsensusGroupStatistics> change = entry.getValue();
            if (!Objects.equals(change.getRight(), change.getLeft())) {
                LOGGER.info("[ConsensusGroupStatistics]\t {}: {} -> {}", entry.getKey(), change.getLeft(), change.getRight());
            }
        }
    }

    /**
     * Exposes the event bus so tests can register their own subscribers.
     *
     * @return the bus used to publish statistics change events
     */
    @TestOnly
    public EventBus getEventPublisher() {
        return this.eventPublisher;
    }
}
