package org.neo4j.causalclustering.core.replication;

import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderListener;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.availability.UnavailableException;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/replication/RaftReplicator.class */
/**
 * {@link Replicator} that hands content to the current Raft leader as a
 * {@link RaftMessages.NewEntry.Request} and keeps re-sending it until the
 * {@link ProgressTracker} reports the operation as replicated.
 *
 * <p>The instance registers itself with the {@link LeaderLocator} so that a leader switch
 * triggers a replication event on the progress tracker.
 */
public class RaftReplicator implements Replicator, LeaderListener {
    private final MemberId me;
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    private final ProgressTracker progressTracker;
    private final LocalSessionPool sessionPool;
    private final TimeoutStrategy progressTimeoutStrategy;
    private final AvailabilityGuard availabilityGuard;
    private final LeaderLocator leaderLocator;
    private final TimeoutStrategy leaderTimeoutStrategy;
    private final Log log;
    private final Throttler throttler;
    private final ReplicationMonitor replicationMonitor;
    private final long availabilityTimeoutMillis;

    /**
     * Creates the replicator and registers it as a listener on {@code leaderLocator}.
     *
     * @param leaderLocator     source of the current leader; also notifies this instance of leader switches
     * @param memberId          identity of this member, used as the sender of replication requests
     * @param outbound          channel used to send requests to the leader
     * @param localSessionPool  pool that sessions are acquired from and released to, one per replicated operation
     * @param progressTracker   tracks replication progress of each started operation
     * @param timeoutStrategy   produces the timeout used while waiting for an operation to be replicated
     * @param timeoutStrategy2  produces the back-off used while no leader can be found
     * @param j                 how long, in milliseconds, to wait for the database to become available
     * @param availabilityGuard guard consulted before every replication attempt
     * @param logProvider       provider of the log used by this class
     * @param j2                argument handed to the {@link Throttler}; presumably the limit on credits
     *                          (content size) in flight at once - confirm against {@code Throttler}
     * @param monitors          registry from which the {@link ReplicationMonitor} is obtained
     */
    public RaftReplicator(LeaderLocator leaderLocator, MemberId memberId, Outbound<MemberId, RaftMessages.RaftMessage> outbound, LocalSessionPool localSessionPool, ProgressTracker progressTracker, TimeoutStrategy timeoutStrategy, TimeoutStrategy timeoutStrategy2, long j, AvailabilityGuard availabilityGuard, LogProvider logProvider, long j2, Monitors monitors) {
        this.me = memberId;
        this.outbound = outbound;
        this.progressTracker = progressTracker;
        this.sessionPool = localSessionPool;
        this.progressTimeoutStrategy = timeoutStrategy;
        this.leaderTimeoutStrategy = timeoutStrategy2;
        this.availabilityTimeoutMillis = j;
        this.availabilityGuard = availabilityGuard;
        this.throttler = new Throttler(j2);
        this.leaderLocator = leaderLocator;
        this.log = logProvider.getLog(getClass());
        this.replicationMonitor = (ReplicationMonitor) monitors.newMonitor(ReplicationMonitor.class, new String[0]);
        // Register last so that a callback can never observe a partially initialised instance,
        // and so that a failure above does not leave a dangling listener behind.
        leaderLocator.registerListener(this);
    }

    /**
     * Replicates {@code replicatedContent} through the current leader.
     *
     * <p>Content that reports a size is passed through the throttler with that size; content
     * without a size is replicated directly.
     *
     * @param replicatedContent the content to replicate
     * @param z                 when {@code true} the session is held until the returned future's
     *                          result completes; when {@code false} it is released as soon as the
     *                          operation is replicated
     * @return the future result of the replicated operation
     * @throws ReplicationFailureException if no leader is available, a leader switch is detected
     *                                     before sending, the database is unavailable, or the
     *                                     calling thread is interrupted (the interrupt status is
     *                                     restored before throwing)
     */
    @Override // org.neo4j.causalclustering.core.replication.Replicator
    public Future<Object> replicate(ReplicatedContent replicatedContent, boolean z) throws ReplicationFailureException {
        final MemberId leader;
        try {
            leader = leaderLocator.getLeader();
        } catch (NoLeaderFoundException e) {
            throw new ReplicationFailureException("Replication aborted since no leader was available", e);
        }

        if (!replicatedContent.size().isPresent()) {
            return replicate0(replicatedContent, z, leader);
        }

        try {
            return (Future) throttler.invoke(() -> replicate0(replicatedContent, z, leader), replicatedContent.size().get().longValue());
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new ReplicationFailureException("Interrupted while waiting for replication credits", e);
        }
    }

