package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.Pair;
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/dledger-0.2.3.jar:io/openmessaging/storage/dledger/DLedgerEntryPusher.class */
public class DLedgerEntryPusher {
    /** Class-level logger; the dispatcher's {@code doWork} reports unexpected failures through it. */
    private static Logger logger = LoggerFactory.getLogger((Class<?>) DLedgerEntryPusher.class);
    /** Configuration read for the peer push quota, throttle point, push timeout and batch-push settings. */
    private DLedgerConfig dLedgerConfig;
    /** Local log store: source of entries to push and target of follower-side append/truncate/commit updates. */
    private DLedgerStore dLedgerStore;
    /** Membership view (self id, leader id, current term, group); also used as the lock when refreshing leadership state. */
    private final MemberState memberState;
    /** RPC facade used to send {@link PushEntryRequest}s to peers. */
    private DLedgerRpcService dLedgerRpcService;
    /** Follower-side handler of incoming push requests. */
    private EntryHandler entryHandler;
    /** Woken up by dispatchers whenever a peer water mark advances. */
    private QuorumAckChecker quorumAckChecker;
    // NOTE(review): presumably term -> (peerId -> highest acknowledged index); confirm against updatePeerWaterMark/getPeerWaterMark.
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap();
    // NOTE(review): presumably term -> (entry index -> pending client append future); the code using it is outside this chunk.
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap();
    // NOTE(review): presumably keyed by peer id, one dispatcher per peer; confirm where the map is populated.
    private Map<String, EntryDispatcher> dispatcherMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/dledger-0.2.3.jar:io/openmessaging/storage/dledger/DLedgerEntryPusher$EntryDispatcher.class */
    public class EntryDispatcher extends ShutdownAbleThread {
        /** Current push mode of this dispatcher (COMPARE, TRUNCATE or APPEND); starts as COMPARE. */
        private AtomicReference<PushEntryRequest.Type> type;
        /** Timestamp (ms) of the most recent append/commit/truncate push; doCommit() only pushes when more than 1000 ms have elapsed since it. */
        private long lastPushCommitTimeMs;
        /** Id of the peer this dispatcher pushes to. */
        private String peerId;
        /** Index currently being compared with the peer; -1 means comparison has not started. */
        private long compareIndex;
        /** Next index to be pushed in APPEND mode. */
        private long writeIndex;
        /** Upper bound on in-flight (unacknowledged) pushes before the dispatcher stops sending. */
        private int maxPendingSize;
        /** Term captured by checkAndFreshState(); -1 until first refreshed. */
        private long term;
        /** Leader id captured by checkAndFreshState(); null until first refreshed. */
        private String leaderId;
        /** Timestamp (ms) of the last sweep that dropped already-acknowledged entries from the pending maps. */
        private long lastCheckLeakTimeMs;
        /** Single-entry mode: pushed index -> send time in ms. */
        private ConcurrentMap<Long, Long> pendingMap;
        /** Batch mode: first index of a pushed batch -> (send time in ms, number of entries in the batch). */
        private ConcurrentMap<Long, Pair<Long, Integer>> batchPendingMap;
        /** Reusable request object that accumulates entries for the next batch push. */
        private PushEntryRequest batchAppendEntryRequest;
        /** Rate limiter sampled with entry sizes in checkQuotaAndWait(). */
        private Quota quota;

        /**
         * Creates the dispatcher thread responsible for pushing entries to one peer.
         * The thread is named {@code EntryDispatcher-<selfId>-<peerId>}.
         *
         * @param str    id of the peer this dispatcher pushes to
         * @param logger logger passed on to the {@code ShutdownAbleThread} base class
         */
        public EntryDispatcher(String str, Logger logger) {
            super("EntryDispatcher-" + DLedgerEntryPusher.this.memberState.getSelfId() + "-" + str, logger);
            this.peerId = str;

            // Leadership snapshot is unknown until checkAndFreshState() fills it in.
            this.term = -1L;
            this.leaderId = null;

            // Every dispatcher starts in COMPARE mode with no index negotiated yet.
            this.type = new AtomicReference<>(PushEntryRequest.Type.COMPARE);
            this.compareIndex = -1L;
            this.writeIndex = -1L;

            // Book-keeping for in-flight pushes.
            this.maxPendingSize = 1000;
            this.pendingMap = new ConcurrentHashMap<>();
            this.batchPendingMap = new ConcurrentHashMap<>();
            this.batchAppendEntryRequest = new PushEntryRequest();

            this.lastPushCommitTimeMs = -1L;
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.quota = new Quota(DLedgerEntryPusher.this.dLedgerConfig.getPeerPushQuota());
        }

