package org.apache.iotdb.consensus.iot;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.class */
public class IoTConsensusServerImpl {
    public static final String SNAPSHOT_DIR_NAME = "snapshot";
    private static final Pattern SNAPSHOT_INDEX_PATTEN = Pattern.compile(".*[^\\d](?=(\\d+))");
    private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance();
    private final Peer thisNode;
    private final IStateMachine stateMachine;
    private final String storageDir;
    private final TreeSet<Peer> configuration;
    private final AtomicLong searchIndex;
    private final LogDispatcher logDispatcher;
    private IoTConsensusConfig config;
    private final ConsensusReqReader consensusReqReader;
    private String newSnapshotDirName;
    private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
    private final String consensusGroupId;
    private final ScheduledExecutorService backgroundTaskService;
    private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
    private final Lock stateMachineLock = new ReentrantLock();
    private final Condition stateMachineCondition = this.stateMachineLock.newCondition();
    private final IoTConsensusRateLimiter ioTConsensusRateLimiter = IoTConsensusRateLimiter.getInstance();
    private volatile boolean active = true;
    private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap = new ConcurrentHashMap<>();
    private final IoTConsensusServerMetrics ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/consensus/iot/IoTConsensusServerImpl$SyncLogCacheQueue.class */
    public class SyncLogCacheQueue {
        private final int sourcePeerId;
        private final Lock queueLock = new ReentrantLock();
        private final Condition queueSortCondition = this.queueLock.newCondition();
        private long nextSyncIndex = -1;
        private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache = new PriorityQueue<>();

