package org.neo4j.coreedge.core.consensus.shipping;

import java.io.IOException;
import java.time.Clock;
import org.neo4j.coreedge.core.consensus.LeaderContext;
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.core.consensus.log.RaftLogEntry;
import org.neo4j.coreedge.core.consensus.log.ReadableRaftLog;
import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap;
import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.Outbound;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper.class */
public class RaftLogShipper {
    /**
     * Lower bound applied to the index chosen in {@code onMismatch}. The same constant (value 1)
     * also appears as the unit step in the index arithmetic of this file.
     */
    private static final long MIN_INDEX = 1;

    /**
     * Sentinel value of {@link #timeoutAbsoluteMillis} meaning "no timeout is pending".
     * It is a true constant, so it is declared {@code static} rather than stored per instance.
     */
    private static final int TIMER_INACTIVE = 0;

    /** Channel through which every message to the follower is sent. */
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    /** Source of {@link #log}; also handed to the timeout service created in {@code start()}. */
    private final LogProvider logProvider;
    private final Log log;
    /** Local log, read for its append index, previous index and entry terms. */
    private final ReadableRaftLog raftLog;
    /** Time source for the resend deadline; also handed to the timeout service. */
    private final Clock clock;
    /** Destination of every message sent by this shipper; also returned by {@code identity()}. */
    private final MemberId follower;
    /** Passed as the first constructor argument of every outgoing message. */
    private final MemberId leader;
    /** Delay, in milliseconds, used whenever the resend timeout is (re)armed after a send. */
    private final long retryTimeMillis;
    /** Upper bound on how far past {@link #matchIndex} a single catch-up range may reach. */
    private final int catchupBatchSize;
    /** Largest allowed gap between a new entry's previous index and {@link #matchIndex} while pipelining. */
    private final int maxAllowedShippingLag;
    /**
     * Only assigned in the code visible here.
     * NOTE(review): presumably consumed by {@code sendRange}, whose body was not recovered - confirm.
     */
    private final InFlightMap<Long, RaftLogEntry> inFlightMap;
    /** Created and started in {@code start()}, stopped and shut down in {@code stop()}. */
    private DelayedRenewableTimeoutService timeoutService;
    /** Handle of the most recently scheduled timeout; {@code null} until the first one is scheduled. */
    private RenewableTimeoutService.RenewableTimeout timeout;
    /** Clock time (millis) at which the resend timeout is due, or 0 when no timeout is pending. */
    private long timeoutAbsoluteMillis;
    /** Index recorded by the send methods of this class when they ship or probe entries. */
    private long lastSentIndex;
    /** Leader context of the most recent event; reused when a timeout triggers a resend. */
    private LeaderContext lastLeaderContext;
    /** Highest index reported through {@code onMatch}; -1 until the first match. */
    private long matchIndex = -1;
    /** Current shipping mode; a new shipper starts in {@link Mode#MISMATCH}. */
    private Mode mode = Mode.MISMATCH;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper$Mode.class */
    /** Shipping strategy currently applied to the follower. */
    public enum Mode {
        // Initial mode. Also entered on a mismatch reported in another mode, or when CATCHUP times out.
        // Only empty append requests are sent until a match arrives.
        MISMATCH,
        // Entered from MISMATCH when a match arrives but the log could not be sent in one batch,
        // or from PIPELINE on timeout or when the follower lags too far behind.
        // The next batch is sent once the follower has matched everything sent so far.
        CATCHUP,
        // New entries and commit updates are forwarded to the follower as they arrive.
        PIPELINE
    }