        /**
         * Verifies that this node is still leader and that the cached term/leader id
         * match the member state, refreshing the cache when they do not.
         *
         * <p>When the cache is stale it is refreshed while holding the member-state
         * lock and the dispatcher is sent back to COMPARE mode.
         *
         * @return {@code true} if this node is leader (cache now up to date), {@code false} otherwise
         */
        private boolean checkAndFreshState() {
            final MemberState state = DLedgerEntryPusher.this.memberState;
            if (!state.isLeader()) {
                return false;
            }

            boolean stale = this.term != state.currTerm()
                || this.leaderId == null
                || !this.leaderId.equals(state.getLeaderId());
            if (!stale) {
                return true;
            }

            synchronized (state) {
                // Leadership may have been lost between the unlocked check and here.
                if (state.isLeader()) {
                    PreConditions.check(state.getSelfId().equals(state.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                    this.term = state.currTerm();
                    this.leaderId = state.getSelfId();
                    changeState(-1L, PushEntryRequest.Type.COMPARE);
                    return true;
                }
                return false;
            }
        }

        /**
         * Builds a single-entry push request addressed to this dispatcher's peer.
         *
         * @param dLedgerEntry entry to carry; {@code null} is passed for COMMIT requests
         * @param type         request type to stamp on the request
         * @return a new request carrying the cached term/leader id and the store's current committed index
         */
        private PushEntryRequest buildPushRequest(DLedgerEntry dLedgerEntry, PushEntryRequest.Type type) {
            final PushEntryRequest request = new PushEntryRequest();

            // Routing information.
            request.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
            request.setRemoteId(this.peerId);
            request.setLeaderId(this.leaderId);
            request.setTerm(this.term);

            // Payload.
            request.setType(type);
            request.setEntry(dLedgerEntry);
            request.setCommitIndex(DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex());
            return request;
        }

        /**
         * Re-stamps the reusable batch request with the current group, peer, leader and
         * term, marks it as an APPEND request and finally clears it.
         */
        private void resetBatchAppendEntryRequest() {
            final PushEntryRequest batch = this.batchAppendEntryRequest;
            batch.setType(PushEntryRequest.Type.APPEND);
            batch.setTerm(this.term);
            batch.setLeaderId(this.leaderId);
            batch.setRemoteId(this.peerId);
            batch.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
            // clear() stays last, exactly as before.
            batch.clear();
        }

        /**
         * Throttles pushing to a peer that is far behind the leader.
         *
         * <p>The quota is only sampled when all of the following hold: the entry is more
         * than {@code maxPendingSize} indexes behind the ledger end, the store is not a
         * {@link DLedgerMemoryStore}, and the byte distance between the data files' max
         * written position and the entry reaches the configured throttle point. If the
         * quota is then exhausted, the calling thread sleeps for the remaining window.
         *
         * @param dLedgerEntry entry about to be pushed
         */
        private void checkQuotaAndWait(DLedgerEntry dLedgerEntry) {
            final DLedgerStore store = DLedgerEntryPusher.this.dLedgerStore;

            if (store.getLedgerEndIndex() - dLedgerEntry.getIndex() <= this.maxPendingSize) {
                return;
            }
            if (store instanceof DLedgerMemoryStore) {
                return;
            }
            if (((DLedgerMmapFileStore) store).getDataFileList().getMaxWrotePosition() - dLedgerEntry.getPos() < DLedgerEntryPusher.this.dLedgerConfig.getPeerPushThrottlePoint()) {
                return;
            }

            this.quota.sample(dLedgerEntry.getSize());
            if (!this.quota.validateNow()) {
                return;
            }
            long sleepMs = this.quota.leftNow();
            this.logger.warn("[Push-{}]Quota exhaust, will sleep {}ms", this.peerId, Long.valueOf(sleepMs));
            DLedgerUtils.sleep(sleepMs);
        }

        /**
         * Pushes the entry at index {@code j} to the peer as an APPEND request and
         * registers an asynchronous handler for the response.
         *
         * <p>Nothing is sent when the entry cannot be loaded because the index is below
         * the local begin index; {@code getDLedgerEntryForAppend} has then already
         * switched the dispatcher back to COMPARE mode.
         *
         * @param j index of the entry to push
         * @throws Exception if loading the entry or issuing the push fails
         */
        private void doAppendInner(long j) throws Exception {
            DLedgerEntry entry = getDLedgerEntryForAppend(j);
            if (null == entry) {
                return;
            }
            checkQuotaAndWait(entry);
            CompletableFuture<PushEntryResponse> responseFuture = DLedgerEntryPusher.this.dLedgerRpcService.push(buildPushRequest(entry, PushEntryRequest.Type.APPEND));
            // Remember when the index was sent so doCheckAppendResponse() can detect timeouts.
            this.pendingMap.put(Long.valueOf(j), Long.valueOf(System.currentTimeMillis()));
            responseFuture.whenComplete((pushEntryResponse, failure) -> {
                try {
                    PreConditions.check(failure == null, DLedgerResponseCode.UNKNOWN);
                    DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(pushEntryResponse.getCode());
                    switch (responseCode) {
                        case SUCCESS:
                            this.pendingMap.remove(pushEntryResponse.getIndex());
                            DLedgerEntryPusher.this.updatePeerWaterMark(pushEntryResponse.getTerm(), this.peerId, pushEntryResponse.getIndex().longValue());
                            DLedgerEntryPusher.this.quorumAckChecker.wakeup();
                            break;
                        case INCONSISTENT_STATE:
                            this.logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", this.peerId, pushEntryResponse.getIndex(), Long.valueOf(pushEntryResponse.getTerm()));
                            changeState(-1L, PushEntryRequest.Type.COMPARE);
                            break;
                        default:
                            this.logger.warn("[Push-{}]Get error response code {} {}", this.peerId, responseCode, pushEntryResponse.baseInfo());
                            break;
                    }
                } catch (Throwable t) {
                    // The catch variable must not reuse the lambda parameter's name: a lambda
                    // body cannot redeclare its own parameters, so that would not compile.
                    this.logger.error("", t);
                }
            });
            this.lastPushCommitTimeMs = System.currentTimeMillis();
        }

        /**
         * Loads the entry at index {@code j} from the local store for pushing.
         *
         * @param j index of the entry to load
         * @return the entry, or {@code null} when the store reports
         *         {@code INDEX_LESS_THAN_LOCAL_BEGIN}; in that case the dispatcher has been
         *         switched back to COMPARE mode
         * @throws DLedgerException for any other store failure, or when the store returns no entry
         */
        private DLedgerEntry getDLedgerEntryForAppend(long j) {
            try {
                DLedgerEntry found = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(j));
                PreConditions.check(found != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", Long.valueOf(j));
                return found;
            } catch (DLedgerException e) {
                if (DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN.equals(e.getCode())) {
                    this.logger.info("[Push-{}]Get INDEX_LESS_THAN_LOCAL_BEGIN when requested index is {}, try to compare", this.peerId, Long.valueOf(j));
                    changeState(-1L, PushEntryRequest.Type.COMPARE);
                    return null;
                }
                throw e;
            }
        }

        /**
         * Sends an entry-less COMMIT request to the peer, but only when more than
         * 1000 ms have elapsed since the last push.
         *
         * @throws Exception if issuing the push fails
         */
        private void doCommit() throws Exception {
            if (DLedgerUtils.elapsed(this.lastPushCommitTimeMs) <= 1000) {
                return;
            }
            PushEntryRequest commitRequest = buildPushRequest(null, PushEntryRequest.Type.COMMIT);
            DLedgerEntryPusher.this.dLedgerRpcService.push(commitRequest);
            this.lastPushCommitTimeMs = System.currentTimeMillis();
        }

        /**
         * Re-pushes the entry right after the peer's water mark when its push has been
         * pending for longer than the configured maximum push timeout.
         *
         * @throws Exception if the retry push fails
         */
        private void doCheckAppendResponse() throws Exception {
            final long nextIndex = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId) + 1;
            Long sentAtMs = this.pendingMap.get(Long.valueOf(nextIndex));
            if (sentAtMs == null) {
                // Nothing in flight for that index.
                return;
            }
            if (System.currentTimeMillis() - sentAtMs.longValue() <= DLedgerEntryPusher.this.dLedgerConfig.getMaxPushTimeOutMs()) {
                // Still within the timeout window.
                return;
            }
            this.logger.warn("[Push-{}]Retry to push entry at {}", this.peerId, Long.valueOf(nextIndex));
            doAppendInner(nextIndex);
        }

        /**
         * Single-entry APPEND loop: pushes entries from {@code writeIndex} onwards for as
         * long as this node stays leader and the dispatcher stays in APPEND mode.
         *
         * <p>The loop stops when the local ledger end is reached (after a commit push and
         * a timeout check) or when the number of in-flight pushes hits
         * {@code maxPendingSize} (after a timeout check).
         *
         * @throws Exception if a push or store access fails
         */
        private void doAppend() throws Exception {
            while (true) {
                if (!checkAndFreshState()) {
                    return;
                }
                if (this.type.get() != PushEntryRequest.Type.APPEND) {
                    return;
                }

                // Caught up with the local log: keep the commit index fresh and look for timeouts.
                if (this.writeIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    doCommit();
                    doCheckAppendResponse();
                    return;
                }

                // Sweep indexes strictly below the peer's water mark, either because the map
                // is full or because more than a second has passed since the previous sweep.
                if (this.pendingMap.size() >= this.maxPendingSize || DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000) {
                    final long waterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
                    this.pendingMap.forEach((index, sentAtMs) -> {
                        if (index.longValue() < waterMark) {
                            this.pendingMap.remove(index);
                        }
                    });
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }

                // Still saturated after the sweep: do not send more, only check for timeouts.
                if (this.pendingMap.size() >= this.maxPendingSize) {
                    doCheckAppendResponse();
                    return;
                }

                doAppendInner(this.writeIndex);
                this.writeIndex++;
            }
        }

        /**
         * Sends the accumulated batch request to the peer, records it as pending and
         * clears the reusable request for the next batch.
         *
         * <p>The pending record is keyed by the batch's first entry index. On SUCCESS it
         * is removed by that same first index; the index carried in the response is only
         * used to advance the peer's water mark.
         *
         * @throws Exception if issuing the push fails
         */
        private void sendBatchAppendEntryRequest() throws Exception {
            this.batchAppendEntryRequest.setCommitIndex(DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex());
            // Capture the key before the shared request object is cleared and reused below.
            final Long firstIndex = Long.valueOf(this.batchAppendEntryRequest.getFirstEntryIndex());
            CompletableFuture<PushEntryResponse> responseFuture = DLedgerEntryPusher.this.dLedgerRpcService.push(this.batchAppendEntryRequest);
            this.batchPendingMap.put(firstIndex, new Pair<>(Long.valueOf(System.currentTimeMillis()), Integer.valueOf(this.batchAppendEntryRequest.getCount())));
            responseFuture.whenComplete((pushEntryResponse, failure) -> {
                try {
                    PreConditions.check(failure == null, DLedgerResponseCode.UNKNOWN);
                    DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(pushEntryResponse.getCode());
                    switch (responseCode) {
                        case SUCCESS:
                            // batchPendingMap is keyed by the first index of the batch, whereas
                            // buildBatchAppendResponse reports the last index; removing by the
                            // response index would miss every batch holding more than one entry.
                            this.batchPendingMap.remove(firstIndex);
                            DLedgerEntryPusher.this.updatePeerWaterMark(pushEntryResponse.getTerm(), this.peerId, pushEntryResponse.getIndex().longValue());
                            break;
                        case INCONSISTENT_STATE:
                            this.logger.info("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}", this.peerId, pushEntryResponse.getIndex(), Long.valueOf(pushEntryResponse.getTerm()));
                            changeState(-1L, PushEntryRequest.Type.COMPARE);
                            break;
                        default:
                            this.logger.warn("[Push-{}]Get error response code {} {}", this.peerId, responseCode, pushEntryResponse.baseInfo());
                            break;
                    }
                } catch (Throwable t) {
                    // The catch variable must not reuse the lambda parameter's name: a lambda
                    // body cannot redeclare its own parameters, so that would not compile.
                    this.logger.error("", t);
                }
            });
            this.lastPushCommitTimeMs = System.currentTimeMillis();
            this.batchAppendEntryRequest.clear();
        }

        /**
         * Adds the entry at index {@code j} to the batch under construction and sends the
         * batch once its total size reaches the configured maximum batch push size.
         *
         * @param j index of the entry to add
         * @throws Exception if loading the entry or sending the batch fails
         */
        private void doBatchAppendInner(long j) throws Exception {
            final DLedgerEntry entry = getDLedgerEntryForAppend(j);
            if (entry != null) {
                this.batchAppendEntryRequest.addEntry(entry);
                boolean batchFull = this.batchAppendEntryRequest.getTotalSize() >= DLedgerEntryPusher.this.dLedgerConfig.getMaxBatchPushSize();
                if (batchFull) {
                    sendBatchAppendEntryRequest();
                }
            }
        }

        /**
         * Re-sends the batch that starts right after the peer's water mark when it has
         * been pending for longer than the configured maximum push timeout.
         *
         * <p>The batch is rebuilt from the store using the entry count recorded when it
         * was first sent.
         *
         * @throws Exception if reading the entries or sending the batch fails
         */
        private void doCheckBatchAppendResponse() throws Exception {
            final long firstIndex = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId) + 1;
            final Pair<Long, Integer> pending = this.batchPendingMap.get(Long.valueOf(firstIndex));
            if (pending == null) {
                return;
            }
            if (System.currentTimeMillis() - pending.getKey().longValue() <= DLedgerEntryPusher.this.dLedgerConfig.getMaxPushTimeOutMs()) {
                return;
            }

            final long lastIndex = (firstIndex + pending.getValue().intValue()) - 1;
            this.logger.warn("[Push-{}]Retry to push entry from {} to {}", this.peerId, Long.valueOf(firstIndex), Long.valueOf(lastIndex));

            // Drop whatever was being accumulated and rebuild the timed-out batch.
            this.batchAppendEntryRequest.clear();
            for (long index = firstIndex; index <= lastIndex; index++) {
                this.batchAppendEntryRequest.addEntry(DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(index)));
            }
            sendBatchAppendEntryRequest();
        }