        public SyncLogCacheQueue(int i) {
            this.sourcePeerId = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Code restructure failed: missing block: B:42:0x0150, code lost:
        
            r11.this$0.logger.info("waiting target request timeout. current index: {}, target index: {}", java.lang.Long.valueOf(r12.getStartSyncIndex()), java.lang.Long.valueOf(r11.nextSyncIndex));
            r11.requestCache.remove(r12);
            r11.nextSyncIndex = java.lang.Math.max(r11.nextSyncIndex, r12.getEndSyncIndex() + 1);
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public org.apache.iotdb.common.rpc.thrift.TSStatus cacheAndInsertLatestNode(org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest r12) {
            /*
                Method dump skipped, instructions count: 698
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SyncLogCacheQueue.cacheAndInsertLatestNode(org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest):org.apache.iotdb.common.rpc.thrift.TSStatus");
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/iotdb/consensus/iot/IoTConsensusServerImpl$ThrowableFunction.class */
    public interface ThrowableFunction<T, R> {
        R apply(T t) throws Exception;
    }

    public IoTConsensusServerImpl(String str, Peer peer, TreeSet<Peer> treeSet, IStateMachine iStateMachine, ScheduledExecutorService scheduledExecutorService, IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> iClientManager, IClientManager<TEndPoint, SyncIoTConsensusServiceClient> iClientManager2, IoTConsensusConfig ioTConsensusConfig) {
        this.storageDir = str;
        this.thisNode = peer;
        this.stateMachine = iStateMachine;
        this.syncClientManager = iClientManager2;
        this.configuration = treeSet;
        this.backgroundTaskService = scheduledExecutorService;
        this.config = ioTConsensusConfig;
        this.consensusGroupId = peer.getGroupId().toString();
        this.consensusReqReader = (ConsensusReqReader) iStateMachine.read(new GetConsensusReqReaderPlan());
        this.searchIndex = new AtomicLong(this.consensusReqReader.getCurrentSearchIndex());
        this.logDispatcher = new LogDispatcher(this, iClientManager);
    }

    public IStateMachine getStateMachine() {
        return this.stateMachine;
    }

    public void start() {
        checkAndUpdateIndex();
        MetricService.getInstance().addMetricSet(this.ioTConsensusServerMetrics);
        this.stateMachine.start();
        this.logDispatcher.start();
    }

    public void stop() {
        this.logDispatcher.stop();
        this.stateMachine.stop();
        MetricService.getInstance().removeMetricSet(this.ioTConsensusServerMetrics);
    }

    public TSStatus write(IConsensusRequest iConsensusRequest) {
        long nanoTime = System.nanoTime();
        this.stateMachineLock.lock();
        try {
            long nanoTime2 = System.nanoTime();
            this.ioTConsensusServerMetrics.recordGetStateMachineLockTime(nanoTime2 - nanoTime);
            if (needBlockWrite()) {
                this.logger.info("[Throttle Down] index:{}, safeIndex:{}", Long.valueOf(getSearchIndex()), Long.valueOf(getMinSyncIndex()));
                try {
                    if (!this.stateMachineCondition.await(this.config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS)) {
                        TSStatus status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, String.format("The write is rejected because the wal directory size has reached the threshold %d bytes. You may need to adjust the flush policy of the storage storageengine or the IoTConsensus synchronization parameter", Long.valueOf(this.config.getReplication().getWalThrottleThreshold())));
                        this.stateMachineLock.unlock();
                        return status;
                    }
                } catch (InterruptedException e) {
                    this.logger.error("Failed to throttle down because ", e);
                    Thread.currentThread().interrupt();
                }
            }
            long nanoTime3 = System.nanoTime();
            this.ioTConsensusServerMetrics.recordCheckingBeforeWriteTime(nanoTime3 - nanoTime2);
            IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest = buildIndexedConsensusRequestForLocalRequest(iConsensusRequest);
            if (buildIndexedConsensusRequestForLocalRequest.getSearchIndex() % 100000 == 0) {
                this.logger.info("DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}", new Object[]{this.thisNode.getGroupId(), Long.valueOf(getMinSyncIndex()), Long.valueOf(buildIndexedConsensusRequestForLocalRequest.getSearchIndex())});
            }
            IConsensusRequest deserializeRequest = this.stateMachine.deserializeRequest(buildIndexedConsensusRequestForLocalRequest);
            long nanoTime4 = System.nanoTime();
            TSStatus write = this.stateMachine.write(deserializeRequest);
            PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - nanoTime4);
            long nanoTime5 = System.nanoTime();
            this.ioTConsensusServerMetrics.recordWriteStateMachineTime(nanoTime5 - nanoTime3);
            if (write.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                synchronized (this.searchIndex) {
                    this.logDispatcher.offer(buildIndexedConsensusRequestForLocalRequest);
                    this.searchIndex.incrementAndGet();
                }
                this.ioTConsensusServerMetrics.recordOfferRequestToQueueTime(System.nanoTime() - nanoTime5);
            } else {
                this.logger.debug("{}: write operation failed. searchIndex: {}. Code: {}", new Object[]{this.thisNode.getGroupId(), Long.valueOf(buildIndexedConsensusRequestForLocalRequest.getSearchIndex()), Integer.valueOf(write.getCode())});
            }
            this.ioTConsensusServerMetrics.recordConsensusWriteTime(System.nanoTime() - nanoTime);
            this.stateMachineLock.unlock();
            return write;
        } catch (Throwable th) {
            this.stateMachineLock.unlock();
            throw th;
        }
    }

    public DataSet read(IConsensusRequest iConsensusRequest) {
        return this.stateMachine.read(iConsensusRequest);
    }

    public void takeSnapshot() throws ConsensusGroupModifyPeerException {
        try {
            this.newSnapshotDirName = String.format("%s_%s_%s", SNAPSHOT_DIR_NAME, Integer.valueOf(this.thisNode.getGroupId().getId()), UUID.randomUUID());
            File file = new File(this.storageDir, this.newSnapshotDirName);
            if (file.exists()) {
                FileUtils.deleteDirectory(file);
            }
            if (!file.mkdirs()) {
                throw new ConsensusGroupModifyPeerException(String.format("%s: cannot mkdir for snapshot", this.thisNode.getGroupId()));
            }
            if (!this.stateMachine.takeSnapshot(file)) {
                throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
            }
            clearOldSnapshot();
        } catch (IOException e) {
            throw new ConsensusGroupModifyPeerException("error when taking snapshot", e);
        }
    }