    /**
     * Sends the operation to the leader and retries until it is reported as replicated.
     *
     * @param replicatedContent the content to replicate
     * @param trackResult       whether the session must be held until the result future completes
     * @param leader            the leader observed by the caller; replication is refused if the
     *                          leader has changed since
     * @return the future result of the replicated operation
     * @throws ReplicationFailureException on leader switch, missing leader, unavailable database or interruption
     */
    private Future<Object> replicate0(ReplicatedContent replicatedContent, boolean trackResult, MemberId leader) throws ReplicationFailureException {
        replicationMonitor.startReplication();
        try {
            assertNoLeaderSwitch(leader);

            OperationContext session = sessionPool.acquireSession();
            DistributedOperation operation = new DistributedOperation(replicatedContent, session.globalSession(), session.localOperationId());
            Progress progress = progressTracker.start(operation);

            awaitReplicated(replicatedContent, operation, progress, leader);

            BiConsumer<? super Object, ? super Throwable> releaseSession = (result, failure) -> sessionPool.releaseSession(session);
            if (trackResult) {
                progress.futureResult().whenComplete(releaseSession);
            } else {
                releaseSession.accept(null, null);
            }
            replicationMonitor.successfulReplication();
            return progress.futureResult();
        } catch (Throwable failure) {
            replicationMonitor.failedReplication(failure);
            throw failure;
        }
    }

    /**
     * Repeatedly sends {@code operation} to the leader until {@code progress} reports it as replicated.
     * On any failure the operation is aborted in the progress tracker before the failure propagates.
     */
    private void awaitReplicated(ReplicatedContent replicatedContent, DistributedOperation operation, Progress progress, MemberId leader) throws ReplicationFailureException {
        TimeoutStrategy.Timeout progressTimeout = progressTimeoutStrategy.newTimeout();
        TimeoutStrategy.Timeout leaderTimeout = leaderTimeoutStrategy.newTimeout();
        int attempts = 0;
        try {
            do {
                attempts++;
                if (attempts > 1) {
                    log.info("Retrying replication. Current attempt: %d Content: %s", attempts, replicatedContent);
                }
                replicationMonitor.replicationAttempt();
                assertDatabaseAvailable();
                try {
                    outbound.send(leader, new RaftMessages.NewEntry.Request(me, operation), true);
                    // A send went through, so start the no-leader back-off from scratch.
                    leaderTimeout = leaderTimeoutStrategy.newTimeout();
                    progress.awaitReplication(progressTimeout.getMillis());
                    if (!progress.isReplicated()) {
                        // Only prepare a retry when one is needed; looking up the leader after a
                        // successful replication could otherwise trigger a pointless back-off sleep.
                        progressTimeout.increment();
                        leader = leaderLocator.getLeader();
                    }
                } catch (NoLeaderFoundException e) {
                    log.debug("Could not replicate operation " + operation + " because no leader was found. Retrying.", e);
                    Thread.sleep(leaderTimeout.getMillis());
                    leaderTimeout.increment();
                }
            } while (!progress.isReplicated());
        } catch (InterruptedException e) {
            progressTracker.abort(operation);
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new ReplicationFailureException("Interrupted while replicating", e);
        } catch (Throwable failure) {
            // Nobody will wait on this operation any more, so do not leave it registered.
            progressTracker.abort(operation);
            throw failure;
        }
    }

    /**
     * Verifies that the leader is still the one the caller observed.
     *
     * @param memberId the leader the caller expects
     * @throws ReplicationFailureException if the leader differs or no leader can be found
     */
    private void assertNoLeaderSwitch(MemberId memberId) throws ReplicationFailureException {
        MemberId currentLeader;
        try {
            currentLeader = leaderLocator.getLeader();
        } catch (NoLeaderFoundException e) {
            throw new ReplicationFailureException("Replication aborted since no leader was available", e);
        }
        if (!currentLeader.equals(memberId)) {
            throw new ReplicationFailureException("Replication aborted since a leader switch was detected");
        }
    }

    /**
     * Forwards a leader switch to the progress tracker as a replication event
     * (presumably waking up threads waiting on replication - confirm against {@code ProgressTracker}).
     */
    @Override // org.neo4j.causalclustering.core.consensus.LeaderListener
    public void onLeaderSwitch(LeaderInfo leaderInfo) {
        progressTracker.triggerReplicationEvent();
    }

    /**
     * Waits up to the configured availability timeout for the database to become available.
     *
     * @throws ReplicationFailureException if the availability guard reports the database as unavailable
     */
    private void assertDatabaseAvailable() throws ReplicationFailureException {
        try {
            availabilityGuard.await(availabilityTimeoutMillis);
        } catch (UnavailableException e) {
            throw new ReplicationFailureException("Database is not available, transaction cannot be replicated.", e);
        }
    }
}