        /**
         * Batch APPEND loop: accumulates entries from {@code writeIndex} onwards into
         * batch requests for as long as this node stays leader and the dispatcher stays
         * in APPEND mode.
         *
         * <p>On reaching the local ledger end any partially filled batch is flushed,
         * followed by a commit push and a timeout check. The loop also stops, after a
         * timeout check, when the number of in-flight batches hits {@code maxPendingSize}.
         *
         * @throws Exception if a push or store access fails
         */
        private void doBatchAppend() throws Exception {
            while (true) {
                if (!checkAndFreshState()) {
                    return;
                }
                if (this.type.get() != PushEntryRequest.Type.APPEND) {
                    return;
                }

                // Caught up with the local log: flush, refresh the commit index, look for timeouts.
                if (this.writeIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    if (this.batchAppendEntryRequest.getCount() > 0) {
                        sendBatchAppendEntryRequest();
                    }
                    doCommit();
                    doCheckBatchAppendResponse();
                    return;
                }

                // Sweep batches whose last index is at or below the peer's water mark, either
                // because the map is full or because more than a second has passed.
                if (this.batchPendingMap.size() >= this.maxPendingSize || DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000) {
                    final long waterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
                    this.batchPendingMap.forEach((firstIndex, sentInfo) -> {
                        if ((firstIndex.longValue() + sentInfo.getValue().intValue()) - 1 <= waterMark) {
                            this.batchPendingMap.remove(firstIndex);
                        }
                    });
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }

                // Still saturated after the sweep: do not send more, only check for timeouts.
                if (this.batchPendingMap.size() >= this.maxPendingSize) {
                    doCheckBatchAppendResponse();
                    return;
                }

                doBatchAppendInner(this.writeIndex);
                this.writeIndex++;
            }
        }

        /**
         * Sends a TRUNCATE request carrying the local entry at index {@code j}, waits up
         * to three seconds for the peer's reply and, on success, switches the dispatcher
         * to APPEND mode at that index.
         *
         * @param j index at which the peer should truncate
         * @throws Exception if the dispatcher is not in TRUNCATE mode, the entry is missing,
         *                   the push fails or times out, or the peer does not answer SUCCESS
         */
        private void doTruncate(long j) throws Exception {
            PreConditions.check(this.type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);

            final DLedgerEntry truncateEntry = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(j));
            PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
            this.logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", this.peerId, Long.valueOf(j), Long.valueOf(truncateEntry.getPos()));

            final PushEntryRequest request = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
            final PushEntryResponse response = DLedgerEntryPusher.this.dLedgerRpcService.push(request).get(3L, TimeUnit.SECONDS);
            PreConditions.check(response != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", Long.valueOf(j));
            PreConditions.check(response.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(response.getCode()), "truncateIndex=%d", Long.valueOf(j));

            this.lastPushCommitTimeMs = System.currentTimeMillis();
            changeState(j, PushEntryRequest.Type.APPEND);
        }