    /* loaded from: input_file:org/neo4j/coreedge/core/consensus/shipping/RaftLogShipper$Timeouts.class */
    /** Names of the timeouts this shipper registers with the timeout service. */
    public enum Timeouts implements RenewableTimeoutService.TimeoutName {
        // Name under which scheduleTimeout registers the resend timeout.
        RESEND
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a shipper for one follower. Nothing is sent and no timer is armed until
     * {@link #start()} is called.
     *
     * @param outbound        channel used to send messages to the follower
     * @param logProvider     provider of this shipper's log, also passed to the timeout service
     * @param readableRaftLog local log the shipped entries are read from
     * @param clock           time source for resend deadlines
     * @param memberId        stored as the leader (used as the first argument of outgoing messages)
     * @param memberId2       stored as the follower (destination of outgoing messages)
     * @param j               first argument of the initial {@link LeaderContext}
     *                        (presumably the term - confirm against LeaderContext)
     * @param j2              second argument of the initial {@link LeaderContext}
     *                        (presumably the commit index - confirm against LeaderContext)
     * @param j3              resend delay in milliseconds
     * @param i               catch-up batch size
     * @param i2              maximum allowed shipping lag
     * @param inFlightMap     in-flight entry map kept for this shipper
     */
    public RaftLogShipper(
            Outbound<MemberId, RaftMessages.RaftMessage> outbound,
            LogProvider logProvider,
            ReadableRaftLog readableRaftLog,
            Clock clock,
            MemberId memberId,
            MemberId memberId2,
            long j,
            long j2,
            long j3,
            int i,
            int i2,
            InFlightMap<Long, RaftLogEntry> inFlightMap) {
        // Collaborators.
        this.outbound = outbound;
        this.raftLog = readableRaftLog;
        this.inFlightMap = inFlightMap;
        this.clock = clock;
        this.logProvider = logProvider;
        this.log = logProvider.getLog(getClass());

        // Participants.
        this.leader = memberId;
        this.follower = memberId2;

        // Tuning parameters.
        this.retryTimeMillis = j3;
        this.catchupBatchSize = i;
        this.maxAllowedShippingLag = i2;

        // Context used until the first event supplies a newer one.
        this.lastLeaderContext = new LeaderContext(j, j2);
    }

    /**
     * Returns the object identifying this shipper, which is the follower it ships to.
     *
     * @return the follower's member id
     */
    public Object identity() {
        return follower;
    }

    /**
     * Starts shipping: creates and starts a fresh timeout service, then sends an empty append
     * request aimed at the local append index using the last known leader context.
     */
    public synchronized void start() {
        log.info("Starting log shipper: %s", statusAsString());

        final DelayedRenewableTimeoutService service = new DelayedRenewableTimeoutService(clock, logProvider);
        // Publish the service to the field before initialising it, so the field is set even if init fails.
        timeoutService = service;
        service.init();
        service.start();

        sendEmpty(raftLog.appendIndex(), lastLeaderContext);
    }

    /**
     * Stops shipping: stops and shuts down the timeout service, then cancels any pending timeout.
     * Failures while stopping the service are logged and otherwise ignored, so the pending timeout
     * is always cleared.
     */
    public synchronized void stop() {
        log.info("Stopping log shipper %s", statusAsString());

        try {
            timeoutService.stop();
            timeoutService.shutdown();
        } catch (Throwable failure) {
            // Best effort: report the problem but still clear the timer below.
            log.error("Failed to stop log shipper " + statusAsString(), failure);
        }

        abortTimeout();
    }

    /**
     * Handles a mismatch reported for the follower.
     *
     * <p>In MISMATCH mode the next probe steps one index back from the last sent index, is capped by
     * {@code j} and never goes below {@code MIN_INDEX}. In any other mode the shipper drops to
     * MISMATCH mode and probes again at the last sent index.
     *
     * @param j             index reported alongside the mismatch; upper bound for the next probe
     * @param leaderContext leader context to send with, remembered for later resends
     * @throws IllegalStateException if the current mode is not one of the known modes
     */
    public synchronized void onMismatch(long j, LeaderContext leaderContext) {
        switch (mode) {
            case PIPELINE:
            case CATCHUP:
                log.info("%s: mismatch in mode %s from follower %s, moving to MISMATCH mode", statusAsString(), mode, follower);
                mode = Mode.MISMATCH;
                sendEmpty(lastSentIndex, leaderContext);
                break;
            case MISMATCH:
                final long oneStepBack = Math.min(lastSentIndex - 1, j);
                sendEmpty(Math.max(oneStepBack, MIN_INDEX), leaderContext);
                break;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }

        lastLeaderContext = leaderContext;
    }

    /**
     * Handles a match reported for the follower and advances the shipping mode accordingly.
     *
     * <ul>
     *   <li>MISMATCH: sends the next batch and moves to PIPELINE if that batch reaches the local
     *       append index, otherwise to CATCHUP.</li>
     *   <li>CATCHUP: once everything sent so far is matched, sends the next batch and moves to
     *       PIPELINE if it reaches the local append index.</li>
     *   <li>PIPELINE: cancels the resend timeout when fully matched, or re-arms it when the match
     *       index advanced.</li>
     * </ul>
     *
     * @param j             match index reported for the follower
     * @param leaderContext leader context to send with, remembered for later resends
     * @throws IllegalStateException if the current mode is not one of the known modes
     */
    public synchronized void onMatch(long j, LeaderContext leaderContext) {
        final boolean progressed = j > matchIndex;
        if (progressed) {
            matchIndex = j;
        } else {
            log.warn("%s: match index not progressing. This should be transient.", statusAsString());
        }

        switch (mode) {
            case MISMATCH:
                // The status is logged before the mode changes, so the message shows the old mode.
                if (sendNextBatchAfterMatch(leaderContext)) {
                    log.info("%s: caught up after mismatch, moving to PIPELINE mode", statusAsString());
                    mode = Mode.PIPELINE;
                } else {
                    log.info("%s: starting catch up after mismatch, moving to CATCHUP mode", statusAsString());
                    mode = Mode.CATCHUP;
                }
                break;
            case CATCHUP:
                if (matchIndex >= lastSentIndex) {
                    if (sendNextBatchAfterMatch(leaderContext)) {
                        log.info("%s: caught up, moving to PIPELINE mode", statusAsString());
                        mode = Mode.PIPELINE;
                    }
                }
                break;
            case PIPELINE:
                if (matchIndex == lastSentIndex) {
                    abortTimeout();
                } else if (progressed) {
                    scheduleTimeout(retryTimeMillis);
                }
                break;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }

        lastLeaderContext = leaderContext;
    }

    /**
     * Forwards newly appended entries to the follower while in PIPELINE mode. If the follower's
     * match index trails {@code j} by more than the allowed shipping lag, nothing is sent and the
     * shipper moves to CATCHUP mode instead. In other modes only the leader context is recorded.
     *
     * @param j               index of the entry preceding the new entries
     * @param j2              term of the entry preceding the new entries
     * @param raftLogEntryArr the new entries
     * @param leaderContext   leader context to send with, remembered for later resends
     */
    public synchronized void onNewEntries(long j, long j2, RaftLogEntry[] raftLogEntryArr, LeaderContext leaderContext) {
        if (mode == Mode.PIPELINE) {
            // sendNewEntries advances lastSentIndex past j, which ends the loop.
            while (lastSentIndex <= j) {
                final boolean tooFarBehind = j - matchIndex > maxAllowedShippingLag;
                if (tooFarBehind) {
                    log.info("%s: follower has fallen behind (target prevLogIndex was %d, maxAllowedShippingLag is %d), moving to CATCHUP mode", statusAsString(), j, maxAllowedShippingLag);
                    mode = Mode.CATCHUP;
                    break;
                }
                sendNewEntries(j, j2, raftLogEntryArr, leaderContext);
            }
        }

        lastLeaderContext = leaderContext;
    }

    /**
     * Records the new leader context and, when in PIPELINE mode, forwards the commit update to
     * the follower.
     *
     * @param leaderContext leader context carrying the updated commit index
     */
    public synchronized void onCommitUpdate(LeaderContext leaderContext) {
        final boolean pipelining = mode == Mode.PIPELINE;
        if (pipelining) {
            sendCommitUpdate(leaderContext);
        }

        lastLeaderContext = leaderContext;
    }

    /**
     * Callback run when a scheduled timeout fires. Triggers the timeout handling if the deadline
     * has been reached. If a deadline is set but still lies ahead, the timer is re-armed for the
     * remaining time. Does nothing when no timeout is pending.
     */
    private synchronized void onScheduledTimeoutExpiry() {
        if (timedOut()) {
            onTimeout();
        } else if (timeoutAbsoluteMillis != TIMER_INACTIVE) {
            // The clock is read again here, so the deadline may have passed since the check above.
            final long remaining = timeoutAbsoluteMillis - clock.millis();
            if (remaining <= 0) {
                onTimeout();
            } else {
                scheduleTimeout(remaining);
            }
        }
    }

    /**
     * Reacts to an expired resend timeout by stepping down one mode (PIPELINE to CATCHUP, or
     * CATCHUP to MISMATCH) and then re-sending an empty append request at the last sent index,
     * provided a leader context is known.
     */
    void onTimeout() {
        if (mode == Mode.PIPELINE) {
            log.info("%s: timed out, moving to CATCHUP mode", statusAsString());
            mode = Mode.CATCHUP;
            scheduleTimeout(retryTimeMillis);
        } else if (mode == Mode.CATCHUP) {
            log.info("%s: timed out, moving to MISMATCH mode", statusAsString());
            mode = Mode.MISMATCH;
        }

        final LeaderContext context = lastLeaderContext;
        if (context != null) {
            sendEmpty(lastSentIndex, context);
        }
    }

    /**
     * Tells whether a timeout is pending and its deadline has been reached.
     *
     * @return {@code true} if a deadline is set and the clock is at or past it
     */
    private boolean timedOut() {
        if (timeoutAbsoluteMillis == TIMER_INACTIVE) {
            return false;
        }
        return clock.millis() - timeoutAbsoluteMillis >= 0;
    }

    /**
     * Arms the resend timeout {@code j} milliseconds from now. Any previously scheduled timeout
     * is cancelled and replaced by a new one.
     *
     * @param j delay in milliseconds until the timeout should fire
     */
    private void scheduleTimeout(long j) {
        timeoutAbsoluteMillis = clock.millis() + j;

        final RenewableTimeoutService.RenewableTimeout previous = timeout;
        if (previous != null) {
            previous.cancel();
        }

        timeout = timeoutService.create(Timeouts.RESEND, j, 0L, expired -> onScheduledTimeoutExpiry());
    }

    /** Cancels the scheduled timeout, if there is one, and marks the timer as inactive. */
    private void abortTimeout() {
        final RenewableTimeoutService.RenewableTimeout pending = timeout;
        if (pending != null) {
            pending.cancel();
        }

        timeoutAbsoluteMillis = TIMER_INACTIVE;
    }

    /**
     * Sends the next range of entries following the match index. The range ends at the local
     * append index or {@code catchupBatchSize} past the match index, whichever comes first.
     *
     * @param leaderContext leader context to send with
     * @return {@code true} if nothing is left to send or the range reaches the local append
     *         index; {@code false} if more batches are needed
     */
    private boolean sendNextBatchAfterMatch(LeaderContext leaderContext) {
        final long localAppendIndex = raftLog.appendIndex();
        if (matchIndex >= localAppendIndex) {
            // The follower already has everything in the local log.
            return true;
        }

        final long batchEnd = Math.min(localAppendIndex, matchIndex + catchupBatchSize);
        scheduleTimeout(retryTimeMillis);
        sendRange(matchIndex + 1, batchEnd, leaderContext);
        return batchEnd == localAppendIndex;
    }

    /**
     * Sends a heartbeat carrying the context's term and commit index to the follower.
     * NOTE(review): the term is passed twice; presumably the last argument is the term of the
     * committed entry - confirm against RaftMessages.Heartbeat.
     *
     * @param leaderContext leader context supplying term and commit index
     */
    private void sendCommitUpdate(LeaderContext leaderContext) {
        final RaftMessages.Heartbeat heartbeat =
                new RaftMessages.Heartbeat(leader, leaderContext.term, leaderContext.commitIndex, leaderContext.term);
        outbound.send(follower, heartbeat);
    }

    /**
     * Re-arms the resend timeout, records {@code j + 1} as the last sent index and sends the
     * given entries to the follower in an append request.
     *
     * @param j               index of the entry preceding the new entries
     * @param j2              term of the entry preceding the new entries
     * @param raftLogEntryArr entries to send
     * @param leaderContext   leader context supplying term and commit index
     */
    private void sendNewEntries(long j, long j2, RaftLogEntry[] raftLogEntryArr, LeaderContext leaderContext) {
        scheduleTimeout(retryTimeMillis);
        lastSentIndex = j + 1;

        final RaftMessages.AppendEntries.Request request = new RaftMessages.AppendEntries.Request(
                leader, leaderContext.term, j, j2, raftLogEntryArr, leaderContext.commitIndex);
        outbound.send(follower, request);
    }

    /**
     * Sends an append request without entries to probe the follower at index {@code j}. The index
     * is raised to the first index after the local log's previous index when it falls below that.
     *
     * <p>The resend timeout is re-armed and the last sent index updated even when nothing is sent.
     * Nothing is sent if the preceding entry's term is newer than the leader context's term, if
     * that entry no longer exists in the log, or if reading the log fails. Each of these cases is
     * logged as a warning.
     *
     * @param j             index to probe
     * @param leaderContext leader context supplying term and commit index
     */
    private void sendEmpty(long j, LeaderContext leaderContext) {
        scheduleTimeout(retryTimeMillis);

        final long probeIndex = Math.max(raftLog.prevIndex() + 1L, j);
        lastSentIndex = probeIndex;

        try {
            final long prevLogIndex = probeIndex - 1;
            final long prevLogTerm = raftLog.readEntryTerm(prevLogIndex);

            if (prevLogTerm > leaderContext.term) {
                log.warn("%s: aborting send. Not leader anymore? %s, prevLogTerm=%d", statusAsString(), leaderContext, prevLogTerm);
                return;
            }

            if (doesNotExistInLog(prevLogIndex, prevLogTerm)) {
                log.warn("%s: Entry was pruned when sending empty (prevLogIndex=%d, prevLogTerm=%d)", statusAsString(), prevLogIndex, prevLogTerm);
                return;
            }

            final RaftMessages.AppendEntries.Request request = new RaftMessages.AppendEntries.Request(
                    leader, leaderContext.term, prevLogIndex, prevLogTerm, RaftLogEntry.empty, leaderContext.commitIndex);
            outbound.send(follower, request);
        } catch (IOException e) {
            log.warn(statusAsString() + " exception during empty send", e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:22:0x00ac, code lost:
    
        r13.log.warn("%s aborting send. Not leader anymore? %s, entryTerm=%d", new java.lang.Object[]{statusAsString(), r18, java.lang.Long.valueOf(r0[r28].term())});
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00d7, code lost:
    
        if (r0 == null) goto L77;
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x00dc, code lost:
    
        if (0 == 0) goto L26;
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x00f3, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x00f8, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x00df, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x00e7, code lost:
    
        r29 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x00e9, code lost:
    
        r27.addSuppressed(r29);
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:?, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder left by the decompiler: the original body could not be reconstructed, so this
     * method unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>The only caller visible in this file is {@code sendNextBatchAfterMatch}, which passes the
     * index following the match index as {@code r14}, the end of the batch as {@code r16} and the
     * current leader context as {@code r18}. The fragments quoted in the decompiler warnings above
     * show a "Not leader anymore?" warning being logged and a resource being closed.
     *
     * <p>NOTE(review): until the real implementation is restored, every path that reaches this
     * method fails with the exception below.
     *
     * @param r14 first index of the range (per the visible caller)
     * @param r16 last index of the range (per the visible caller)
     * @param r18 leader context to send with
     * @throws UnsupportedOperationException always
     */
    private void sendRange(long r14, long r16, org.neo4j.coreedge.core.consensus.LeaderContext r18) {
        /*
            Method dump skipped, instructions count: 490
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.neo4j.coreedge.core.consensus.shipping.RaftLogShipper.sendRange(long, long, org.neo4j.coreedge.core.consensus.LeaderContext):void");
    }

    /**
     * Tells whether a term lookup indicates that the entry is absent from the log: the term came
     * back as -1 although the index itself is not -1.
     *
     * @param j  index that was looked up
     * @param j2 term returned for that index
     * @return {@code true} if the term is -1 for an index other than -1
     */
    private boolean doesNotExistInLog(long j, long j2) {
        if (j == -1) {
            return false;
        }
        return j2 == -1;
    }

    /**
     * Logs a warning and sends the follower a log compaction message carrying the context's term
     * and the local log's previous index.
     *
     * @param leaderContext leader context supplying the term
     */
    private void sendLogCompactionInfo(LeaderContext leaderContext) {
        log.warn("Sending log compaction info. Log pruned? Status=%s, LeaderContext=%s", statusAsString(), leaderContext);

        final RaftMessages.LogCompactionInfo compactionInfo =
                new RaftMessages.LogCompactionInfo(leader, leaderContext.term, raftLog.prevIndex());
        outbound.send(follower, compactionInfo);
    }

    /**
     * Builds a one-line summary of this shipper for log messages: the follower, match index, last
     * sent index, local append index and current mode.
     *
     * @return the formatted status
     */
    private String statusAsString() {
        final long localAppendIndex = raftLog.appendIndex();
        return String.format(
                "%s[matchIndex: %d, lastSentIndex: %d, localAppendIndex: %d, mode: %s]",
                follower, matchIndex, lastSentIndex, localAppendIndex, mode);
    }
}