    public void transmitSnapshot(Peer peer) throws ConsensusGroupModifyPeerException {
        TSendSnapshotFragmentReq tSendSnapshotFragmentReq;
        File file = new File(this.storageDir, this.newSnapshotDirName);
        List<File> snapshotFiles = this.stateMachine.getSnapshotFiles(file);
        AtomicLong atomicLong = new AtomicLong();
        StringBuilder sb = new StringBuilder();
        snapshotFiles.forEach(file2 -> {
            long length = file2.length();
            atomicLong.addAndGet(length);
            sb.append("\n").append(file2.getName()).append(" ").append(org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI(length));
        });
        long j = atomicLong.get();
        long j2 = 0;
        long j3 = 0;
        long nanoTime = System.nanoTime();
        this.logger.info("[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total size {}) from dir {}", new Object[]{Integer.valueOf(snapshotFiles.size()), org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI(j), file});
        this.logger.info("[SNAPSHOT TRANSMISSION] All the files below shell be transmitted: {}", sb);
        try {
            SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                for (File file3 : snapshotFiles) {
                    SnapshotFragmentReader snapshotFragmentReader = new SnapshotFragmentReader(this.newSnapshotDirName, file3.toPath());
                    do {
                        try {
                            if (snapshotFragmentReader.hasNext()) {
                                tSendSnapshotFragmentReq = snapshotFragmentReader.next().toTSendSnapshotFragmentReq();
                                tSendSnapshotFragmentReq.setConsensusGroupId(peer.getGroupId().convertToTConsensusGroupId());
                                this.ioTConsensusRateLimiter.acquireTransitDataSizeWithRateLimiter(tSendSnapshotFragmentReq.getChunkLength());
                            } else {
                                j2 += snapshotFragmentReader.getTotalReadSize();
                                j3++;
                                this.logger.info("[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files {}/{} done, size {}/{} done, time {} passed. File {} done.", new Object[]{this.newSnapshotDirName, Long.valueOf(j3), Integer.valueOf(snapshotFiles.size()), org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI(j2), org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI(j), CommonDateTimeUtils.convertMillisecondToDurationStr((System.nanoTime() - nanoTime) / 1000000), file3});
                                snapshotFragmentReader.close();
                            }
                        } catch (Throwable th) {
                            snapshotFragmentReader.close();
                            throw th;
                        }
                    } while (isSuccess(syncIoTConsensusServiceClient.sendSnapshotFragment(tSendSnapshotFragmentReq).getStatus()));
                    throw new ConsensusGroupModifyPeerException(String.format("[SNAPSHOT TRANSMISSION] Error when transmitting snapshot fragment to %s", peer));
                }
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
                this.logger.info("[SNAPSHOT TRANSMISSION] After {}, successfully transmit all snapshots from dir {}", CommonDateTimeUtils.convertMillisecondToDurationStr((System.nanoTime() - nanoTime) / 1000000), file);
            } finally {
            }
        } catch (Exception e) {
            throw new ConsensusGroupModifyPeerException(String.format("[SNAPSHOT TRANSMISSION] Error when send snapshot file to %s", peer), e);
        }
    }

