package org.apache.iotdb.consensus.pipe;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCService;
import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/pipe/PipeConsensus.class */
/**
 * {@link IConsensus} implementation that keeps one {@link PipeConsensusServerImpl} per local
 * consensus group and relies on consensus pipes (managed through {@link ConsensusPipeManager})
 * for replication between peers.
 *
 * <p>Structural changes to the set of local groups ({@link #createLocalPeer}, {@link
 * #deleteLocalPeer}) and the periodic pipe check are serialized through {@code
 * stateMachineMapLock}; plain look-ups go straight to the concurrent map.
 */
public class PipeConsensus implements IConsensus {
    private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID = "consensus_pipe_guardian";
    private static final String CLASS_NAME = PipeConsensus.class.getSimpleName();
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensus.class);
    private final TEndPoint thisNode;
    private final int thisNodeId;
    private final File storageDir;
    private final IStateMachine.Registry registry;
    private final PipeConsensusRPCService rpcService;
    private final PipeConsensusConfig config;
    private final ConsensusPipeManager consensusPipeManager;
    private final ConsensusPipeGuardian consensusPipeGuardian;
    /** Local server instance of every consensus group known to this node. */
    private final Map<ConsensusGroupId, PipeConsensusServerImpl> stateMachineMap = new ConcurrentHashMap<>();
    private final RegisterManager registerManager = new RegisterManager();
    /** Guards creation/deletion of local peers and the guardian's pipe check. */
    private final ReentrantLock stateMachineMapLock = new ReentrantLock();
    /** Peer lists recorded before start-up; {@code null} when nothing was recorded. */
    private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
    private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> asyncClientManager = PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
    private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> syncClientManager = PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();

    /**
     * Builds the consensus layer from the given configuration.
     *
     * @param consensusConfig source of this node's endpoint, node id, storage directory and
     *     pipe-consensus settings
     * @param registry factory used to obtain the state machine of each consensus group
     */
    public PipeConsensus(ConsensusConfig consensusConfig, IStateMachine.Registry registry) {
        this.thisNode = consensusConfig.getThisNodeEndPoint();
        this.thisNodeId = consensusConfig.getThisNodeId();
        this.storageDir = new File(consensusConfig.getStorageDir());
        this.config = consensusConfig.getPipeConsensusConfig();
        this.registry = registry;
        this.rpcService = new PipeConsensusRPCService(this.thisNode, consensusConfig.getPipeConsensusConfig());
        this.consensusPipeManager = new ConsensusPipeManager(consensusConfig.getPipeConsensusConfig().getPipe(), consensusConfig.getPipeConsensusConfig().getReplicateMode());
        this.consensusPipeGuardian = consensusConfig.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
    }

    /**
     * Kicks off recovery of the local peers, registers the RPC service and schedules the
     * periodic consensus-pipe check.
     *
     * @throws IOException if the storage directory cannot be created or the RPC service fails
     *     to start (the {@link StartupException} is attached as the cause)
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public synchronized void start() throws IOException {
        initAndRecover();
        rpcService.initSyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe()));
        try {
            registerManager.register(rpcService);
            consensusPipeGuardian.start(CONSENSUS_PIPE_GUARDIAN_TASK_ID, this::checkAllConsensusPipe, config.getPipe().getConsensusPipeGuardJobIntervalInSeconds());
        } catch (StartupException e) {
            throw new IOException(e);
        }
    }

    /**
     * Creates the storage directory when it is missing; otherwise recovers the local peers found
     * in it on a background thread.
     *
     * @throws IOException if the storage directory does not exist and cannot be created
     */
    private void initAndRecover() throws IOException {
        if (!storageDir.exists()) {
            if (!storageDir.mkdirs()) {
                LOGGER.warn("Unable to create consensus dir at {}", storageDir);
                throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
            }
            return;
        }
        // Recovery is asynchronous: start() does not wait for the groups to be restored.
        CompletableFuture.runAsync(this::recoverLocalPeers).exceptionally(th -> {
            LOGGER.error("Failed to recover consensus from {}", storageDir, th);
            return null;
        });
    }

    /**
     * Rebuilds one server instance per entry of the storage directory. Any failure is logged and
     * stops the scan; entries that were already processed stay registered.
     */
    private void recoverLocalPeers() {
        // try-with-resources guarantees the directory handle is released even when a group
        // fails to recover half-way through the iteration.
        try (DirectoryStream<Path> groupDirs = Files.newDirectoryStream(storageDir.toPath())) {
            for (Path groupDir : groupDirs) {
                ConsensusGroupId groupId = parsePeerFileName(groupDir.getFileName().toString());
                PipeConsensusServerImpl impl = new PipeConsensusServerImpl(new Peer(groupId, thisNodeId, thisNode), registry.apply(groupId), groupDir.toString(), new ArrayList<>(), config, consensusPipeManager, syncClientManager);
                stateMachineMap.put(groupId, impl);
                checkPeerListAndStartIfEligible(groupId, impl);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to recover consensus from {}", storageDir, e);
        }
    }

    /**
     * Starts a recovered group, first reconciling it with the recorded peer list when one exists.
     *
     * <ul>
     *   <li>no list recorded: start the group as-is;
     *   <li>list recorded without this group: reset it with an empty peer list and do not start it;
     *   <li>list recorded with this group: reset it to the recorded peers, then start it.
     * </ul>
     *
     * @param consensusGroupId group being recovered
     * @param pipeConsensusServerImpl server instance of that group
     * @throws IOException propagated from {@link PipeConsensusServerImpl#start}
     */
    private void checkPeerListAndStartIfEligible(ConsensusGroupId consensusGroupId, PipeConsensusServerImpl pipeConsensusServerImpl) throws IOException {
        BiConsumer<ConsensusGroupId, List<Peer>> resetQuietly = (groupId, peers) -> {
            try {
                resetPeerList(groupId, peers);
            } catch (ConsensusGroupNotExistException ignored) {
                // Intentionally ignored: there is nothing to reset for a group that is unknown.
            } catch (Exception e) {
                LOGGER.warn("Failed to reset peer list while start", e);
            }
        };
        if (correctPeerListBeforeStart == null) {
            pipeConsensusServerImpl.start(true);
            return;
        }
        if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
            resetQuietly.accept(consensusGroupId, correctPeerListBeforeStart.get(consensusGroupId));
            pipeConsensusServerImpl.start(true);
        } else {
            resetQuietly.accept(consensusGroupId, Collections.emptyList());
        }
    }

    /** Closes the client managers, deregisters services, stops the guardian and every group. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public synchronized void stop() {
        asyncClientManager.close();
        syncClientManager.close();
        registerManager.deregisterAll();
        consensusPipeGuardian.stop();
        stateMachineMap.values().parallelStream().forEach(PipeConsensusServerImpl::stop);
    }

    /**
     * Guardian job: hands every local group the consensus pipes sent from this node, and marks
     * pipes whose group is not present locally as {@link PipeStatus#DROPPED}.
     */
    private void checkAllConsensusPipe() {
        // Pipes whose sender is this node, grouped by the consensus group they belong to.
        Map<ConsensusGroupId, Map<ConsensusPipeName, PipeStatus>> pipesByGroup = consensusPipeManager.getAllConsensusPipe().entrySet().stream()
            .filter(entry -> entry.getKey().getSenderDataNodeId() == thisNodeId)
            .collect(Collectors.groupingBy(entry -> entry.getKey().getConsensusGroupId(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        // Acquire outside the try so the finally block never unlocks a lock we do not hold.
        stateMachineMapLock.lock();
        try {
            stateMachineMap.forEach((groupId, impl) -> impl.checkConsensusPipe(pipesByGroup.getOrDefault(groupId, ImmutableMap.of())));
            pipesByGroup.entrySet().stream()
                .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
                .flatMap(entry -> entry.getValue().keySet().stream())
                .forEach(consensusPipeName -> {
                    try {
                        LOGGER.warn("{} drop consensus pipe [{}]", consensusPipeName.getConsensusGroupId(), consensusPipeName);
                        consensusPipeManager.updateConsensusPipe(consensusPipeName, PipeStatus.DROPPED);
                    } catch (Exception e) {
                        LOGGER.warn("{} cannot drop consensus pipe [{}]", consensusPipeName.getConsensusGroupId(), consensusPipeName, e);
                    }
                });
        } finally {
            stateMachineMapLock.unlock();
        }
    }

    /**
     * Looks up the local server instance of a consensus group.
     *
     * @param consensusGroupId group to look up
     * @return the server instance, never {@code null}
     * @throws ConsensusException a {@link ConsensusGroupNotExistException} when the group is not
     *     present on this node
     */
    private PipeConsensusServerImpl getImplOrThrow(ConsensusGroupId consensusGroupId) throws ConsensusException {
        PipeConsensusServerImpl impl = stateMachineMap.get(consensusGroupId);
        if (impl == null) {
            throw new ConsensusGroupNotExistException(consensusGroupId);
        }
        return impl;
    }

    /**
     * Applies a write request to a local group.
     *
     * @return {@code SYSTEM_READ_ONLY} status when the group is read-only, {@code
     *     WRITE_PROCESS_REJECT} when it is inactive, otherwise the status returned by the group
     * @throws ConsensusException if the group does not exist on this node
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public TSStatus write(ConsensusGroupId consensusGroupId, IConsensusRequest iConsensusRequest) throws ConsensusException {
        PipeConsensusServerImpl impl = getImplOrThrow(consensusGroupId);
        if (impl.isReadOnly()) {
            return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
        }
        if (!impl.isActive()) {
            return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, "peer is inactive and not ready to receive sync log request.");
        }
        return impl.write(iConsensusRequest);
    }

    /**
     * Serves a read request from a local group.
     *
     * @throws ConsensusException if the group does not exist on this node
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public DataSet read(ConsensusGroupId consensusGroupId, IConsensusRequest iConsensusRequest) throws ConsensusException {
        return getImplOrThrow(consensusGroupId).read(iConsensusRequest);
    }

    /** Returns {@code <storageDir>/<groupTypeValue>_<groupId>} for the given group. */
    private String getPeerDir(ConsensusGroupId consensusGroupId) {
        return storageDir + File.separator + consensusGroupId.getType().getValue() + "_" + consensusGroupId.getId();
    }

    /**
     * Inverse of {@link #getPeerDir}: parses a {@code <type>_<id>} directory name.
     *
     * @throws RuntimeException ({@link NumberFormatException} or {@link
     *     ArrayIndexOutOfBoundsException}) when the name does not follow that layout
     */
    private ConsensusGroupId parsePeerFileName(String str) {
        String[] parts = str.split("_");
        return ConsensusGroupId.Factory.create(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
    }

    /**
     * Creates, registers and starts the local peer of a new consensus group.
     *
     * @param consensusGroupId group to create
     * @param list peers of the group; must be non-empty and contain this node
     * @throws ConsensusException if the peer list is empty or misses this node, the group already
     *     exists, its directory cannot be created, or starting the peer fails with an I/O error
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void createLocalPeer(ConsensusGroupId consensusGroupId, List<Peer> list) throws ConsensusException {
        int peerCount = list.size();
        if (peerCount == 0) {
            throw new IllegalPeerNumException(peerCount);
        }
        if (!list.contains(new Peer(consensusGroupId, thisNodeId, thisNode))) {
            throw new IllegalPeerEndpointException(thisNode, list);
        }
        // Lock first, then try/finally: the existence check below must not reach an unlock()
        // for a lock this thread never acquired, and check + insert must be atomic.
        stateMachineMapLock.lock();
        try {
            if (stateMachineMap.containsKey(consensusGroupId)) {
                throw new ConsensusGroupAlreadyExistException(consensusGroupId);
            }
            String peerDir = getPeerDir(consensusGroupId);
            if (!new File(peerDir).mkdirs()) {
                LOGGER.warn("Unable to create consensus dir for group {} at {}", consensusGroupId, peerDir);
                throw new ConsensusException(String.format("Unable to create consensus dir for group %s", consensusGroupId));
            }
            PipeConsensusServerImpl impl = new PipeConsensusServerImpl(new Peer(consensusGroupId, thisNodeId, thisNode), registry.apply(consensusGroupId), peerDir, list, config, consensusPipeManager, syncClientManager);
            stateMachineMap.put(consensusGroupId, impl);
            impl.start(false);
        } catch (IOException e) {
            LOGGER.warn("Cannot create local peer for group {} with peers {}", consensusGroupId, list, e);
            throw new ConsensusException(e);
        } finally {
            stateMachineMapLock.unlock();
        }
    }

    /**
     * Clears the local peer of a group and deletes its directory.
     *
     * @throws ConsensusException if the group does not exist on this node or the clean-up fails
     *     with an I/O error
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void deleteLocalPeer(ConsensusGroupId consensusGroupId) throws ConsensusException {
        if (!stateMachineMap.containsKey(consensusGroupId)) {
            throw new ConsensusGroupNotExistException(consensusGroupId);
        }
        stateMachineMapLock.lock();
        try {
            // NOTE(review): the entry is not removed from stateMachineMap here - confirm whether
            // that is intended or handled elsewhere.
            stateMachineMap.get(consensusGroupId).clear();
            FileUtils.deleteFileOrDirectory(new File(getPeerDir(consensusGroupId)));
        } catch (IOException e) {
            LOGGER.warn("Cannot delete local peer for group {}", consensusGroupId, e);
            throw new ConsensusException(e);
        } finally {
            stateMachineMapLock.unlock();
        }
    }

    /**
     * Adds a remote peer to a group: deactivate it, have the current peers create pipes towards
     * it, wait for the transfer to finish, then activate it. On failure the created pipes are
     * dropped on a best-effort basis.
     *
     * @throws ConsensusException if the group does not exist, the peer is already a member, or
     *     any of the steps fails
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void addRemotePeer(ConsensusGroupId consensusGroupId, Peer peer) throws ConsensusException {
        PipeConsensusServerImpl impl = getImplOrThrow(consensusGroupId);
        if (impl.containsPeer(peer)) {
            throw new PeerAlreadyInConsensusGroupException(consensusGroupId, peer);
        }
        try {
            LOGGER.info("[{}] inactivate new peer: {}", CLASS_NAME, peer);
            impl.setRemotePeerActive(peer, false);
            LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
            impl.notifyPeersToCreateConsensusPipes(peer);
            LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
            impl.waitPeersToTargetPeerTransmissionCompleted(peer);
            LOGGER.info("[{}] activate new peer...", CLASS_NAME);
            impl.setRemotePeerActive(peer, true);
        } catch (ConsensusGroupModifyPeerException e) {
            try {
                LOGGER.info("[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME);
                impl.notifyPeersToDropConsensusPipe(peer);
            } catch (ConsensusGroupModifyPeerException cleanupFailure) {
                // The original failure is the one reported to the caller; this one is only logged.
                LOGGER.error("[{}] failed to cleanup side effects after failed to add remote peer", CLASS_NAME, cleanupFailure);
            }
            throw new ConsensusException(e);
        }
    }

    /**
     * Removes a remote peer from a group: drop the pipes towards it, deactivate it and wait for
     * its outgoing transfer to finish.
     *
     * @throws ConsensusException if the group does not exist, the peer is not a member, or any of
     *     the steps fails (the underlying exception is kept as the cause)
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void removeRemotePeer(ConsensusGroupId consensusGroupId, Peer peer) throws ConsensusException {
        PipeConsensusServerImpl impl = getImplOrThrow(consensusGroupId);
        if (!impl.containsPeer(peer)) {
            throw new PeerNotInConsensusGroupException(consensusGroupId, peer.toString());
        }
        try {
            impl.notifyPeersToDropConsensusPipe(peer);
            impl.setRemotePeerActive(peer, false);
            impl.waitTargetPeerToPeersTransmissionCompleted(peer);
        } catch (ConsensusGroupModifyPeerException e) {
            // Wrap the exception itself (as addRemotePeer does) so the stack trace is not lost.
            throw new ConsensusException(e);
        }
    }

    /** Stores the peer lists that recovered groups are reconciled against during start-up. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, List<Peer>> map) {
        LOGGER.info("Record correct peer list: {}", map);
        this.correctPeerListBeforeStart = map;
    }

    /**
     * Aligns the local view of a group with the given peer list. If this node is not in the list
     * the local peer is deleted; otherwise pipes to peers missing from the list are dropped and
     * pipes to newly listed remote peers are created. Individual pipe failures are logged and do
     * not abort the reset.
     *
     * @throws ConsensusException if the group does not exist on this node or deleting the local
     *     peer fails
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void resetPeerList(ConsensusGroupId consensusGroupId, List<Peer> list) throws ConsensusException {
        PipeConsensusServerImpl impl = getImplOrThrow(consensusGroupId);
        if (!list.contains(new Peer(consensusGroupId, thisNodeId, thisNode))) {
            LOGGER.warn("[RESET PEER LIST] Local peer is not in the correct configuration, delete local peer {}", consensusGroupId);
            deleteLocalPeer(consensusGroupId);
            return;
        }
        // Iterate over a snapshot: dropping a pipe may change the peers reported by the group.
        List<Peer> peersBeforeReset = ImmutableList.copyOf(impl.getPeers());
        String previousPeers = impl.getPeers().toString();
        for (Peer stalePeer : peersBeforeReset) {
            if (list.contains(stalePeer)) {
                continue;
            }
            try {
                impl.dropConsensusPipeToTargetPeer(stalePeer);
                LOGGER.info("[RESET PEER LIST] Remove sync channel with: {}", stalePeer);
            } catch (ConsensusGroupModifyPeerException e) {
                LOGGER.error("[RESET PEER LIST] Failed to remove sync channel with: {}", stalePeer, e);
            }
        }
        for (Peer wantedPeer : list) {
            if (impl.containsPeer(wantedPeer) || wantedPeer.getNodeId() == thisNodeId) {
                continue;
            }
            try {
                impl.createConsensusPipeToTargetPeer(wantedPeer);
                LOGGER.info("[RESET PEER LIST] Build sync channel with: {}", wantedPeer);
            } catch (ConsensusGroupModifyPeerException e) {
                LOGGER.warn("[RESET PEER LIST] Failed to build sync channel with: {}", wantedPeer, e);
            }
        }
        if (previousPeers.equals(impl.getPeers().toString())) {
            LOGGER.info("[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", previousPeers);
        } else {
            LOGGER.info("[RESET PEER LIST] Local peer list has been reset: {} -> {}", previousPeers, impl.getPeers());
        }
    }

    /**
     * Leader transfer is not supported by this implementation.
     *
     * @throws ConsensusException always
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void transferLeader(ConsensusGroupId consensusGroupId, Peer peer) throws ConsensusException {
        throw new ConsensusException(String.format("%s does not support leader transfer", CLASS_NAME));
    }

    /**
     * Only verifies that the group exists; no snapshot work is performed here.
     *
     * @throws ConsensusException if the group does not exist on this node
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void triggerSnapshot(ConsensusGroupId consensusGroupId, boolean z) throws ConsensusException {
        if (!stateMachineMap.containsKey(consensusGroupId)) {
            throw new ConsensusGroupNotExistException(consensusGroupId);
        }
    }

    /** Always reports {@code true}, regardless of the group. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public boolean isLeader(ConsensusGroupId consensusGroupId) {
        return true;
    }

    /** Always reports {@code 0}. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public long getLogicalClock(ConsensusGroupId consensusGroupId) {
        return 0L;
    }

    /** Always reports {@code true}, regardless of the group. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public boolean isLeaderReady(ConsensusGroupId consensusGroupId) {
        return true;
    }

    /**
     * @return this node as a peer of the group when the group exists locally, otherwise {@code
     *     null}
     */
    @Override // org.apache.iotdb.consensus.IConsensus
    public Peer getLeader(ConsensusGroupId consensusGroupId) {
        if (!stateMachineMap.containsKey(consensusGroupId)) {
            return null;
        }
        return new Peer(consensusGroupId, thisNodeId, thisNode);
    }

    /** @return the number of peers the local group reports, or {@code 0} for an unknown group */
    @Override // org.apache.iotdb.consensus.IConsensus
    public int getReplicationNum(ConsensusGroupId consensusGroupId) {
        PipeConsensusServerImpl impl = stateMachineMap.get(consensusGroupId);
        return impl == null ? 0 : impl.getPeers().size();
    }

    /** @return a new list holding the ids of all groups currently registered on this node */
    @Override // org.apache.iotdb.consensus.IConsensus
    public List<ConsensusGroupId> getAllConsensusGroupIds() {
        return new ArrayList<>(stateMachineMap.keySet());
    }

    /** @return the directory path used for the given group (see {@link #getPeerDir}) */
    @Override // org.apache.iotdb.consensus.IConsensus
    public String getRegionDirFromConsensusGroupId(ConsensusGroupId consensusGroupId) {
        return getPeerDir(consensusGroupId);
    }

    /** No-op: this implementation does not reload its configuration. */
    @Override // org.apache.iotdb.consensus.IConsensus
    public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    }

    /** @return the local server instance of the group, or {@code null} if it is unknown */
    public PipeConsensusServerImpl getImpl(ConsensusGroupId consensusGroupId) {
        return stateMachineMap.get(consensusGroupId);
    }

    /** @return the number of consensus pipes reported by the pipe manager */
    public int getPipeCount() {
        return consensusPipeManager.getAllConsensusPipe().size();
    }
}
