package com.torodb.mongodb.repl.oplogreplier;

import com.torodb.core.annotations.TorodbRunnableService;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.services.RunnableTorodbService;
import com.torodb.core.supervision.Supervisor;
import com.torodb.core.supervision.SupervisorDecision;
import com.torodb.core.transaction.RollbackException;
import com.torodb.mongodb.repl.oplogreplier.batch.OplogBatch;
import com.torodb.mongodb.repl.oplogreplier.fetcher.OplogFetcher;
import com.torodb.mongowp.commands.oplog.OplogOperation;
import java.util.concurrent.ThreadFactory;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.logging.log4j.Logger;

@NotThreadSafe
/* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/ReplSyncFetcher.class */
class ReplSyncFetcher extends RunnableTorodbService {
    private final Logger logger;
    private final SyncServiceView callback;
    private final OplogFetcher fetcher;
    private volatile Thread runThread;

    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/ReplSyncFetcher$SyncServiceView.class */
    public interface SyncServiceView extends Supervisor {
        void deliver(OplogOperation oplogOperation) throws InterruptedException;

        void rollback(RollbackReplicationException rollbackReplicationException);

        void awaitUntilUnpaused() throws InterruptedException;

        boolean shouldPause();

        void awaitUntilAllFetchedAreApplied();

        void fetchFinished();

        void fetchAborted(Throwable th);

        default SupervisorDecision onError(Object obj, Throwable th) {
            fetchAborted(th);
            return SupervisorDecision.STOP;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplSyncFetcher(@TorodbRunnableService ThreadFactory threadFactory, SyncServiceView syncServiceView, OplogFetcher oplogFetcher, LoggerFactory loggerFactory) {
        super(syncServiceView, threadFactory);
        this.logger = loggerFactory.apply(getClass());
        this.callback = syncServiceView;
        this.fetcher = oplogFetcher;
    }

    protected Logger getLogger() {
        return this.logger;
    }

    protected String serviceName() {
        return "ToroDB Sync Fetcher";
    }

    protected void triggerShutdown() {
        if (this.runThread != null) {
            this.runThread.interrupt();
        }
    }

    public void runProtected() {
        this.runThread = Thread.currentThread();
        RollbackReplicationException rollbackReplicationException = null;
        boolean z = false;
        while (true) {
            if (rollbackReplicationException != null) {
                break;
            }
            try {
                if (!isRunning()) {
                    break;
                }
                try {
                    try {
                        try {
                            if (this.callback.shouldPause()) {
                                this.callback.awaitUntilUnpaused();
                            } else {
                                this.callback.awaitUntilAllFetchedAreApplied();
                                OplogBatch fetch = this.fetcher.fetch();
                                if (fetch.isLastOne()) {
                                    z = true;
                                    break;
                                }
                                fetch.getOps().forEach(oplogOperation -> {
                                    try {
                                        this.callback.deliver(oplogOperation);
                                    } catch (InterruptedException e) {
                                        Thread.interrupted();
                                        throw new RollbackException(serviceName() + " interrupted while a message was being to deliver.", e);
                                    }
                                });
                                if (!fetch.isReadyForMore()) {
                                    this.logger.warn("There is no source to sync from");
                                    Thread.sleep(1000L);
                                }
                            }
                        } catch (Throwable th) {
                            throw new StopReplicationException(th);
                        }
                    } catch (StopReplicationException e) {
                        throw e;
                    } catch (InterruptedException e2) {
                        Thread.interrupted();
                        this.logger.info("Restarting fetch process", e2);
                    }
                } catch (RollbackException e3) {
                    this.logger.info("Retrying after a rollback exception");
                } catch (RollbackReplicationException e4) {
                    rollbackReplicationException = e4;
                }
            } catch (StopReplicationException e5) {
                this.logger.info(serviceName() + " stopped by self request");
                this.callback.fetchAborted(e5);
            }
        }
        if (rollbackReplicationException != null) {
            this.logger.debug("Requesting rollback");
            this.callback.rollback(rollbackReplicationException);
        } else {
            if (z) {
                this.logger.info("Remote oplog finished");
            } else {
                this.logger.info(serviceName() + " ending by external request");
            }
            this.callback.fetchFinished();
        }
        this.logger.info(serviceName() + " stopped");
    }
}