    public void receiveSnapshotFragment(String str, String str2, ByteBuffer byteBuffer, long j) throws ConsensusGroupModifyPeerException {
        try {
            File file = new File(this.storageDir, calculateSnapshotPath(str, str2));
            Path path = Paths.get(file.getParent(), new String[0]);
            if (!Files.exists(path, new LinkOption[0])) {
                Files.createDirectories(path, new FileAttribute[0]);
            }
            FileOutputStream fileOutputStream = new FileOutputStream(file.getAbsolutePath(), true);
            try {
                FileChannel channel = fileOutputStream.getChannel();
                try {
                    channel.write(byteBuffer.slice(), j);
                    if (channel != null) {
                        channel.close();
                    }
                    fileOutputStream.close();
                } catch (Throwable th) {
                    if (channel != null) {
                        try {
                            channel.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e) {
            throw new ConsensusGroupModifyPeerException(String.format("error when receiving snapshot %s", str), e);
        }
    }

    private String calculateSnapshotPath(String str, String str2) throws ConsensusGroupModifyPeerException {
        if (str2.contains(str)) {
            return str2.substring(str2.indexOf(str));
        }
        throw new ConsensusGroupModifyPeerException(String.format("invalid snapshot file. snapshotId: %s, filePath: %s", str, str2));
    }

    private void clearOldSnapshot() {
        File[] listFiles = new File(this.storageDir).listFiles((file, str) -> {
            return str.startsWith(SNAPSHOT_DIR_NAME);
        });
        if (listFiles == null || listFiles.length == 0) {
            this.logger.error("Can not find any snapshot dir after build a new snapshot for group {}", this.thisNode.getGroupId());
            return;
        }
        for (File file2 : listFiles) {
            if (!file2.getName().equals(this.newSnapshotDirName)) {
                try {
                    FileUtils.deleteDirectory(file2);
                } catch (IOException e) {
                    this.logger.error("Delete old snapshot dir {} failed", file2.getAbsolutePath(), e);
                }
            }
        }
    }

    public void loadSnapshot(String str) {
        this.stateMachine.loadSnapshot(new File(this.storageDir, str));
    }

    public void inactivatePeer(Peer peer, boolean z) throws ConsensusGroupModifyPeerException {
        SyncIoTConsensusServiceClient syncIoTConsensusServiceClient;
        TInactivatePeerRes inactivatePeer;
        ConsensusGroupModifyPeerException consensusGroupModifyPeerException = null;
        for (int i = 0; i < 2; i++) {
            try {
                syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
                try {
                    try {
                        inactivatePeer = syncIoTConsensusServiceClient.inactivatePeer(new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()).setForDeletionPurpose(z));
                    } catch (Exception e) {
                        consensusGroupModifyPeerException = new ConsensusGroupModifyPeerException(String.format("error when inactivating %s", peer), e);
                    }
                } finally {
                }
            } catch (ClientManagerException e2) {
                consensusGroupModifyPeerException = new ConsensusGroupModifyPeerException((Throwable) e2);
            }
            if (isSuccess(inactivatePeer.status)) {
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                    return;
                }
                return;
            } else {
                consensusGroupModifyPeerException = new ConsensusGroupModifyPeerException(String.format("error when inactivating %s. %s", peer, inactivatePeer.getStatus()));
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            }
        }
        throw consensusGroupModifyPeerException;
    }

    public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                TTriggerSnapshotLoadRes triggerSnapshotLoad = syncIoTConsensusServiceClient.triggerSnapshotLoad(new TTriggerSnapshotLoadReq(this.thisNode.getGroupId().convertToTConsensusGroupId(), this.newSnapshotDirName));
                if (!isSuccess(triggerSnapshotLoad.status)) {
                    throw new ConsensusGroupModifyPeerException(String.format("error when triggering snapshot load %s. %s", peer, triggerSnapshotLoad.getStatus()));
                }
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConsensusGroupModifyPeerException(String.format("error when activating %s", peer), e);
        }
    }