        /**
         * Moves the dispatcher into the given mode and resets the state tied to the
         * previous one.
         *
         * <ul>
         *   <li>APPEND: records {@code j} as the peer's water mark, wakes the quorum
         *       checker, continues writing at {@code j + 1} and, with batch push enabled,
         *       resets the batch request.</li>
         *   <li>COMPARE: only when the dispatcher was in APPEND mode, resets the compare
         *       index and clears the pending map of the active push mode.</li>
         *   <li>TRUNCATE: resets the compare index.</li>
         * </ul>
         *
         * @param j    index associated with the transition (-1 when not applicable)
         * @param type mode to switch to
         */
        private synchronized void changeState(long j, PushEntryRequest.Type type) {
            this.logger.info("[Push-{}]Change state from {} to {} at {}", this.peerId, this.type.get(), type, Long.valueOf(j));
            final boolean batchPush;
            switch (type) {
                case APPEND: {
                    this.compareIndex = -1L;
                    DLedgerEntryPusher.this.updatePeerWaterMark(this.term, this.peerId, j);
                    DLedgerEntryPusher.this.quorumAckChecker.wakeup();
                    this.writeIndex = j + 1;
                    batchPush = DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush();
                    if (batchPush) {
                        resetBatchAppendEntryRequest();
                    }
                    break;
                }
                case COMPARE: {
                    boolean leftAppendMode = this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE);
                    if (leftAppendMode) {
                        this.compareIndex = -1L;
                        batchPush = DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush();
                        if (batchPush) {
                            this.batchPendingMap.clear();
                        } else {
                            this.pendingMap.clear();
                        }
                    }
                    break;
                }
                case TRUNCATE: {
                    this.compareIndex = -1L;
                    break;
                }
                default:
                    // Other types need no extra bookkeeping.
                    break;
            }
            this.type.set(type);
        }

        /**
         * COMPARE loop: negotiates with the peer the index from which the logs agree.
         *
         * <p>Each iteration pushes the local entry at {@code compareIndex} as a COMPARE
         * request and waits up to three seconds for the reply. The loop ends by switching
         * to APPEND (logs agree up to the peer's end index) or by switching to TRUNCATE
         * and calling {@code doTruncate}; otherwise {@code compareIndex} is moved and the
         * comparison is repeated.
         *
         * @throws Exception if a precondition fails, or the push fails or times out
         */
        private void doCompare() throws Exception {
            while (checkAndFreshState()) {
                // Only meaningful while in COMPARE or TRUNCATE mode.
                if (this.type.get() != PushEntryRequest.Type.COMPARE && this.type.get() != PushEntryRequest.Type.TRUNCATE) {
                    return;
                }
                // Comparison not started and the local ledger end index is -1: nothing to compare.
                if (this.compareIndex == -1 && DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() == -1) {
                    return;
                }
                if (this.compareIndex == -1) {
                    // First round: start from the local ledger end.
                    this.compareIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex();
                    this.logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", this.peerId);
                } else if (this.compareIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() || this.compareIndex < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()) {
                    // compareIndex drifted outside the local range: restart from the local end.
                    this.logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", this.peerId, Long.valueOf(this.compareIndex), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()));
                    this.compareIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex();
                }
                DLedgerEntry dLedgerEntry = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(this.compareIndex));
                PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", Long.valueOf(this.compareIndex));
                // Synchronous round trip: blocks for at most three seconds.
                PushEntryResponse pushEntryResponse = DLedgerEntryPusher.this.dLedgerRpcService.push(buildPushRequest(dLedgerEntry, PushEntryRequest.Type.COMPARE)).get(3L, TimeUnit.SECONDS);
                PreConditions.check(pushEntryResponse != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", Long.valueOf(this.compareIndex));
                // Only SUCCESS and INCONSISTENT_STATE are valid answers to a COMPARE.
                PreConditions.check(pushEntryResponse.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || pushEntryResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(pushEntryResponse.getCode()), "compareIndex=%d", Long.valueOf(this.compareIndex));
                // Index to truncate the peer at; -1 means "not decided in this round".
                long j = -1;
                if (pushEntryResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
                    if (this.compareIndex == pushEntryResponse.getEndIndex()) {
                        // Entries match and the peer ends exactly here: start appending after it.
                        changeState(this.compareIndex, PushEntryRequest.Type.APPEND);
                        return;
                    }
                    // Entries match but the peer's end index differs: truncate it at compareIndex.
                    j = this.compareIndex;
                } else if (pushEntryResponse.getEndIndex() < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex() || pushEntryResponse.getBeginIndex() > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    // The peer's index range does not overlap the local one: truncate from the local begin.
                    j = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                } else if (this.compareIndex < pushEntryResponse.getBeginIndex()) {
                    // Compared index lies before the peer's first entry: truncate from the local begin.
                    j = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                } else if (this.compareIndex > pushEntryResponse.getEndIndex()) {
                    // Peer is shorter: jump to its end index and compare again.
                    this.compareIndex = pushEntryResponse.getEndIndex();
                } else {
                    // Within the peer's range but mismatching: step back one entry and compare again.
                    this.compareIndex--;
                }
                // Stepping back fell off the local log: truncate from the local begin.
                if (this.compareIndex < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()) {
                    j = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                }
                if (j != -1) {
                    changeState(j, PushEntryRequest.Type.TRUNCATE);
                    doTruncate(j);
                    return;
                }
            }
        }

        /**
         * One iteration of the dispatcher thread.
         *
         * <p>When this node is not leader the thread waits briefly. Otherwise it runs the
         * compare/truncate negotiation or one of the two append loops depending on the
         * current mode and the batch-push setting. Any failure is logged, sends the
         * dispatcher back to COMPARE mode and pauses it for 500 ms.
         */
        @Override // io.openmessaging.storage.dledger.ShutdownAbleThread
        public void doWork() {
            try {
                if (checkAndFreshState()) {
                    final boolean appending = this.type.get() == PushEntryRequest.Type.APPEND;
                    if (!appending) {
                        doCompare();
                    } else if (DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush()) {
                        doBatchAppend();
                    } else {
                        doAppend();
                    }
                    Thread.yield();
                } else {
                    waitForRunning(1L);
                }
            } catch (Throwable failure) {
                DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", this.peerId, getName(), Long.valueOf(this.writeIndex), Long.valueOf(this.compareIndex), failure);
                changeState(-1L, PushEntryRequest.Type.COMPARE);
                DLedgerUtils.sleep(500L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/dledger-0.2.3.jar:io/openmessaging/storage/dledger/DLedgerEntryPusher$EntryHandler.class */
    public class EntryHandler extends ShutdownAbleThread {
        // Initialised to the construction time; NOTE(review): its use is outside this chunk, presumably throttles a periodic fast-forward check.
        private long lastCheckFastForwardTimeMs;
        /** APPEND requests waiting to be applied, keyed by the request's first entry index, each paired with the future to complete. */
        ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap;
        /** Bounded queue (capacity 100) of COMPARE, TRUNCATE and COMMIT requests, each paired with the future to complete. */
        BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests;

        /**
         * Creates the follower-side handler thread, named {@code EntryHandler-<selfId>}.
         *
         * @param logger logger passed on to the {@code ShutdownAbleThread} base class
         */
        public EntryHandler(Logger logger) {
            super("EntryHandler-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            // At most 100 compare/truncate/commit requests may be queued at once.
            this.compareOrTruncateRequests = new ArrayBlockingQueue<>(100);
            this.writeRequestMap = new ConcurrentHashMap<>();
            this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
        }

        /**
         * Accepts a push request from the leader and hands it to the handler thread.
         *
         * <ul>
         *   <li>APPEND requests are validated and stored by their first entry index; a
         *       second request for an index that is already waiting is answered
         *       immediately with {@code REPEATED_PUSH}.</li>
         *   <li>COMPARE and TRUNCATE requests must carry an entry; they discard all waiting
         *       APPEND requests and are queued.</li>
         *   <li>COMMIT requests are queued as they are.</li>
         *   <li>Any other type is answered immediately with {@code UNEXPECTED_ARGUMENT}.</li>
         * </ul>
         *
         * <p>Queueing uses {@code BlockingQueue.put}, so the caller blocks while the
         * bounded queue is full.
         *
         * @param pushEntryRequest request received from the leader
         * @return future completed with the response; created as a {@code TimeoutFuture}
         *         constructed with the value 1000
         * @throws Exception if validation fails or the thread is interrupted while queueing
         */
        public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest pushEntryRequest) throws Exception {
            // Parameterized instead of raw, so the Pair and return types are checked by the compiler.
            CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000L);
            switch (pushEntryRequest.getType()) {
                case APPEND:
                    if (pushEntryRequest.isBatch()) {
                        PreConditions.check(pushEntryRequest.getBatchEntry() != null && pushEntryRequest.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    } else {
                        PreConditions.check(pushEntryRequest.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    }
                    long firstEntryIndex = pushEntryRequest.getFirstEntryIndex();
                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> existing = this.writeRequestMap.putIfAbsent(Long.valueOf(firstEntryIndex), new Pair<>(pushEntryRequest, future));
                    if (existing != null) {
                        this.logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", Long.valueOf(firstEntryIndex), existing.getKey().baseInfo(), pushEntryRequest.baseInfo());
                        future.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                    }
                    break;
                case COMPARE:
                case TRUNCATE:
                    PreConditions.check(pushEntryRequest.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    // Waiting appends are discarded when the leader re-negotiates the log.
                    this.writeRequestMap.clear();
                    this.compareOrTruncateRequests.put(new Pair<>(pushEntryRequest, future));
                    break;
                case COMMIT:
                    this.compareOrTruncateRequests.put(new Pair<>(pushEntryRequest, future));
                    break;
                default:
                    this.logger.error("[BUG]Unknown type {} from {}", pushEntryRequest.getType(), pushEntryRequest.baseInfo());
                    future.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                    break;
            }
            wakeup();
            return future;
        }

        /**
         * Builds the response for a push request.
         *
         * <p>For every type except COMMIT the response index is taken from the request's
         * entry. A batch APPEND request carries no single entry, so its first entry index
         * is used instead; previously this case threw a {@code NullPointerException}
         * (for example when answering a repeated batch push).
         *
         * @param pushEntryRequest request being answered
         * @param i                response code to report
         * @return response carrying the request's group and term plus the local begin/end indexes
         */
        private PushEntryResponse buildResponse(PushEntryRequest pushEntryRequest, int i) {
            PushEntryResponse response = new PushEntryResponse();
            response.setGroup(pushEntryRequest.getGroup());
            response.setCode(i);
            response.setTerm(pushEntryRequest.getTerm());
            if (pushEntryRequest.getType() != PushEntryRequest.Type.COMMIT) {
                DLedgerEntry entry = pushEntryRequest.getEntry();
                if (entry != null) {
                    response.setIndex(Long.valueOf(entry.getIndex()));
                } else if (pushEntryRequest.isBatch()) {
                    // Batch requests keep their entries in the batch list, not in getEntry().
                    response.setIndex(Long.valueOf(pushEntryRequest.getFirstEntryIndex()));
                }
                // Otherwise there is no index to report; leave it unset rather than fail.
            }
            response.setBeginIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex());
            response.setEndIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
            return response;
        }

        /**
         * Builds the response for a batch APPEND request; the response index is the
         * index of the last entry in the batch.
         *
         * @param pushEntryRequest batch request being answered
         * @param i                response code to report
         * @return response carrying the request's group and term plus the local begin/end indexes
         */
        private PushEntryResponse buildBatchAppendResponse(PushEntryRequest pushEntryRequest, int i) {
            final DLedgerStore store = DLedgerEntryPusher.this.dLedgerStore;
            final PushEntryResponse reply = new PushEntryResponse();

            reply.setGroup(pushEntryRequest.getGroup());
            reply.setCode(i);
            reply.setTerm(pushEntryRequest.getTerm());
            reply.setIndex(Long.valueOf(pushEntryRequest.getLastEntryIndex()));

            reply.setBeginIndex(store.getLedgerBeginIndex());
            reply.setEndIndex(store.getLedgerEndIndex());
            return reply;
        }

        /**
         * Applies a single-entry APPEND request to the local store as a follower.
         *
         * <p>The future is completed with SUCCESS once the entry has been appended at the
         * expected index, after which the committed index is updated from the request.
         * Any failure is logged and answered with INCONSISTENT_STATE.
         *
         * @param j                 index the entry is expected to be written at
         * @param pushEntryRequest  request carrying the entry
         * @param completableFuture future to complete with the response
         */
        private void handleDoAppend(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            final DLedgerStore store = DLedgerEntryPusher.this.dLedgerStore;
            try {
                PreConditions.check(j == pushEntryRequest.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);

                long appendedIndex = store.appendAsFollower(pushEntryRequest.getEntry(), pushEntryRequest.getTerm(), pushEntryRequest.getLeaderId()).getIndex();
                PreConditions.check(appendedIndex == j, DLedgerResponseCode.INCONSISTENT_STATE);

                PushEntryResponse ok = buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode());
                completableFuture.complete(ok);
                store.updateCommittedIndex(pushEntryRequest.getTerm(), pushEntryRequest.getCommitIndex());
            } catch (Throwable failure) {
                this.logger.error("[HandleDoWrite] writeIndex={}", Long.valueOf(j), failure);
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
        }

        /**
         * Answers a COMPARE request by checking whether the local entry at index
         * {@code j} equals the entry sent by the leader.
         *
         * <p>Completes the future with SUCCESS when they are equal; any failed check or
         * error is logged and answered with INCONSISTENT_STATE.
         *
         * @param j                 index being compared
         * @param pushEntryRequest  COMPARE request carrying the leader's entry
         * @param completableFuture future to complete with the response
         * @return the same future that was passed in
         */
        private CompletableFuture<PushEntryResponse> handleDoCompare(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            int resultCode;
            try {
                PreConditions.check(j == pushEntryRequest.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(pushEntryRequest.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
                DLedgerEntry localEntry = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(j));
                PreConditions.check(pushEntryRequest.getEntry().equals(localEntry), DLedgerResponseCode.INCONSISTENT_STATE);
                resultCode = DLedgerResponseCode.SUCCESS.getCode();
                completableFuture.complete(buildResponse(pushEntryRequest, resultCode));
            } catch (Throwable failure) {
                this.logger.error("[HandleDoCompare] compareIndex={}", Long.valueOf(j), failure);
                resultCode = DLedgerResponseCode.INCONSISTENT_STATE.getCode();
                completableFuture.complete(buildResponse(pushEntryRequest, resultCode));
            }
            return completableFuture;
        }

        /**
         * Applies a COMMIT request by updating the local committed index to {@code j}.
         *
         * <p>Completes the future with SUCCESS after the update; any failed check or
         * error is logged and answered with UNKNOWN.
         *
         * @param j                 committed index to apply; must equal the request's commit index
         * @param pushEntryRequest  COMMIT request
         * @param completableFuture future to complete with the response
         * @return the same future that was passed in
         */
        private CompletableFuture<PushEntryResponse> handleDoCommit(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                boolean indexMatches = j == pushEntryRequest.getCommitIndex();
                PreConditions.check(indexMatches, DLedgerResponseCode.UNKNOWN);
                boolean isCommit = pushEntryRequest.getType() == PushEntryRequest.Type.COMMIT;
                PreConditions.check(isCommit, DLedgerResponseCode.UNKNOWN);

                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(pushEntryRequest.getTerm(), j);
                PushEntryResponse ok = buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode());
                completableFuture.complete(ok);
            } catch (Throwable failure) {
                this.logger.error("[HandleDoCommit] committedIndex={}", Long.valueOf(pushEntryRequest.getCommitIndex()), failure);
                PushEntryResponse error = buildResponse(pushEntryRequest, DLedgerResponseCode.UNKNOWN.getCode());
                completableFuture.complete(error);
            }
            return completableFuture;
        }

        /**
         * Applies a TRUNCATE push request: asks the store to truncate with the request's entry and, on
         * success, forwards the request's commit index to the store.
         *
         * <p>The request must be of type TRUNCATE, its entry index must equal {@code j}, and the index
         * returned by the store's truncate call must equal {@code j} as well. A failed check or any
         * thrown error is logged and answered with INCONSISTENT_STATE.
         *
         * @param j index to truncate at
         * @param pushEntryRequest the truncate request
         * @param completableFuture future that receives the response
         * @return the same {@code completableFuture} that was passed in
         */
        private CompletableFuture<PushEntryResponse> handleDoTruncate(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                this.logger.info("[HandleDoTruncate] truncateIndex={} pos={}", Long.valueOf(j), Long.valueOf(pushEntryRequest.getEntry().getPos()));

                boolean indexMatches = pushEntryRequest.getEntry().getIndex() == j;
                PreConditions.check(indexMatches, DLedgerResponseCode.UNKNOWN);

                boolean isTruncate = PushEntryRequest.Type.TRUNCATE == pushEntryRequest.getType();
                PreConditions.check(isTruncate, DLedgerResponseCode.UNKNOWN);

                long truncatedIndex = DLedgerEntryPusher.this.dLedgerStore.truncate(pushEntryRequest.getEntry(), pushEntryRequest.getTerm(), pushEntryRequest.getLeaderId());
                PreConditions.check(truncatedIndex == j, DLedgerResponseCode.INCONSISTENT_STATE);

                // Reply first, then move the committed index.
                PushEntryResponse ok = buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode());
                completableFuture.complete(ok);
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(pushEntryRequest.getTerm(), pushEntryRequest.getCommitIndex());
            } catch (Throwable failure) {
                this.logger.error("[HandleDoTruncate] truncateIndex={}", Long.valueOf(j), failure);
                PushEntryResponse inconsistent = buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode());
                completableFuture.complete(inconsistent);
            }
            return completableFuture;
        }

        /**
         * Appends every entry of a batch push request to the local store and answers the request.
         *
         * <p>The batch must start at {@code j} (its first entry index). On success the future is
         * completed with SUCCESS and the request's commit index is forwarded to the store. If the check
         * fails or anything throws, the error is logged and the future is completed with
         * INCONSISTENT_STATE so the sender gets an answer instead of none at all, in line with how the
         * single-entry append path reports failures.
         *
         * @param j index the first entry of the batch is expected to have
         * @param pushEntryRequest the batch append request
         * @param completableFuture future that receives the response
         */
        private void handleDoBatchAppend(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                PreConditions.check(j == pushEntryRequest.getFirstEntryIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
                for (DLedgerEntry entry : pushEntryRequest.getBatchEntry()) {
                    DLedgerEntryPusher.this.dLedgerStore.appendAsFollower(entry, pushEntryRequest.getTerm(), pushEntryRequest.getLeaderId());
                }
                completableFuture.complete(buildBatchAppendResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(pushEntryRequest.getTerm(), pushEntryRequest.getCommitIndex());
            } catch (Throwable th) {
                this.logger.error("[HandleDoBatchAppend]", th);
                // complete() has no effect on an already completed future, so a failure that happens
                // after the SUCCESS reply (e.g. in updateCommittedIndex) does not overwrite it.
                completableFuture.complete(buildBatchAppendResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
        }

        /**
         * Sweeps the pending write requests against the ledger end index {@code j}.
         *
         * <p>Requests that lie entirely at or below {@code j} are verified against the store, answered
         * (SUCCESS or INCONSISTENT_STATE) and removed. If a request starting exactly at {@code j + 1}
         * is found, the sweep stops at once. Otherwise the timed-out request with the smallest first
         * index, if any, is answered with INCONSISTENT_STATE.
         *
         * @param j ledger end index to compare the pending requests with
         */
        private void checkAppendFuture(long j) {
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair;
            // Smallest first-entry index among timed-out requests beyond j; MAX_VALUE means "none seen".
            long j2 = Long.MAX_VALUE;
            for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair2 : this.writeRequestMap.values()) {
                long firstEntryIndex = pair2.getKey().getFirstEntryIndex();
                long lastEntryIndex = pair2.getKey().getLastEntryIndex();
                if (lastEntryIndex <= j) {
                    // The whole request is already covered by the ledger: confirm the stored entries match.
                    try {
                        if (pair2.getKey().isBatch()) {
                            for (DLedgerEntry dLedgerEntry : pair2.getKey().getBatchEntry()) {
                                PreConditions.check(dLedgerEntry.equals(DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(dLedgerEntry.getIndex()))), DLedgerResponseCode.INCONSISTENT_STATE);
                            }
                        } else {
                            DLedgerEntry entry = pair2.getKey().getEntry();
                            PreConditions.check(entry.equals(DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(entry.getIndex()))), DLedgerResponseCode.INCONSISTENT_STATE);
                        }
                        // Note: the batch response builder is used for non-batch requests as well.
                        pair2.getValue().complete(buildBatchAppendResponse(pair2.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
                        this.logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", Long.valueOf(lastEntryIndex), Long.valueOf(j));
                    } catch (Throwable th) {
                        this.logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", Long.valueOf(lastEntryIndex), Long.valueOf(j), th);
                        pair2.getValue().complete(buildBatchAppendResponse(pair2.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                    }
                    // Answered either way, so drop it; the map is keyed by the request's first entry index.
                    // NOTE(review): removal happens while iterating values() - presumably writeRequestMap
                    // is a concurrent map (declared outside this view); confirm.
                    this.writeRequestMap.remove(Long.valueOf(pair2.getKey().getFirstEntryIndex()));
                } else {
                    if (firstEntryIndex == j + 1) {
                        // The request that directly follows the ledger end is pending; nothing to fast-forward.
                        return;
                    }
                    // NOTE(review): assumes every stored future is a TimeoutFuture - confirm against handlePush.
                    if (((TimeoutFuture) pair2.getValue()).isTimeOut() && firstEntryIndex < j2) {
                        j2 = firstEntryIndex;
                    }
                }
            }
            // Nothing timed out, or the candidate is no longer in the map.
            if (j2 == Long.MAX_VALUE || (pair = this.writeRequestMap.get(Long.valueOf(j2))) == null) {
                return;
            }
            this.logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", Long.valueOf(j), Long.valueOf(j2));
            // The timed-out request is answered but left in the map here.
            pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }

        /**
         * Runs {@link #checkAppendFuture(long)} at most once per 1000 ms, and only when there are
         * pending write requests. The timestamp of the last check is refreshed even when the map
         * turns out to be empty.
         *
         * @param j ledger end index handed on to the sweep
         */
        private void checkAbnormalFuture(long j) {
            if (DLedgerUtils.elapsed(this.lastCheckFastForwardTimeMs) >= 1000) {
                this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
                if (!this.writeRequestMap.isEmpty()) {
                    checkAppendFuture(j);
                }
            }
        }

        /**
         * One iteration of the handler loop.
         *
         * <p>When this node is not a follower the thread just waits briefly. Otherwise a queued
         * compare/truncate/commit request takes precedence; if none is queued, the write request keyed
         * by the next ledger index is taken and appended, and when there is no such request the pending
         * requests are checked for anomalies before waiting briefly. Any error is logged and followed
         * by a 100 ms sleep.
         */
        @Override // io.openmessaging.storage.dledger.ShutdownAbleThread
        public void doWork() {
            try {
                if (!DLedgerEntryPusher.this.memberState.isFollower()) {
                    waitForRunning(1L);
                    return;
                }

                // Control requests (compare / truncate / commit) are served before appends.
                if (this.compareOrTruncateRequests.peek() != null) {
                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> control = this.compareOrTruncateRequests.poll();
                    PreConditions.check(control != null, DLedgerResponseCode.UNKNOWN);
                    switch (control.getKey().getType()) {
                        case COMPARE:
                            handleDoCompare(control.getKey().getEntry().getIndex(), control.getKey(), control.getValue());
                            break;
                        case TRUNCATE:
                            handleDoTruncate(control.getKey().getEntry().getIndex(), control.getKey(), control.getValue());
                            break;
                        case COMMIT:
                            handleDoCommit(control.getKey().getCommitIndex(), control.getKey(), control.getValue());
                            break;
                    }
                    return;
                }

                long nextIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pending = this.writeRequestMap.remove(Long.valueOf(nextIndex));
                if (pending != null) {
                    PushEntryRequest appendRequest = pending.getKey();
                    if (appendRequest.isBatch()) {
                        handleDoBatchAppend(nextIndex, appendRequest, pending.getValue());
                    } else {
                        handleDoAppend(nextIndex, appendRequest, pending.getValue());
                    }
                    return;
                }

                // Nothing to append right now: look for stale or timed-out requests, then idle.
                checkAbnormalFuture(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                waitForRunning(1L);
            } catch (Throwable failure) {
                DLedgerEntryPusher.logger.error("Error in {}", getName(), failure);
                DLedgerUtils.sleep(100L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/dledger-0.2.3.jar:io/openmessaging/storage/dledger/DLedgerEntryPusher$QuorumAckChecker.class */
    /**
     * Background thread that, while this node is leader, derives the quorum index from the peer
     * watermarks of the current term, pushes it to the store as committed index and completes the
     * append futures waiting on it.
     *
     * <p>It also discards pending responses and watermarks that belong to other terms, times out
     * waiting appends and periodically completes futures left behind below the quorum index.
     */
    public class QuorumAckChecker extends ShutdownAbleThread {
        /** Time of the last periodic status log line. */
        private long lastPrintWatermarkTimeMs;
        /** Time of the last sweep for futures left below the quorum index. */
        private long lastCheckLeakTimeMs;
        /** Quorum index computed by the previous successful iteration; -1 before the first one. */
        private long lastQuorumIndex;

        /**
         * @param logger logger handed to the {@code ShutdownAbleThread} base class
         */
        public QuorumAckChecker(Logger logger) {
            super("QuorumAckChecker-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.lastQuorumIndex = -1L;
        }

        /**
         * One iteration of the checker loop. Any error escaping the iteration is logged and followed
         * by a 100 ms sleep; in that case {@code lastQuorumIndex} is left unchanged.
         */
        @Override // io.openmessaging.storage.dledger.ShutdownAbleThread
        public void doWork() {
            try {
                // Status line, at most once every 3 seconds, regardless of role.
                if (DLedgerUtils.elapsed(this.lastPrintWatermarkTimeMs) > 3000) {
                    this.logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.memberState.getRole(), Long.valueOf(DLedgerEntryPusher.this.memberState.currTerm()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex()), JSON.toJSONString(DLedgerEntryPusher.this.peerWaterMarksByTerm));
                    this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
                }
                if (!DLedgerEntryPusher.this.memberState.isLeader()) {
                    waitForRunning(1L);
                    return;
                }
                long currTerm = DLedgerEntryPusher.this.memberState.currTerm();
                DLedgerEntryPusher.this.checkTermForPendingMap(currTerm, "QuorumAckChecker");
                DLedgerEntryPusher.this.checkTermForWaterMark(currTerm, "QuorumAckChecker");

                // Answer and drop everything that is still pending for a term other than the current one.
                if (DLedgerEntryPusher.this.pendingAppendResponsesByTerm.size() > 1) {
                    for (Long term : DLedgerEntryPusher.this.pendingAppendResponsesByTerm.keySet()) {
                        if (term.longValue() != currTerm) {
                            // The raw entry set yields Object, hence the explicit cast per element.
                            for (Object rawEntry : ((ConcurrentMap) DLedgerEntryPusher.this.pendingAppendResponsesByTerm.get(term)).entrySet()) {
                                Map.Entry entry = (Map.Entry) rawEntry;
                                AppendEntryResponse termChanged = new AppendEntryResponse();
                                termChanged.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                                termChanged.setIndex(((Long) entry.getKey()).longValue());
                                termChanged.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                                termChanged.setLeaderId(DLedgerEntryPusher.this.memberState.getLeaderId());
                                this.logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", entry.getKey(), term, Long.valueOf(currTerm));
                                ((TimeoutFuture) entry.getValue()).complete(termChanged);
                            }
                            DLedgerEntryPusher.this.pendingAppendResponsesByTerm.remove(term);
                        }
                    }
                }

                // Same clean-up for the watermarks of other terms.
                if (DLedgerEntryPusher.this.peerWaterMarksByTerm.size() > 1) {
                    for (Long term : DLedgerEntryPusher.this.peerWaterMarksByTerm.keySet()) {
                        if (term.longValue() != currTerm) {
                            this.logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, Long.valueOf(currTerm));
                            DLedgerEntryPusher.this.peerWaterMarksByTerm.remove(term);
                        }
                    }
                }

                // Sort the watermarks in descending order; the element in the middle is the quorum index.
                List<Long> sortedWaterMarks = DLedgerEntryPusher.this.peerWaterMarksByTerm.get(Long.valueOf(currTerm)).values().stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2).longValue();
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);

                ConcurrentMap responses = (ConcurrentMap) DLedgerEntryPusher.this.pendingAppendResponsesByTerm.get(Long.valueOf(currTerm));
                boolean needCheck = false;
                int ackNum = 0;
                // Walk down from the new quorum index to (but excluding) the previous one.
                for (long index = quorumIndex; index > this.lastQuorumIndex; index--) {
                    Long key = Long.valueOf(index);
                    try {
                        CompletableFuture future = (CompletableFuture) responses.remove(key);
                        if (future == null) {
                            // A gap: stop here and force the left-behind sweep below.
                            needCheck = true;
                            break;
                        }
                        if (!future.isDone()) {
                            AppendEntryResponse acked = new AppendEntryResponse();
                            acked.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                            acked.setTerm(currTerm);
                            acked.setIndex(index);
                            acked.setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                            acked.setPos(((AppendFuture) future).getPos());
                            future.complete(acked);
                        }
                        ackNum++;
                    } catch (Throwable th) {
                        // The whole per-index step is guarded, so one failing index is logged and
                        // skipped instead of aborting the iteration.
                        this.logger.error("Error in ack to index={} term={}", key, Long.valueOf(currTerm), th);
                    }
                }

                if (ackNum == 0) {
                    // Nothing was acked: time out the contiguous run of expired futures above the quorum index.
                    for (long index = quorumIndex + 1; index < Integer.MAX_VALUE; index++) {
                        TimeoutFuture waiting = (TimeoutFuture) responses.get(Long.valueOf(index));
                        if (waiting == null || !waiting.isTimeOut()) {
                            break;
                        }
                        AppendEntryResponse timedOut = new AppendEntryResponse();
                        timedOut.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                        timedOut.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                        timedOut.setTerm(currTerm);
                        timedOut.setIndex(index);
                        timedOut.setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                        waiting.complete(timedOut);
                    }
                    waitForRunning(1L);
                }

                // Once a second, or right after a gap was seen, complete and drop futures left below the quorum index.
                if (DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000 || needCheck) {
                    DLedgerEntryPusher.this.updatePeerWaterMark(currTerm, DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                    for (Object rawEntry : responses.entrySet()) {
                        Map.Entry entry = (Map.Entry) rawEntry;
                        if (((Long) entry.getKey()).longValue() < quorumIndex) {
                            AppendEntryResponse leftBehind = new AppendEntryResponse();
                            leftBehind.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                            leftBehind.setTerm(currTerm);
                            leftBehind.setIndex(((Long) entry.getKey()).longValue());
                            leftBehind.setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                            leftBehind.setPos(((AppendFuture) entry.getValue()).getPos());
                            ((TimeoutFuture) entry.getValue()).complete(leftBehind);
                            responses.remove(entry.getKey());
                        }
                    }
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                this.lastQuorumIndex = quorumIndex;
            } catch (Throwable th2) {
                DLedgerEntryPusher.logger.error("Error in {}", getName(), th2);
                DLedgerUtils.sleep(100L);
            }
        }
    }

    /**
     * Wires the pusher to its collaborators and creates its worker threads (none are started here):
     * one {@code EntryDispatcher} per peer other than this node, one {@code EntryHandler} and one
     * {@code QuorumAckChecker}.
     *
     * @param dLedgerConfig configuration
     * @param memberState membership and role state of this node
     * @param dLedgerStore entry store
     * @param dLedgerRpcService RPC service
     */
    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;

        for (String peerId : memberState.getPeerMap().keySet()) {
            if (peerId.equals(memberState.getSelfId())) {
                // No dispatcher towards ourselves.
                continue;
            }
            this.dispatcherMap.put(peerId, new EntryDispatcher(peerId, logger));
        }

        this.entryHandler = new EntryHandler(logger);
        this.quorumAckChecker = new QuorumAckChecker(logger);
    }

    /**
     * Starts the entry handler, then the quorum ack checker, then every dispatcher in
     * {@code dispatcherMap}.
     */
    public void startup() {
        this.entryHandler.start();
        this.quorumAckChecker.start();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.start();
        }
    }

    /**
     * Shuts down the entry handler, then the quorum ack checker, then every dispatcher in
     * {@code dispatcherMap}.
     */
    public void shutdown() {
        this.entryHandler.shutdown();
        this.quorumAckChecker.shutdown();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.shutdown();
        }
    }

    /**
     * Hands an incoming push request to the entry handler and returns whatever future it returns.
     *
     * @param pushEntryRequest the push request to handle
     * @return the future returned by the entry handler's {@code handlePush}
     * @throws Exception if the entry handler's {@code handlePush} throws
     */
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest pushEntryRequest) throws Exception {
        return this.entryHandler.handlePush(pushEntryRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure a watermark table exists for term {@code j}. A missing table is created with every
     * peer id from the member state mapped to -1 and registered with {@code putIfAbsent}.
     *
     * @param j term to initialise
     * @param str name of the calling site, used in the log line only
     */
    public void checkTermForWaterMark(long j, String str) {
        Long term = Long.valueOf(j);
        if (!this.peerWaterMarksByTerm.containsKey(term)) {
            logger.info("Initialize the watermark in {} for term={}", str, term);
            ConcurrentMap<String, Long> initialMarks = new ConcurrentHashMap<>();
            for (String peerId : this.memberState.getPeerMap().keySet()) {
                initialMarks.put(peerId, -1L);
            }
            this.peerWaterMarksByTerm.putIfAbsent(term, initialMarks);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure a pending-response table exists for term {@code j}; a missing one is created empty
     * and registered with {@code putIfAbsent}.
     *
     * @param j term to initialise
     * @param str name of the calling site, used in the log line only
     */
    public void checkTermForPendingMap(long j, String str) {
        Long term = Long.valueOf(j);
        if (!this.pendingAppendResponsesByTerm.containsKey(term)) {
            logger.info("Initialize the pending append map in {} for term={}", str, term);
            this.pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Raises the watermark of peer {@code str} in term {@code j} to {@code j2}; a value that is not
     * greater than the stored one is ignored. Runs while holding the monitor of
     * {@code peerWaterMarksByTerm}.
     *
     * @param j term
     * @param str peer id
     * @param j2 candidate watermark
     */
    public void updatePeerWaterMark(long j, String str, long j2) {
        synchronized (this.peerWaterMarksByTerm) {
            checkTermForWaterMark(j, "updatePeerWaterMark");
            Long term = Long.valueOf(j);
            long current = this.peerWaterMarksByTerm.get(term).get(str).longValue();
            if (j2 > current) {
                this.peerWaterMarksByTerm.get(term).put(str, Long.valueOf(j2));
            }
        }
    }

    /**
     * Reads the watermark of peer {@code str} in term {@code j}, creating the term's table first if
     * it does not exist yet. Runs while holding the monitor of {@code peerWaterMarksByTerm}.
     *
     * @param j term
     * @param str peer id
     * @return the stored watermark
     */
    public long getPeerWaterMark(long j, String str) {
        synchronized (this.peerWaterMarksByTerm) {
            checkTermForWaterMark(j, "getPeerWaterMark");
            return this.peerWaterMarksByTerm.get(Long.valueOf(j)).get(str).longValue();
        }
    }

    /**
     * Tells whether term {@code j} has more pending append responses than the configured maximum.
     *
     * @param j term
     * @return {@code true} when the pending count exceeds {@code getMaxPendingRequestsNum()}
     */
    public boolean isPendingFull(long j) {
        checkTermForPendingMap(j, "isPendingFull");
        int pendingCount = this.pendingAppendResponsesByTerm.get(Long.valueOf(j)).size();
        return pendingCount > this.dLedgerConfig.getMaxPendingRequestsNum();
    }

    /**
     * Records this node's own watermark for the entry and returns a future for its acknowledgement.
     *
     * <p>With a single-member peer map the future is returned already completed. Otherwise a new
     * append future, created with the configured maximum wait time, is stored in the pending table
     * of the entry's term under the entry's index; a warning is logged if that slot was occupied.
     *
     * @param dLedgerEntry the appended entry
     * @param z whether a batch future should be created
     * @return future for the append response
     */
    public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry dLedgerEntry, boolean z) {
        updatePeerWaterMark(dLedgerEntry.getTerm(), this.memberState.getSelfId(), dLedgerEntry.getIndex());

        if (this.memberState.getPeerMap().size() == 1) {
            // Single member: nobody else has to acknowledge.
            AppendEntryResponse immediate = new AppendEntryResponse();
            immediate.setGroup(this.memberState.getGroup());
            immediate.setLeaderId(this.memberState.getSelfId());
            immediate.setIndex(dLedgerEntry.getIndex());
            immediate.setTerm(dLedgerEntry.getTerm());
            immediate.setPos(dLedgerEntry.getPos());
            return z ? BatchAppendFuture.newCompletedFuture(dLedgerEntry.getPos(), immediate) : AppendFuture.newCompletedFuture(dLedgerEntry.getPos(), immediate);
        }

        checkTermForPendingMap(dLedgerEntry.getTerm(), "waitAck");
        AppendFuture pending;
        if (z) {
            pending = new BatchAppendFuture(this.dLedgerConfig.getMaxWaitAckTimeMs());
        } else {
            pending = new AppendFuture(this.dLedgerConfig.getMaxWaitAckTimeMs());
        }
        pending.setPos(dLedgerEntry.getPos());
        Object previous = this.pendingAppendResponsesByTerm.get(Long.valueOf(dLedgerEntry.getTerm())).put(Long.valueOf(dLedgerEntry.getIndex()), pending);
        if (previous != null) {
            logger.warn("[MONITOR] get old wait at index={}", Long.valueOf(dLedgerEntry.getIndex()));
        }
        return pending;
    }

    /**
     * Calls {@code wakeup()} on every dispatcher in {@code dispatcherMap}.
     */
    public void wakeUpDispatchers() {
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.wakeup();
        }
    }
}
