package org.apache.iotdb.consensus.pipe;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedReq;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedResp;
import org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToCreateConsensusPipeReq;
import org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToDropConsensusPipeReq;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.class */
public class PipeConsensusServerImpl {
    private static final long CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2000;
    private static final long RETRY_WAIT_TIME_IN_MS = 500;
    private static final long MAX_RETRY_TIMES = 20;
    private final Peer thisNode;
    private final IStateMachine stateMachine;
    private final PipeConsensusPeerManager peerManager;
    private final String consensusGroupId;
    private final ConsensusPipeManager consensusPipeManager;
    private final ProgressIndexManager progressIndexManager;
    private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> syncClientManager;
    private final PipeConsensusConfig.ReplicateMode replicateMode;
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusServerImpl.class);
    private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance();
    private final Lock stateMachineLock = new ReentrantLock();
    private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final PipeConsensusServerMetrics pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);

    public PipeConsensusServerImpl(Peer peer, IStateMachine iStateMachine, String str, List<Peer> list, PipeConsensusConfig pipeConsensusConfig, ConsensusPipeManager consensusPipeManager, IClientManager<TEndPoint, SyncPipeConsensusServiceClient> iClientManager) throws IOException {
        this.thisNode = peer;
        this.stateMachine = iStateMachine;
        this.peerManager = new PipeConsensusPeerManager(str, list);
        this.consensusGroupId = peer.getGroupId().toString();
        this.consensusPipeManager = consensusPipeManager;
        this.progressIndexManager = pipeConsensusConfig.getPipe().getProgressIndexManager();
        this.syncClientManager = iClientManager;
        this.replicateMode = pipeConsensusConfig.getReplicateMode();
        if (list.isEmpty()) {
            this.peerManager.recover();
            return;
        }
        List<Peer> list2 = (List) list.stream().filter(peer2 -> {
            return !peer2.equals(peer);
        }).collect(Collectors.toList());
        List<Peer> createConsensusPipes = createConsensusPipes(list2);
        if (createConsensusPipes.size() < list2.size()) {
            updateConsensusPipesStatus(createConsensusPipes, PipeStatus.DROPPED);
            throw new IOException(String.format("%s cannot create all consensus pipes", peer));
        }
        try {
            this.peerManager.persistAll();
        } catch (Exception e) {
            LOGGER.warn("{} cannot persist all peers", peer, e);
            this.peerManager.deleteAllFiles();
            updateConsensusPipesStatus(createConsensusPipes, PipeStatus.DROPPED);
            throw e;
        }
    }

    public synchronized void start(boolean z) throws IOException {
        this.stateMachine.start();
        MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
        if (z) {
            List<Peer> otherPeers = this.peerManager.getOtherPeers(this.thisNode);
            List<Peer> updateConsensusPipesStatus = updateConsensusPipesStatus(new ArrayList(otherPeers), PipeStatus.RUNNING);
            for (int i = 0; i < MAX_RETRY_TIMES && !updateConsensusPipesStatus.isEmpty(); i++) {
                try {
                    updateConsensusPipesStatus = updateConsensusPipesStatus(updateConsensusPipesStatus, PipeStatus.RUNNING);
                    Thread.sleep(RETRY_WAIT_TIME_IN_MS);
                } catch (InterruptedException e) {
                    LOGGER.warn("PipeConsensusImpl-peer{}: pipeConsensusImpl thread get interrupted when start consensus pipe. May because IoTDB process is killed.", this.thisNode);
                    throw new IOException(String.format("%s cannot start all consensus pipes", this.thisNode));
                }
            }
            if (!updateConsensusPipesStatus.isEmpty()) {
                ArrayList arrayList = new ArrayList(otherPeers);
                arrayList.removeAll(updateConsensusPipesStatus);
                updateConsensusPipesStatus(arrayList, PipeStatus.STOPPED);
                throw new IOException(String.format("%s cannot start all consensus pipes", this.thisNode));
            }
        }
        this.isStarted.set(true);
    }

    public synchronized void stop() {
        if (!updateConsensusPipesStatus(new ArrayList(this.peerManager.getOtherPeers(this.thisNode)), PipeStatus.STOPPED).isEmpty()) {
            LOGGER.warn("{} cannot stop all consensus pipes", this.thisNode);
        }
        MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
        this.stateMachine.stop();
        this.isStarted.set(false);
    }

    public synchronized void clear() throws IOException {
        if (!updateConsensusPipesStatus(new ArrayList(this.peerManager.getOtherPeers(this.thisNode)), PipeStatus.DROPPED).isEmpty()) {
            LOGGER.warn("{} cannot drop all consensus pipes", this.thisNode);
        }
        MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
        this.peerManager.clear();
        this.stateMachine.stop();
        this.isStarted.set(false);
        this.active.set(false);
    }

    private List<Peer> createConsensusPipes(List<Peer> list) {
        return (List) list.stream().filter(peer -> {
            try {
                if (list.equals(this.thisNode)) {
                    return true;
                }
                this.consensusPipeManager.createConsensusPipe(this.thisNode, peer);
                return true;
            } catch (Exception e) {
                LOGGER.warn("{}: cannot create consensus pipe between {} and {}", new Object[]{e.getMessage(), this.thisNode, peer});
                return false;
            }
        }).collect(Collectors.toList());
    }

    private List<Peer> updateConsensusPipesStatus(List<Peer> list, PipeStatus pipeStatus) {
        return (List) list.stream().filter(peer -> {
            try {
                if (peer.equals(this.thisNode)) {
                    return false;
                }
                this.consensusPipeManager.updateConsensusPipe(new ConsensusPipeName(this.thisNode, peer), pipeStatus);
                return false;
            } catch (Exception e) {
                LOGGER.warn("{}: cannot update consensus pipe between {} and {} to status {}", new Object[]{e.getMessage(), this.thisNode, peer, pipeStatus});
                return true;
            }
        }).collect(Collectors.toList());
    }

    public synchronized void checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> map) {
        PipeStatus pipeStatus = this.isStarted.get() ? PipeStatus.RUNNING : PipeStatus.STOPPED;
        Map map2 = (Map) this.peerManager.getOtherPeers(this.thisNode).stream().collect(ImmutableMap.toImmutableMap(peer -> {
            return new ConsensusPipeName(this.thisNode, peer);
        }, peer2 -> {
            return peer2;
        }));
        map.forEach((consensusPipeName, pipeStatus2) -> {
            if (!map2.containsKey(consensusPipeName)) {
                try {
                    LOGGER.warn("{} drop consensus pipe [{}]", this.consensusGroupId, consensusPipeName);
                    this.consensusPipeManager.updateConsensusPipe(consensusPipeName, PipeStatus.DROPPED);
                    return;
                } catch (Exception e) {
                    LOGGER.warn("{} cannot drop consensus pipe [{}]", new Object[]{this.consensusGroupId, consensusPipeName, e});
                    return;
                }
            }
            if (pipeStatus.equals(pipeStatus2)) {
                return;
            }
            try {
                LOGGER.warn("{} update consensus pipe [{}] to status {}", new Object[]{this.consensusGroupId, consensusPipeName, pipeStatus});
                if (pipeStatus.equals(PipeStatus.RUNNING)) {
                    return;
                }
                this.consensusPipeManager.updateConsensusPipe(consensusPipeName, pipeStatus);
            } catch (Exception e2) {
                LOGGER.warn("{} cannot update consensus pipe [{}] to status {}", new Object[]{this.consensusGroupId, consensusPipeName, pipeStatus, e2});
            }
        });
        map2.forEach((consensusPipeName2, peer3) -> {
            if (map.containsKey(consensusPipeName2)) {
                return;
            }
            try {
                LOGGER.warn("{} create and update consensus pipe [{}] to status {}", new Object[]{this.consensusGroupId, consensusPipeName2, pipeStatus});
                this.consensusPipeManager.createConsensusPipe(this.thisNode, peer3);
                this.consensusPipeManager.updateConsensusPipe(consensusPipeName2, pipeStatus);
            } catch (Exception e) {
                LOGGER.warn("{} cannot create and update consensus pipe [{}] to status {}", new Object[]{this.consensusGroupId, consensusPipeName2, pipeStatus, e});
            }
        });
    }

    public TSStatus write(IConsensusRequest iConsensusRequest) {
        try {
            long nanoTime = System.nanoTime();
            this.stateMachineLock.lock();
            this.pipeConsensusServerMetrics.recordGetStateMachineLockTime(System.nanoTime() - nanoTime);
            long nanoTime2 = System.nanoTime();
            if (iConsensusRequest instanceof ComparableConsensusRequest) {
                ((ComparableConsensusRequest) iConsensusRequest).setProgressIndex(this.progressIndexManager.assignProgressIndex(this.thisNode.getGroupId()));
            }
            TSStatus write = this.stateMachine.write(iConsensusRequest);
            long nanoTime3 = System.nanoTime();
            PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(nanoTime3 - nanoTime2);
            this.pipeConsensusServerMetrics.recordUserWriteStateMachineTime(nanoTime3 - nanoTime2);
            this.stateMachineLock.unlock();
            return write;
        } catch (Throwable th) {
            this.stateMachineLock.unlock();
            throw th;
        }
    }

    public TSStatus writeOnFollowerReplica(IConsensusRequest iConsensusRequest) {
        try {
            long nanoTime = System.nanoTime();
            this.stateMachineLock.lock();
            this.pipeConsensusServerMetrics.recordGetStateMachineLockTime(System.nanoTime() - nanoTime);
            long nanoTime2 = System.nanoTime();
            TSStatus write = this.stateMachine.write(iConsensusRequest);
            long nanoTime3 = System.nanoTime();
            PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(nanoTime3 - nanoTime2);
            this.pipeConsensusServerMetrics.recordReplicaWriteStateMachineTime(nanoTime3 - nanoTime2);
            this.stateMachineLock.unlock();
            return write;
        } catch (Throwable th) {
            this.stateMachineLock.unlock();
            throw th;
        }
    }

    public DataSet read(IConsensusRequest iConsensusRequest) {
        return this.stateMachine.read(iConsensusRequest);
    }

    public void setRemotePeerActive(Peer peer, boolean z) throws ConsensusGroupModifyPeerException {
        try {
            SyncPipeConsensusServiceClient syncPipeConsensusServiceClient = (SyncPipeConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                try {
                    TSetActiveResp active = syncPipeConsensusServiceClient.setActive(new TSetActiveReq(peer.getGroupId().convertToTConsensusGroupId(), z));
                    if (!RpcUtils.SUCCESS_STATUS.equals(active.getStatus())) {
                        throw new ConsensusGroupModifyPeerException(String.format("error when set peer %s to active %s. result status: %s", peer, Boolean.valueOf(z), active.getStatus()));
                    }
                    if (syncPipeConsensusServiceClient != null) {
                        syncPipeConsensusServiceClient.close();
                    }
                } catch (Exception e) {
                    throw new ConsensusGroupModifyPeerException(String.format("error when set peer %s to active %s", peer, Boolean.valueOf(z)), e);
                }
            } finally {
            }
        } catch (ClientManagerException e2) {
            throw new ConsensusGroupModifyPeerException((Throwable) e2);
        }
    }

    public void notifyPeersToCreateConsensusPipes(Peer peer) throws ConsensusGroupModifyPeerException {
        SyncPipeConsensusServiceClient syncPipeConsensusServiceClient;
        Exception exc = null;
        for (Peer peer2 : this.peerManager.getOtherPeers(this.thisNode)) {
            try {
                syncPipeConsensusServiceClient = (SyncPipeConsensusServiceClient) this.syncClientManager.borrowClient(peer2.getEndpoint());
                try {
                } finally {
                }
            } catch (Exception e) {
                exc = e;
                LOGGER.warn("{} cannot notify peer {} to create consensus pipe", new Object[]{this.thisNode, peer2, e});
            }
            if (!RpcUtils.SUCCESS_STATUS.equals(syncPipeConsensusServiceClient.notifyPeerToCreateConsensusPipe(new TNotifyPeerToCreateConsensusPipeReq(peer.getGroupId().convertToTConsensusGroupId(), peer.getEndpoint(), peer.getNodeId())).getStatus())) {
                throw new ConsensusGroupModifyPeerException(String.format("error when notify peer %s to create consensus pipe", peer2));
                break;
            } else if (syncPipeConsensusServiceClient != null) {
                syncPipeConsensusServiceClient.close();
            }
        }
        createConsensusPipeToTargetPeer(peer);
        if (exc != null) {
            throw new ConsensusGroupModifyPeerException(exc);
        }
    }

    public synchronized void createConsensusPipeToTargetPeer(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            this.consensusPipeManager.createConsensusPipe(this.thisNode, peer);
            this.peerManager.addAndPersist(peer);
        } catch (IOException e) {
            LOGGER.warn("{} cannot persist peer {}", new Object[]{this.thisNode, peer, e});
            throw new ConsensusGroupModifyPeerException(String.format("%s cannot persist peer %s", this.thisNode, peer), e);
        } catch (Exception e2) {
            LOGGER.warn("{} cannot create consensus pipe to {}", new Object[]{this.thisNode, peer, e2});
            throw new ConsensusGroupModifyPeerException(String.format("%s cannot create consensus pipe to %s", this.thisNode, peer), e2);
        }
    }

    public void notifyPeersToDropConsensusPipe(Peer peer) throws ConsensusGroupModifyPeerException {
        SyncPipeConsensusServiceClient syncPipeConsensusServiceClient;
        Exception exc = null;
        for (Peer peer2 : this.peerManager.getOtherPeers(this.thisNode)) {
            try {
                syncPipeConsensusServiceClient = (SyncPipeConsensusServiceClient) this.syncClientManager.borrowClient(peer2.getEndpoint());
                try {
                } finally {
                }
            } catch (Exception e) {
                exc = e;
                LOGGER.warn("{} cannot notify peer {} to drop consensus pipe", new Object[]{this.thisNode, peer2, e});
            }
            if (!RpcUtils.SUCCESS_STATUS.equals(syncPipeConsensusServiceClient.notifyPeerToDropConsensusPipe(new TNotifyPeerToDropConsensusPipeReq(peer.getGroupId().convertToTConsensusGroupId(), peer.getEndpoint(), peer.getNodeId())).getStatus())) {
                throw new ConsensusGroupModifyPeerException(String.format("error when notify peer %s to drop consensus pipe", peer2));
                break;
            } else if (syncPipeConsensusServiceClient != null) {
                syncPipeConsensusServiceClient.close();
            }
        }
        dropConsensusPipeToTargetPeer(peer);
        if (exc != null) {
            throw new ConsensusGroupModifyPeerException(exc);
        }
    }

    public synchronized void dropConsensusPipeToTargetPeer(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            this.consensusPipeManager.dropConsensusPipe(this.thisNode, peer);
            this.peerManager.removeAndPersist(peer);
        } catch (IOException e) {
            LOGGER.warn("{} cannot persist peer {}", new Object[]{this.thisNode, peer, e});
            throw new ConsensusGroupModifyPeerException(String.format("%s cannot persist peer %s", this.thisNode, peer), e);
        } catch (Exception e2) {
            LOGGER.warn("{} cannot drop consensus pipe to {}", new Object[]{this.thisNode, peer, e2});
            throw new ConsensusGroupModifyPeerException(String.format("%s cannot drop consensus pipe to %s", this.thisNode, peer), e2);
        }
    }

    public void waitPeersToTargetPeerTransmissionCompleted(Peer peer) throws ConsensusGroupModifyPeerException {
        boolean z = false;
        boolean z2 = true;
        while (!z) {
            try {
                Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
                if (isConsensusPipesTransmissionCompleted(Collections.singletonList(new ConsensusPipeName(this.thisNode, peer).toString()), z2)) {
                    z = true;
                    for (Peer peer2 : this.peerManager.getOtherPeers(this.thisNode)) {
                        z &= isRemotePeerConsensusPipesTransmissionCompleted(peer2, Collections.singletonList(new ConsensusPipeName(peer2, peer).toString()), z2);
                    }
                }
                z2 = false;
            } catch (InterruptedException e) {
                LOGGER.warn("{} is interrupted when waiting for transfer completed", this.thisNode, e);
                Thread.currentThread().interrupt();
                throw new ConsensusGroupModifyPeerException(String.format("%s is interrupted when waiting for transfer completed", this.thisNode), e);
            }
        }
    }

    public void waitTargetPeerToPeersTransmissionCompleted(Peer peer) throws ConsensusGroupModifyPeerException {
        boolean z = false;
        boolean z2 = true;
        while (!z) {
            try {
                Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
                z = isRemotePeerConsensusPipesTransmissionCompleted(peer, (List) this.peerManager.getPeers().stream().map(peer2 -> {
                    return new ConsensusPipeName(peer, peer2).toString();
                }).collect(Collectors.toList()), z2);
                z2 = false;
            } catch (InterruptedException e) {
                LOGGER.warn("{} is interrupted when waiting for transfer completed", this.thisNode, e);
                Thread.currentThread().interrupt();
                throw new ConsensusGroupModifyPeerException(String.format("%s is interrupted when waiting for transfer completed", this.thisNode), e);
            }
        }
    }

    private boolean isRemotePeerConsensusPipesTransmissionCompleted(Peer peer, List<String> list, boolean z) throws ConsensusGroupModifyPeerException {
        try {
            SyncPipeConsensusServiceClient syncPipeConsensusServiceClient = (SyncPipeConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted = syncPipeConsensusServiceClient.checkConsensusPipeCompleted(new TCheckConsensusPipeCompletedReq(this.thisNode.getGroupId().convertToTConsensusGroupId(), list, z));
                if (!RpcUtils.SUCCESS_STATUS.equals(checkConsensusPipeCompleted.getStatus())) {
                    LOGGER.warn("{} cannot check consensus pipes transmission completed to peer {}", this.thisNode, peer);
                    throw new ConsensusGroupModifyPeerException(String.format("error when check consensus pipes transmission completed to peer %s", peer));
                }
                boolean z2 = checkConsensusPipeCompleted.isCompleted;
                if (syncPipeConsensusServiceClient != null) {
                    syncPipeConsensusServiceClient.close();
                }
                return z2;
            } finally {
            }
        } catch (Exception e) {
            LOGGER.warn("{} cannot check consensus pipes transmission completed", this.thisNode, e);
            throw new ConsensusGroupModifyPeerException(String.format("%s cannot check consensus pipes transmission completed", this.thisNode), e);
        }
    }

    public synchronized boolean isConsensusPipesTransmissionCompleted(List<String> list, boolean z) {
        if (z) {
            this.cachedProgressIndex = this.cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this.progressIndexManager.getMaxAssignedProgressIndex(this.thisNode.getGroupId()));
        }
        try {
            return list.stream().noneMatch(str -> {
                return this.cachedProgressIndex.isAfter(this.progressIndexManager.getProgressIndex(new ConsensusPipeName(str)));
            });
        } catch (PipeException e) {
            LOGGER.info(e.getMessage());
            return false;
        }
    }

    public boolean isReadOnly() {
        return this.stateMachine.isReadOnly();
    }

    public boolean isActive() {
        return this.active.get();
    }

    public void setActive(boolean z) {
        LOGGER.info("set {} active status to {}", this.thisNode, Boolean.valueOf(z));
        this.active.set(z);
    }

    public boolean containsPeer(Peer peer) {
        return this.peerManager.contains(peer);
    }

    public List<Peer> getPeers() {
        return this.peerManager.getPeers();
    }

    public String getConsensusGroupId() {
        return this.consensusGroupId;
    }

    public long getReplicateMode() {
        return this.replicateMode == PipeConsensusConfig.ReplicateMode.BATCH ? 2L : 1L;
    }
}