    public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                TActivatePeerRes activatePeer = syncIoTConsensusServiceClient.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
                if (!isSuccess(activatePeer.status)) {
                    throw new ConsensusGroupModifyPeerException(String.format("error when activating %s. %s", peer, activatePeer.getStatus()));
                }
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConsensusGroupModifyPeerException(String.format("error when activating %s", peer), e);
        }
    }

    public void notifyPeersToBuildSyncLogChannel(Peer peer) throws ConsensusGroupModifyPeerException {
        ArrayList<Peer> arrayList = new ArrayList(this.configuration);
        this.logger.info("[IoTConsensus] notify current peers to build sync log. group member: {}, target: {}", arrayList, peer);
        for (Peer peer2 : arrayList) {
            this.logger.info("[IoTConsensus] build sync log channel from {}", peer2);
            if (peer2.equals(this.thisNode)) {
                buildSyncLogChannel(peer, true);
            } else {
                try {
                    SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer2.getEndpoint());
                    try {
                        if (!isSuccess(syncIoTConsensusServiceClient.buildSyncLogChannel(new TBuildSyncLogChannelReq(peer.getGroupId().convertToTConsensusGroupId(), peer.getEndpoint(), peer.getNodeId())).status)) {
                            throw new ConsensusGroupModifyPeerException(String.format("build sync log channel failed from %s to %s", peer2, peer));
                            break;
                        } else if (syncIoTConsensusServiceClient != null) {
                            syncIoTConsensusServiceClient.close();
                        }
                    } catch (Throwable th) {
                        if (syncIoTConsensusServiceClient != null) {
                            try {
                                syncIoTConsensusServiceClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                        break;
                    }
                } catch (Exception e) {
                    this.logger.error("cannot notify {} to build sync log channel. Please check the status of this node manually", peer2, e);
                }
            }
        }
    }

    public void notifyPeersToRemoveSyncLogChannel(Peer peer) {
        ImmutableList copyOf = ImmutableList.copyOf(this.configuration);
        removeSyncLogChannel(peer);
        UnmodifiableIterator it = copyOf.iterator();
        while (it.hasNext()) {
            Peer peer2 = (Peer) it.next();
            if (!peer2.equals(peer) && !peer2.equals(this.thisNode)) {
                try {
                    SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer2.getEndpoint());
                    try {
                        if (!isSuccess(syncIoTConsensusServiceClient.removeSyncLogChannel(new TRemoveSyncLogChannelReq(peer.getGroupId().convertToTConsensusGroupId(), peer.getEndpoint(), peer.getNodeId())).status)) {
                            this.logger.warn("removing sync log channel failed from {} to {}", peer2, peer);
                        }
                        if (syncIoTConsensusServiceClient != null) {
                            syncIoTConsensusServiceClient.close();
                        }
                    } catch (Throwable th) {
                        if (syncIoTConsensusServiceClient != null) {
                            try {
                                syncIoTConsensusServiceClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                        break;
                    }
                } catch (Exception e) {
                    this.logger.warn("Exception happened during removing sync log channel from {} to {}", new Object[]{peer2, peer, e});
                }
            }
        }
    }

    public void waitTargetPeerUntilSyncLogCompleted(Peer peer) throws ConsensusGroupModifyPeerException {
        TWaitSyncLogCompleteRes waitSyncLogComplete;
        try {
            try {
                SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
                while (true) {
                    try {
                        waitSyncLogComplete = syncIoTConsensusServiceClient.waitSyncLogComplete(new TWaitSyncLogCompleteReq(peer.getGroupId().convertToTConsensusGroupId()));
                        if (waitSyncLogComplete.complete) {
                            break;
                        }
                        this.logger.info("[WAIT LOG SYNC] {} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}", new Object[]{peer, Long.valueOf(waitSyncLogComplete.searchIndex), Long.valueOf(waitSyncLogComplete.safeIndex)});
                        Thread.sleep(10000L);
                    } catch (Throwable th) {
                        if (syncIoTConsensusServiceClient != null) {
                            try {
                                syncIoTConsensusServiceClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                this.logger.info("[WAIT LOG SYNC] {} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}", new Object[]{peer, Long.valueOf(waitSyncLogComplete.searchIndex), Long.valueOf(waitSyncLogComplete.safeIndex)});
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ConsensusGroupModifyPeerException(String.format("thread interrupted when waiting %s to complete SyncLog. %s", peer, e.getMessage()), e);
            }
        } catch (ClientManagerException | TException e2) {
            throw new ConsensusGroupModifyPeerException(String.format("error when waiting %s to complete SyncLog. %s", peer, e2.getMessage()), e2);
        }
    }

    public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId consensusGroupId) {
        return this.stateMachine.hasReleaseAllRegionRelatedResource(consensusGroupId);
    }

    public void waitReleaseAllRegionRelatedResource(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            try {
                SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
                while (!syncIoTConsensusServiceClient.waitReleaseAllRegionRelatedResource(new TWaitReleaseAllRegionRelatedResourceReq(peer.getGroupId().convertToTConsensusGroupId())).releaseAllResource) {
                    try {
                        this.logger.info("[WAIT RELEASE] {} is still releasing all region related resource", peer);
                        Thread.sleep(10000L);
                    } catch (Throwable th) {
                        if (syncIoTConsensusServiceClient != null) {
                            try {
                                syncIoTConsensusServiceClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                this.logger.info("[WAIT RELEASE] {} has released all region related resource", peer);
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            } catch (ClientManagerException | TException e) {
                throw new ConsensusGroupModifyPeerException(String.format("error when waiting %s to release all region related resource. %s", peer, e.getMessage()), e);
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new ConsensusGroupModifyPeerException(String.format("thread interrupted when waiting %s to release all region related resource. %s", peer, e2.getMessage()), e2);
        }
    }

    private boolean isSuccess(TSStatus tSStatus) {
        return tSStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    }

    public void buildSyncLogChannel(Peer peer, boolean z) {
        buildSyncLogChannel(peer, getMinSyncIndex(), z);
    }

    public void buildSyncLogChannel(Peer peer, long j, boolean z) {
        KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
        this.configuration.add(peer);
        if (Objects.equals(peer, this.thisNode)) {
            return;
        }
        this.logDispatcher.addLogDispatcherThread(peer, j, z);
        Logger logger = this.logger;
        Object[] objArr = new Object[3];
        objArr[0] = peer;
        objArr[1] = Long.valueOf(j);
        objArr[2] = z ? "Sync log channel has started." : "Sync log channel maybe start later.";
        logger.info("[IoTConsensus] Successfully build sync log channel to {} with initialSyncIndex {}. {}", objArr);
    }

    public boolean removeSyncLogChannel(Peer peer) {
        boolean z = false;
        Object obj = "";
        try {
            this.logDispatcher.removeLogDispatcherThread(peer);
            this.logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", peer);
        } catch (Exception e) {
            this.logger.warn("[IoTConsensus] Exception happened during removing log dispatcher thread, but configuration.dat will still be removed.", e);
            obj = "It's suggested restart the DataNode to remove log dispatcher thread.";
            z = true;
        }
        if (!z) {
            this.logger.info("[IoTConsensus] Log dispatcher thread to {} has been removed and cleanup", peer);
        }
        this.configuration.remove(peer);
        checkAndUpdateSafeDeletedSearchIndex();
        this.logger.info("[IoTConsensus Configuration] Configuration updated to {}. {}", this.configuration, obj);
        return !z;
    }

    public static String generateConfigurationDatFileName(int i, String str) {
        return i + "_" + str;
    }

    public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(IConsensusRequest iConsensusRequest) {
        if (iConsensusRequest instanceof ComparableConsensusRequest) {
            ((ComparableConsensusRequest) iConsensusRequest).setProgressIndex(new IoTProgressIndex(Integer.valueOf(this.thisNode.getNodeId()), Long.valueOf(this.searchIndex.get() + 1)));
        }
        return new IndexedConsensusRequest(this.searchIndex.get() + 1, Collections.singletonList(iConsensusRequest));
    }

    public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(long j, List<IConsensusRequest> list) {
        return new IndexedConsensusRequest(-1L, j, list);
    }

    public long getMinSyncIndex() {
        OptionalLong minSyncIndex = this.logDispatcher.getMinSyncIndex();
        AtomicLong atomicLong = this.searchIndex;
        Objects.requireNonNull(atomicLong);
        return minSyncIndex.orElseGet(atomicLong::get);
    }

    public long getMinFlushedSyncIndex() {
        OptionalLong minFlushedSyncIndex = this.logDispatcher.getMinFlushedSyncIndex();
        AtomicLong atomicLong = this.searchIndex;
        Objects.requireNonNull(atomicLong);
        return minFlushedSyncIndex.orElseGet(atomicLong::get);
    }

    public String getStorageDir() {
        return this.storageDir;
    }

    public Peer getThisNode() {
        return this.thisNode;
    }

    public List<Peer> getConfiguration() {
        return new ArrayList(this.configuration);
    }

    public long getSearchIndex() {
        return this.searchIndex.get();
    }

    public long getSyncLag() {
        return getSearchIndex() - getMinSyncIndex();
    }

    public IoTConsensusConfig getConfig() {
        return this.config;
    }

    public long getLogEntriesFromWAL() {
        return this.logDispatcher.getLogEntriesFromWAL();
    }

    public long getLogEntriesFromQueue() {
        return this.logDispatcher.getLogEntriesFromQueue();
    }

    public boolean needBlockWrite() {
        return this.consensusReqReader.getTotalSize() > this.config.getReplication().getWalThrottleThreshold();
    }

    public boolean unblockWrite() {
        return this.consensusReqReader.getTotalSize() < this.config.getReplication().getWalThrottleThreshold();
    }

    public void signal() {
        this.stateMachineLock.lock();
        try {
            this.stateMachineCondition.signalAll();
        } finally {
            this.stateMachineLock.unlock();
        }
    }

    public AtomicLong getIndexObject() {
        return this.searchIndex;
    }

    public ScheduledExecutorService getBackgroundTaskService() {
        return this.backgroundTaskService;
    }

    public LogDispatcher getLogDispatcher() {
        return this.logDispatcher;
    }

    public IoTConsensusServerMetrics getIoTConsensusServerMetrics() {
        return this.ioTConsensusServerMetrics;
    }

    public boolean isReadOnly() {
        return this.stateMachine.isReadOnly();
    }

    public boolean isActive() {
        return this.active;
    }

    public void setActive(boolean z) {
        this.logger.info("set {} active status to {}", this.thisNode, Boolean.valueOf(z));
        this.active = z;
    }

    public void cleanupRemoteSnapshot(Peer peer) throws ConsensusGroupModifyPeerException {
        try {
            SyncIoTConsensusServiceClient syncIoTConsensusServiceClient = (SyncIoTConsensusServiceClient) this.syncClientManager.borrowClient(peer.getEndpoint());
            try {
                TCleanupTransferredSnapshotRes cleanupTransferredSnapshot = syncIoTConsensusServiceClient.cleanupTransferredSnapshot(new TCleanupTransferredSnapshotReq(peer.getGroupId().convertToTConsensusGroupId(), this.newSnapshotDirName));
                if (!isSuccess(cleanupTransferredSnapshot.getStatus())) {
                    throw new ConsensusGroupModifyPeerException(String.format("cleanup remote snapshot failed of %s ,status is %s", peer, cleanupTransferredSnapshot.getStatus()));
                }
                if (syncIoTConsensusServiceClient != null) {
                    syncIoTConsensusServiceClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConsensusGroupModifyPeerException(String.format("cleanup remote snapshot failed of %s", peer), e);
        }
    }

    public void cleanupSnapshot(String str) throws ConsensusGroupModifyPeerException {
        File file = new File(this.storageDir, str);
        if (!file.exists()) {
            this.logger.info("File not exist: {}", file);
            return;
        }
        try {
            FileUtils.deleteDirectory(file);
        } catch (IOException e) {
            throw new ConsensusGroupModifyPeerException(e);
        }
    }

    public void cleanupLocalSnapshot() {
        try {
            cleanupSnapshot(this.newSnapshotDirName);
            this.stateMachine.clearSnapshot();
        } catch (ConsensusGroupModifyPeerException e) {
            this.logger.warn("Cleanup local snapshot fail. You may manually delete {}.", this.newSnapshotDirName, e);
        }
    }

    void checkAndUpdateIndex() {
        checkAndUpdateSafeDeletedSearchIndex();
        checkAndUpdateSearchIndex();
    }

    void checkAndUpdateSafeDeletedSearchIndex() {
        if (this.configuration.isEmpty()) {
            this.logger.error("Configuration is empty, which is unexpected. Safe deleted search index won't be updated this time.");
        } else if (this.configuration.size() == 1) {
            this.consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
        } else {
            this.consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
        }
    }

    public void checkAndUpdateSearchIndex() {
        long j = this.searchIndex.get();
        long minFlushedSyncIndex = getMinFlushedSyncIndex();
        if (j < minFlushedSyncIndex) {
            this.logger.warn("The searchIndex for this region({}) is smaller than the safelyDeletedSearchIndex when the node is restarted, which means that the data of the current region is not flushed by the wal, but has been synchronized to other nodes. At this point, different replicas have been inconsistent and cannot be automatically recovered. To prevent subsequent logs from marking smaller searchIndex and exacerbating the inconsistency, we manually set the searchIndex({}) to safelyDeletedSearchIndex({}) here to reduce the impact of this problem in the future", new Object[]{this.consensusGroupId, Long.valueOf(j), Long.valueOf(minFlushedSyncIndex)});
            this.searchIndex.set(minFlushedSyncIndex);
        }
    }

    public TSStatus syncLog(int i, IConsensusRequest iConsensusRequest) {
        return this.cacheQueueMap.computeIfAbsent(Integer.valueOf(i), i2 -> {
            return new SyncLogCacheQueue(i2);
        }).cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) iConsensusRequest);
    }

    public String getConsensusGroupId() {
        return this.consensusGroupId;
    }

    public void reloadConsensusConfig(IoTConsensusConfig ioTConsensusConfig) {
        this.config = ioTConsensusConfig;
    }
}
