package org.apache.iotdb.confignode.manager.load;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/load/LoadManager.class */
/**
 * Entry point for the ConfigNode's load-related duties.
 *
 * <p>This class delegates RegionGroup allocation to a {@link RegionBalancer}, partition
 * allocation to a {@link PartitionBalancer} and leader/route queries to a {@link RouteBalancer}.
 * It also owns a scheduled task that refreshes the node and RegionGroup statistics caches and,
 * whenever one of them (or the region route map) reports a change, logs the change and pushes
 * the latest region route map to the DataNodes.
 */
public class LoadManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

    /** Delay, in milliseconds, handed to the scheduler for the load statistics task. */
    private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();

    /** Number of threads backing the asynchronous {@link #eventBus}. */
    private static final int EVENT_BUS_THREAD_COUNT = 5;

    private final IManager configManager;
    private final RegionBalancer regionBalancer;
    private final PartitionBalancer partitionBalancer;
    private final RouteBalancer routeBalancer;

    /** Handle of the running load statistics task; {@code null} while the service is stopped. */
    private Future<?> currentLoadStatisticsFuture;

    private final ScheduledExecutorService loadStatisticsExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");

    /** Guards every read and write of {@link #currentLoadStatisticsFuture}. */
    private final Object scheduleMonitor = new Object();

    // NOTE(review): nothing in this class shuts this thread pool down - confirm whether the
    // owner of LoadManager is expected to do so.
    private final EventBus eventBus = new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(EVENT_BUS_THREAD_COUNT));

    /**
     * Builds the three balancers on top of the given manager and registers the manager's
     * ClusterSchemaManager and SyncManager as subscribers of the event bus.
     *
     * @param iManager the manager that gives access to the other ConfigNode managers
     */
    public LoadManager(IManager iManager) {
        this.configManager = iManager;
        this.regionBalancer = new RegionBalancer(iManager);
        this.partitionBalancer = new PartitionBalancer(iManager);
        this.routeBalancer = new RouteBalancer(iManager);
        this.eventBus.register(iManager.getClusterSchemaManager());
        this.eventBus.register(iManager.getSyncManager());
    }

    /**
     * Generates a RegionGroup allocation plan by delegating to the {@link RegionBalancer}.
     *
     * @param map forwarded unchanged to the balancer; presumably database name to the number of
     *     RegionGroups to allocate - confirm against {@code genRegionGroupsAllocationPlan}
     * @param tConsensusGroupType the consensus group type of the RegionGroups to allocate
     * @return the plan produced by the balancer
     * @throws NotEnoughDataNodeException propagated from the balancer
     * @throws DatabaseNotExistsException propagated from the balancer
     */
    public CreateRegionGroupsPlan allocateRegionGroups(Map<String, Integer> map, TConsensusGroupType tConsensusGroupType) throws NotEnoughDataNodeException, DatabaseNotExistsException {
        return this.regionBalancer.genRegionGroupsAllocationPlan(map, tConsensusGroupType);
    }

    /**
     * Allocates schema partitions by delegating to the {@link PartitionBalancer}.
     *
     * @param map forwarded unchanged to the balancer; presumably database name to the series
     *     partition slots that still need a schema partition - confirm against the balancer
     * @return the schema partition tables produced by the balancer
     * @throws NoAvailableRegionGroupException propagated from the balancer
     */
    public Map<String, SchemaPartitionTable> allocateSchemaPartition(Map<String, List<TSeriesPartitionSlot>> map) throws NoAvailableRegionGroupException {
        return this.partitionBalancer.allocateSchemaPartition(map);
    }

    /**
     * Allocates data partitions by delegating to the {@link PartitionBalancer}.
     *
     * @param map forwarded unchanged to the balancer; presumably database name to the series and
     *     time slots that still need a data partition - confirm against the balancer
     * @return the data partition tables produced by the balancer
     * @throws NoAvailableRegionGroupException propagated from the balancer
     */
    public Map<String, DataPartitionTable> allocateDataPartition(Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> map) throws NoAvailableRegionGroupException {
        return this.partitionBalancer.allocateDataPartition(map);
    }

    /**
     * @return the region leader map currently held by the {@link RouteBalancer}
     */
    public Map<TConsensusGroupId, Integer> getLatestRegionLeaderMap() {
        return this.routeBalancer.getLatestRegionLeaderMap();
    }

    /**
     * Counts the entries of the latest region leader map whose value equals {@code i} and whose
     * consensus group has the requested type.
     *
     * @param i the leader value to match; presumably a DataNode id - confirm against
     *     {@code RouteBalancer}
     * @param tConsensusGroupType the consensus group type an entry must have to be counted
     * @return the number of matching entries
     */
    public int getRegionGroupLeaderCount(int i, TConsensusGroupType tConsensusGroupType) {
        return (int) this.routeBalancer.getLatestRegionLeaderMap().entrySet().stream()
                .filter(leaderEntry -> i == leaderEntry.getValue() && tConsensusGroupType.equals(leaderEntry.getKey().getType()))
                .count();
    }

    /**
     * @return the region priority map currently held by the {@link RouteBalancer}
     */
    public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap() {
        return this.routeBalancer.getLatestRegionPriorityMap();
    }

    /**
     * Schedules the periodic load statistics task if it is not already scheduled.
     *
     * <p>The task is submitted with no initial delay and {@link #HEARTBEAT_INTERVAL} milliseconds
     * as its period argument. Calling this method while the task is scheduled does nothing.
     */
    public void startLoadStatisticsService() {
        synchronized (this.scheduleMonitor) {
            if (this.currentLoadStatisticsFuture != null) {
                return;
            }
            this.currentLoadStatisticsFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.loadStatisticsExecutor, this::updateLoadStatistics, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
            LOGGER.info("LoadStatistics service is started successfully.");
        }
    }

    /**
     * Cancels the periodic load statistics task if one is scheduled.
     *
     * <p>The task is cancelled without interrupting a run that is already in progress. Calling
     * this method while no task is scheduled does nothing.
     */
    public void stopLoadStatisticsService() {
        synchronized (this.scheduleMonitor) {
            if (this.currentLoadStatisticsFuture == null) {
                return;
            }
            this.currentLoadStatisticsFuture.cancel(false);
            this.currentLoadStatisticsFuture = null;
            LOGGER.info("LoadStatistics service is stopped successfully.");
        }
    }

    /**
     * One run of the load statistics task.
     *
     * <p>Refreshes the node caches, then the RegionGroup caches, then the region route map. Each
     * step that reports a change is logged; changed node statistics are additionally posted on
     * the event bus. If any step reported a change, the latest region route map is broadcast.
     */
    private void updateLoadStatistics() {
        boolean needBroadcast = false;

        Map<Integer, Pair<NodeStatistics, NodeStatistics>> changedNodeStatistics = collectChangedNodeStatistics();
        if (!changedNodeStatistics.isEmpty()) {
            needBroadcast = true;
            recordNodeStatistics(changedNodeStatistics);
            this.eventBus.post(new NodeStatisticsEvent(changedNodeStatistics));
        }

        Map<TConsensusGroupId, RegionGroupStatistics> changedRegionGroupStatistics = collectChangedRegionGroupStatistics();
        if (!changedRegionGroupStatistics.isEmpty()) {
            needBroadcast = true;
            recordRegionGroupStatistics(changedRegionGroupStatistics);
        }

        if (this.routeBalancer.updateRegionRouteMap()) {
            needBroadcast = true;
            recordRegionRouteMap(this.routeBalancer.getRegionRouteMap());
        }

        if (needBroadcast) {
            broadcastLatestRegionRouteMap();
        }
    }

    /**
     * Runs {@code periodicUpdate()} on every node cache and gathers the ones that return true.
     *
     * @return node id mapped to a pair of (statistics after the update, copy of the previous
     *     statistics taken before the update)
     */
    private Map<Integer, Pair<NodeStatistics, NodeStatistics>> collectChangedNodeStatistics() {
        Map<Integer, Pair<NodeStatistics, NodeStatistics>> changedNodeStatistics = new ConcurrentHashMap<>();
        getNodeManager().getNodeCacheMap().forEach((nodeId, nodeCache) -> {
            // The copy must be taken before periodicUpdate() runs so that it still holds the
            // values from before this update.
            NodeStatistics previousStatistics = nodeCache.getPreviousStatistics().deepCopy();
            if (nodeCache.periodicUpdate()) {
                changedNodeStatistics.put(nodeId, new Pair<>(nodeCache.getStatistics(), previousStatistics));
            }
        });
        return changedNodeStatistics;
    }

    /**
     * Runs {@code periodicUpdate()} on every RegionGroup cache and gathers the ones that return
     * true.
     *
     * @return consensus group id mapped to the statistics read after the update
     */
    private Map<TConsensusGroupId, RegionGroupStatistics> collectChangedRegionGroupStatistics() {
        Map<TConsensusGroupId, RegionGroupStatistics> changedRegionGroupStatistics = new ConcurrentHashMap<>();
        getPartitionManager().getRegionGroupCacheMap().forEach((regionGroupId, regionGroupCache) -> {
            if (regionGroupCache.periodicUpdate()) {
                changedRegionGroupStatistics.put(regionGroupId, regionGroupCache.getStatistics());
            }
        });
        return changedRegionGroupStatistics;
    }

    /**
     * Logs the left element (the statistics read after the update) of every entry.
     *
     * @param nodeStatisticsMap node id mapped to a (current, previous) statistics pair
     */
    private void recordNodeStatistics(Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap) {
        LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
        for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeEntry : nodeStatisticsMap.entrySet()) {
            LOGGER.info("[UpdateLoadStatistics]\t {}={}", "nodeId{" + nodeEntry.getKey() + "}", nodeEntry.getValue().left);
        }
    }

    /**
     * Logs every RegionGroup's statistics followed by the statistics of each of its regions.
     *
     * @param regionGroupStatisticsMap consensus group id mapped to its statistics
     */
    private void recordRegionGroupStatistics(Map<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsMap) {
        LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
        for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupEntry : regionGroupStatisticsMap.entrySet()) {
            RegionGroupStatistics regionGroupStatistics = regionGroupEntry.getValue();
            LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupEntry.getKey());
            LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatistics);
            for (Map.Entry<Integer, RegionStatistics> regionEntry : regionGroupStatistics.getRegionStatisticsMap().entrySet()) {
                LOGGER.info("[UpdateLoadStatistics]\t dataNodeId{}={}", regionEntry.getKey(), regionEntry.getValue());
            }
        }
    }

    /**
     * Logs the leader map and the priority map of the given route map. For the priority map only
     * the DataNode ids of each replica set are printed, in the order the replica set lists them.
     *
     * @param regionRouteMap the route map to log
     */
    private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
        LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
        for (Map.Entry<TConsensusGroupId, Integer> leaderEntry : regionRouteMap.getRegionLeaderMap().entrySet()) {
            LOGGER.info("[UpdateLoadStatistics]\t {}={}", leaderEntry.getKey(), leaderEntry.getValue());
        }
        LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
        for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> priorityEntry : regionRouteMap.getRegionPriorityMap().entrySet()) {
            List<Integer> dataNodeIds = priorityEntry.getValue().getDataNodeLocations().stream()
                    .map(TDataNodeLocation::getDataNodeId)
                    .collect(Collectors.toList());
            LOGGER.info("[UpdateLoadStatistics]\t {}={}", priorityEntry.getKey(), dataNodeIds);
        }
    }

    /**
     * Sends the latest region route map, stamped with the current system time, to every DataNode
     * whose status is Running, Removing or ReadOnly.
     */
    public void broadcastLatestRegionRouteMap() {
        Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap();
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly).forEach(dataNodeConfiguration -> {
            TDataNodeLocation location = dataNodeConfiguration.getLocation();
            dataNodeLocationMap.put(location.getDataNodeId(), location);
        });
        LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(new AsyncClientHandler<>(DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap), dataNodeLocationMap));
        LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
    }

    /**
     * Initializes the node heartbeat cache, the RegionGroup heartbeat cache and the region route
     * map, in that order.
     */
    public void initHeartbeatCache() {
        getNodeManager().initNodeHeartbeatCache();
        getPartitionManager().initRegionGroupHeartbeatCache();
        this.routeBalancer.initRegionRouteMap();
    }

    /**
     * @return the {@link RouteBalancer} owned by this manager
     */
    public RouteBalancer getRouteBalancer() {
        return this.routeBalancer;
    }

    private NodeManager getNodeManager() {
        return this.configManager.getNodeManager();
    }

    private PartitionManager getPartitionManager() {
        return this.configManager.getPartitionManager();
    }

    /**
     * @return the asynchronous event bus on which node statistics changes are posted
     */
    public EventBus getEventBus() {
        return this.eventBus;
    }
}
