package org.apache.iotdb.confignode.manager.load.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
import org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.procedure.impl.testonly.CreateManyDatabasesProcedure;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/load/service/TopologyService.class */
public class TopologyService implements Runnable, IClusterStatusSubscriber {
    private static final long PROBING_INTERVAL_MS = 5000;
    private static final long PROBING_TIMEOUT_MS = 5000;
    private static final int SAMPLING_WINDOW_SIZE = 100;
    private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
    private final IManager configManager;
    private final IFailureDetector failureDetector;
    private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class);
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
    private final ExecutorService topologyThread = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
    private Future<?> future = null;
    private final List<Integer> startingDataNodes = new CopyOnWriteArrayList();
    private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> heartbeats = new ConcurrentHashMap();
    private final AtomicBoolean shouldRun = new AtomicBoolean(false);
    private final AwaitForSignal awaitForSignal = new AwaitForSignal(getClass().getSimpleName());

    public TopologyService(IManager iManager, Consumer<Map<Integer, Set<Integer>>> consumer) {
        this.configManager = iManager;
        this.topologyChangeListener = consumer;
        String failureDetector = CONF.getFailureDetector();
        boolean z = -1;
        switch (failureDetector.hashCode()) {
            case 97445748:
                if (failureDetector.equals(IFailureDetector.FIXED_DETECTOR)) {
                    z = true;
                    break;
                }
                break;
            case 2019179745:
                if (failureDetector.equals(IFailureDetector.PHI_ACCRUAL_DETECTOR)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case CreateManyDatabasesProcedure.INITIAL_STATE /* 0 */:
                this.failureDetector = new PhiAccrualDetector(CONF.getFailureDetectorPhiThreshold(), CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000000, CONF.getHeartbeatIntervalInMs() * 200000, 60, new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000000));
                return;
            case true:
            default:
                this.failureDetector = new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000000);
                return;
        }
    }

    public synchronized void startTopologyService() {
        if (this.future == null) {
            this.future = this.topologyThread.submit(this);
        }
        this.shouldRun.set(true);
        LOGGER.info("Topology Probing has started successfully");
    }

    public synchronized void stopTopologyService() {
        this.shouldRun.set(false);
        this.future.cancel(true);
        this.future = null;
        this.heartbeats.clear();
        LOGGER.info("Topology Probing has stopped successfully");
    }

    private boolean mayWait() {
        try {
            this.awaitForSignal.await(5000L, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.shouldRun.get() && mayWait()) {
            topologyProbing();
        }
    }

    private synchronized void topologyProbing() {
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        Iterator<TDataNodeConfiguration> it = this.configManager.getNodeManager().getRegisteredDataNodes().iterator();
        while (it.hasNext()) {
            TDataNodeLocation location = it.next().getLocation();
            if (!this.startingDataNodes.contains(Integer.valueOf(location.getDataNodeId()))) {
                arrayList.add(location);
                hashSet.add(Integer.valueOf(location.getDataNodeId()));
            }
        }
        TNodeLocations tNodeLocations = new TNodeLocations();
        tNodeLocations.setDataNodeLocations(arrayList);
        tNodeLocations.setConfigNodeLocations(Collections.emptyList());
        DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, tNodeLocations, (Map) this.configManager.getNodeManager().getRegisteredDataNodes().stream().map((v0) -> {
            return v0.getLocation();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getDataNodeId();
        }, tDataNodeLocation -> {
            return tDataNodeLocation;
        })));
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 5000L);
        ArrayList<TTestConnectionResult> arrayList2 = new ArrayList();
        dataNodeAsyncRequestContext.getResponseMap().forEach((num, tTestConnectionResp) -> {
            if (tTestConnectionResp.isSetResultList()) {
                arrayList2.addAll(tTestConnectionResp.getResultList());
            }
        });
        for (TTestConnectionResult tTestConnectionResult : arrayList2) {
            int intValue = ((Integer) Optional.ofNullable(tTestConnectionResult.getSender().getDataNodeLocation()).map((v0) -> {
                return v0.getDataNodeId();
            }).orElse(-1)).intValue();
            int nodeId = tTestConnectionResult.getServiceProvider().getNodeId();
            if (tTestConnectionResult.isSuccess() && hashSet.contains(Integer.valueOf(intValue)) && hashSet.contains(Integer.valueOf(nodeId))) {
                List<AbstractHeartbeatSample> computeIfAbsent = this.heartbeats.computeIfAbsent(new Pair<>(Integer.valueOf(intValue), Integer.valueOf(nodeId)), pair -> {
                    return new LinkedList();
                });
                computeIfAbsent.add(new NodeHeartbeatSample(NodeStatus.Running));
                if (computeIfAbsent.size() > 100) {
                    computeIfAbsent.remove(0);
                }
            }
        }
        Map<Integer, Set<Integer>> map = (Map) arrayList.stream().collect(Collectors.toMap((v0) -> {
            return v0.getDataNodeId();
        }, tDataNodeLocation2 -> {
            return new HashSet();
        }));
        for (Map.Entry<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> entry : this.heartbeats.entrySet()) {
            int intValue2 = ((Integer) entry.getKey().getLeft()).intValue();
            int intValue3 = ((Integer) entry.getKey().getRight()).intValue();
            if (entry.getValue().isEmpty() || this.failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
                map.get(Integer.valueOf(intValue2)).add(Integer.valueOf(intValue3));
            } else {
                LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", Integer.valueOf(intValue2), Integer.valueOf(intValue3));
            }
        }
        logAsymmetricPartition(map);
        if (this.shouldRun.get()) {
            this.topologyChangeListener.accept(map);
        }
    }

    private void logAsymmetricPartition(Map<Integer, Set<Integer>> map) {
        Set<Integer> keySet = map.keySet();
        if (keySet.size() == 1) {
            return;
        }
        Iterator<Integer> it = keySet.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Iterator<Integer> it2 = keySet.iterator();
            while (it2.hasNext()) {
                int intValue2 = it2.next().intValue();
                if (intValue < intValue2) {
                    Set<Integer> set = map.get(Integer.valueOf(intValue));
                    Set<Integer> set2 = map.get(Integer.valueOf(intValue2));
                    if (set.size() > 1 && set2.size() > 1 && !set2.contains(Integer.valueOf(intValue)) && !set.contains(Integer.valueOf(intValue2))) {
                        LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", Integer.valueOf(intValue), Integer.valueOf(intValue2));
                    }
                }
            }
        }
    }

    @Override // org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber
    public void onNodeStatisticsChanged(NodeStatisticsChangeEvent nodeStatisticsChangeEvent) {
        Set<Integer> keySet = this.configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
        for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry : nodeStatisticsChangeEvent.getDifferentNodeStatisticsMap().entrySet()) {
            Integer key = entry.getKey();
            Pair<NodeStatistics, NodeStatistics> value = entry.getValue();
            if (keySet.contains(key)) {
                if (value.getLeft() == null) {
                    this.startingDataNodes.add(key);
                } else {
                    this.startingDataNodes.remove(key);
                    Set set = (Set) this.heartbeats.keySet().stream().filter(pair -> {
                        return Objects.equals(pair.getLeft(), key) || Objects.equals(pair.getRight(), key);
                    }).collect(Collectors.toSet());
                    if (value.getRight() == null) {
                        Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> map = this.heartbeats;
                        Objects.requireNonNull(map);
                        set.forEach((v1) -> {
                            r1.remove(v1);
                        });
                    } else if (NodeStatus.Unknown.equals(((NodeStatistics) value.getLeft()).getStatus()) && NodeStatus.Running.equals(((NodeStatistics) value.getRight()).getStatus())) {
                        set.forEach(pair2 -> {
                            this.heartbeats.put(pair2, new ArrayList());
                        });
                        this.awaitForSignal.signal();
                    }
                }
            }
        }
    }
}
