package com.torodb.mongodb.repl;

import com.google.common.net.HostAndPort;
import com.torodb.core.annotations.TorodbRunnableService;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.services.RunnableTorodbService;
import com.torodb.core.supervision.Supervisor;
import com.torodb.mongodb.repl.exceptions.NoSyncSourceFoundException;
import com.torodb.mongowp.OpTime;
import com.torodb.mongowp.client.core.UnreachableMongoServerException;
import com.torodb.mongowp.commands.oplog.OplogOperation;
import com.torodb.mongowp.commands.pojos.MongoCursor;
import com.torodb.mongowp.exceptions.MongoException;
import com.torodb.mongowp.exceptions.OplogOperationUnsupported;
import com.torodb.mongowp.exceptions.OplogStartMissingException;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.logging.log4j.Logger;

/**
 * Service that reads oplog operations from a sync source and hands them to a
 * {@link SyncServiceView}.
 *
 * <p>The service loop ({@link #runProtected()}) repeatedly picks a sync source, opens an
 * {@link OplogReader} on it and fetches operations starting at the last fetched optime. It ends
 * when the service stops running, when a rollback is detected, or when an unrecoverable error
 * aborts the fetch.
 */
@NotThreadSafe
class ReplSyncFetcher extends RunnableTorodbService {
    /** Batches with fewer elements than this may trigger a short wait before the next fetch. */
    private static final int MIN_BATCH_SIZE = 5;
    /** Milliseconds to wait for more operations after a very small and very recent batch. */
    private static final long SLEEP_TO_BATCH_MILLIS = 2;
    /** Milliseconds to wait before retrying when no usable sync source is available. */
    private static final long SYNC_SOURCE_RETRY_MILLIS = 1000L;

    private final Logger logger;
    private final SyncServiceView callback;
    private final OplogReaderProvider readerProvider;
    private final SyncSourceProvider syncSourceProvider;
    private final ReplMetrics metrics;
    /** Number of operations successfully delivered to the callback. */
    private long opsReadCounter;
    /** Hash of the last operation delivered to the callback (or the initial value). */
    private long lastFetchedHash;
    /** Optime of the last operation delivered to the callback (or the initial value). */
    private OpTime lastFetchedOpTime;
    /** Thread executing {@link #runProtected()}; interrupted on shutdown. */
    private volatile Thread runThread;

    /**
     * Signals that the current fetch attempt must be abandoned and a new one started (possibly
     * against another sync source).
     */
    public static class RestartFetchException extends Exception {
        private static final long serialVersionUID = 1;

        private RestartFetchException() {
        }

        private RestartFetchException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Signals that fetching cannot continue and the service must abort.
     */
    public static class StopFetchException extends Exception {
        private static final long serialVersionUID = 1;

        public StopFetchException() {
        }

        public StopFetchException(Throwable th) {
            super(th);
        }
    }

    /**
     * Callbacks the fetcher uses to talk to whoever consumes the fetched operations.
     */
    public interface SyncServiceView {
        /**
         * Receives one fetched operation. The fetcher retries the same operation if this call is
         * interrupted.
         */
        void deliver(@Nonnull OplogOperation oplogOperation) throws InterruptedException;

        /**
         * Called once, when the fetch loop ends because a rollback is needed. The reader passed
         * here has already been closed by the fetcher.
         */
        void rollback(OplogReader oplogReader);

        /** Called before each fetch attempt when {@link #shouldPause()} returns true. */
        void awaitUntilUnpaused() throws InterruptedException;

        /** Polled by the fetcher to decide whether to stop iterating and wait. */
        boolean shouldPause();

        /** Called before each fetch attempt, prior to choosing a sync source. */
        void awaitUntilAllFetchedAreApplied();

        /** Called once, when the fetch loop ends without requiring a rollback. */
        void fetchFinished();

        /** Called once, when the fetch loop is aborted by an unrecoverable error. */
        void fetchAborted(Throwable th);
    }

    /**
     * Creates the fetcher.
     *
     * @param threadFactory       passed to the superclass constructor
     * @param supervisor          passed to the superclass constructor
     * @param syncServiceView     callback that receives operations and lifecycle events
     * @param syncSourceProvider  used to choose the sync source for each fetch attempt
     * @param oplogReaderProvider used to open a reader on the chosen sync source
     * @param j                   initial value of the last fetched hash
     * @param opTime              initial value of the last fetched optime
     * @param replMetrics         metrics updated with the last fetched optime
     * @param loggerFactory       used to create this instance's logger
     */
    ReplSyncFetcher(@TorodbRunnableService ThreadFactory threadFactory, Supervisor supervisor, @Nonnull SyncServiceView syncServiceView, @Nonnull SyncSourceProvider syncSourceProvider, @Nonnull OplogReaderProvider oplogReaderProvider, long j, OpTime opTime, ReplMetrics replMetrics, LoggerFactory loggerFactory) {
        super(supervisor, threadFactory);
        this.logger = loggerFactory.apply(getClass());
        this.callback = syncServiceView;
        this.readerProvider = oplogReaderProvider;
        this.syncSourceProvider = syncSourceProvider;
        this.lastFetchedHash = j;
        this.lastFetchedOpTime = opTime;
        this.metrics = replMetrics;
        this.opsReadCounter = 0L;
    }

    protected String serviceName() {
        return "ToroDB Sync Fetcher";
    }

    /**
     * @return the number of operations delivered to the callback so far
     */
    public long getOpsReadCounter() {
        return this.opsReadCounter;
    }

    /**
     * Interrupts the thread running the fetch loop, if it has already started.
     */
    protected void triggerShutdown() {
        // Read the volatile field once so the null check and the call see the same value.
        Thread fetchThread = this.runThread;
        if (fetchThread != null) {
            fetchThread.interrupt();
        }
    }

    /**
     * Main loop of the service.
     *
     * <p>Each iteration waits until the callback is ready, chooses a sync source, opens a reader
     * and fetches from it. The reader is always closed at the end of the iteration. On exit the
     * callback is told exactly one of: rollback required, fetch finished, or fetch aborted.
     */
    public void runProtected() {
        this.runThread = Thread.currentThread();
        boolean rollbackNeeded = false;
        try {
            OplogReader oplogReader = null;
            while (!rollbackNeeded && isRunning()) {
                try {
                    if (this.callback.shouldPause()) {
                        this.callback.awaitUntilUnpaused();
                    }
                    this.callback.awaitUntilAllFetchedAreApplied();

                    HostAndPort syncSource = null;
                    try {
                        syncSource = this.syncSourceProvider.newSyncSource(this.lastFetchedOpTime);
                        oplogReader = this.readerProvider.newReader(syncSource);
                    } catch (NoSyncSourceFoundException ex) {
                        this.logger.warn("There is no source to sync from");
                        Thread.sleep(SYNC_SOURCE_RETRY_MILLIS);
                        continue;
                    } catch (UnreachableMongoServerException ex) {
                        assert syncSource != null;
                        this.logger.warn("It was impossible to reach the sync source " + syncSource);
                        Thread.sleep(SYNC_SOURCE_RETRY_MILLIS);
                        continue;
                    }

                    rollbackNeeded = fetch(oplogReader);
                } catch (InterruptedException ex) {
                    // The loop condition decides whether this was a shutdown request.
                    this.logger.info("Interrupted fetch process", ex);
                } catch (RestartFetchException ex) {
                    this.logger.info("Restarting fetch process", ex);
                } catch (Throwable ex) {
                    throw new StopFetchException(ex);
                } finally {
                    if (oplogReader != null) {
                        oplogReader.close();
                    }
                }
            }
            if (rollbackNeeded) {
                this.logger.info("Requesting rollback");
                this.callback.rollback(oplogReader);
            } else {
                this.logger.info(serviceName() + " ending by external request");
                this.callback.fetchFinished();
            }
        } catch (StopFetchException ex) {
            this.logger.info(serviceName() + " stopped by self request");
            this.callback.fetchAborted(ex);
        }
        this.logger.info(serviceName() + " stopped");
    }

    protected Logger getLogger() {
        return this.logger;
    }

    /**
     * @return true while the service is running and the callback does not ask for a pause
     */
    public boolean fetchIterationCanContinue() {
        return isRunning() && !this.callback.shouldPause();
    }

    /**
     * Fetches operations from the given reader, starting at the last fetched optime, and delivers
     * them to the callback until {@link #fetchIterationCanContinue()} returns false.
     *
     * @param oplogReader reader opened on the current sync source
     * @return true if a rollback is needed, false if the iteration ended normally
     * @throws StopFetchException    if the rollback check hits an unrecoverable error
     * @throws RestartFetchException if the fetch must be restarted (including on any
     *                               {@link MongoException} raised while reading)
     */
    private boolean fetch(OplogReader oplogReader) throws StopFetchException, RestartFetchException {
        try {
            MongoCursor<OplogOperation> cursor = oplogReader.queryGte(this.lastFetchedOpTime);
            // Everything after the cursor is opened runs under the finally so it is closed
            // exactly once, whatever the exit path.
            try {
                MongoCursor.Batch<OplogOperation> batch = cursor.fetchBatch();
                postBatchChecks(oplogReader, cursor, batch);
                if (isRollbackNeeded(oplogReader, batch, this.lastFetchedOpTime, this.lastFetchedHash)) {
                    return true;
                }
                while (fetchIterationCanContinue()) {
                    if (!batch.hasNext()) {
                        preBatchChecks(batch);
                        batch = cursor.fetchBatch();
                        postBatchChecks(oplogReader, cursor, batch);
                        continue;
                    }
                    OplogOperation nextOp = batch.next();
                    assert nextOp != null;
                    deliverUntilAccepted(nextOp);
                    this.lastFetchedHash = nextOp.getHash();
                    this.lastFetchedOpTime = nextOp.getOpTime();
                    this.metrics.getLastOpTimeFetched().setValue(this.lastFetchedOpTime.toString());
                }
                return false;
            } finally {
                cursor.close();
            }
        } catch (MongoException ex) {
            throw new RestartFetchException(ex);
        }
    }

    /** Delivers one operation to the callback, retrying for as long as the call is interrupted. */
    private void deliverUntilAccepted(OplogOperation oplogOperation) {
        boolean delivered = false;
        while (!delivered) {
            try {
                this.logger.debug("Delivered op: {}", oplogOperation);
                this.callback.deliver(oplogOperation);
                delivered = true;
                this.opsReadCounter++;
            } catch (InterruptedException ex) {
                this.logger.warn(serviceName() + " interrupted while a message was being to deliver. Retrying", ex);
            }
        }
    }

    /**
     * Before asking for another batch, waits {@link #SLEEP_TO_BATCH_MILLIS} if the exhausted
     * batch was non-empty, smaller than {@link #MIN_BATCH_SIZE} and fetched less than
     * {@link #SLEEP_TO_BATCH_MILLIS} milliseconds ago.
     */
    private void preBatchChecks(MongoCursor.Batch<OplogOperation> batch) {
        int batchSize = batch.getBatchSize();
        boolean smallBatch = batchSize > 0 && batchSize < MIN_BATCH_SIZE;
        if (!smallBatch) {
            return;
        }
        long millisSinceFetch = System.currentTimeMillis() - batch.getFetchTime();
        if (millisSinceFetch >= SLEEP_TO_BATCH_MILLIS) {
            return;
        }
        try {
            this.logger.debug("Batch size is very small. Waiting {} millis for more...", Long.valueOf(SLEEP_TO_BATCH_MILLIS));
            Thread.sleep(SLEEP_TO_BATCH_MILLIS);
        } catch (InterruptedException ex) {
            // Leaves the interrupt flag cleared; the fetch loop re-checks isRunning() on its
            // next iteration.
            Thread.interrupted();
        }
    }

    /**
     * Validates a freshly fetched batch.
     *
     * @throws RestartFetchException if the batch is null, if the sync source should be changed,
     *                               or if the batch is empty while the cursor reports more data
     */
    private void postBatchChecks(OplogReader oplogReader, MongoCursor<OplogOperation> mongoCursor, MongoCursor.Batch<OplogOperation> batch) throws RestartFetchException {
        if (batch == null) {
            throw new RestartFetchException();
        }
        infrequentChecks(oplogReader);
        if (!batch.hasNext() && mongoCursor.hasNext()) {
            throw new RestartFetchException();
        }
    }

    /**
     * Requests a restart when the sync source provider reports that the source should change.
     *
     * @param oplogReader currently unused
     */
    private void infrequentChecks(OplogReader oplogReader) throws RestartFetchException {
        if (this.syncSourceProvider.shouldChangeSyncSource()) {
            this.logger.info("A better sync source has been detected");
            throw new RestartFetchException();
        }
    }

    /**
     * Decides whether the local state diverged from the sync source.
     *
     * <p>If the batch has an element, it is consumed and compared with the given optime and hash:
     * any mismatch means rollback. If the batch is empty, the last operation of the remote oplog
     * is read and a rollback is requested when its optime is older than the given one.
     *
     * @param oplogReader reader used to query the last remote operation when the batch is empty
     * @param batch       first batch of the current fetch; its first element is consumed
     * @param opTime      optime of the last locally fetched operation
     * @param j           hash of the last locally fetched operation
     * @return true if a rollback is needed
     * @throws StopFetchException if the last remote operation cannot be read
     */
    private boolean isRollbackNeeded(OplogReader oplogReader, MongoCursor.Batch<OplogOperation> batch, OpTime opTime, long j) throws StopFetchException {
        if (batch.hasNext()) {
            OplogOperation firstOp = batch.next();
            assert firstOp != null;
            boolean sameOperation = firstOp.getHash() == j && firstOp.getOpTime().equals(opTime);
            if (sameOperation) {
                return false;
            }
            this.logger.info("Rolling back: Our last fetched = [{}, {}]. Source = [{}, {}]", opTime, Long.valueOf(j), firstOp.getOpTime(), Long.valueOf(firstOp.getHash()));
            return true;
        }
        try {
            if (oplogReader.getLastOp().getOpTime().compareTo(opTime) >= 0) {
                return false;
            }
            this.logger.info("We are ahead of the sync source. Rolling back");
            return true;
        } catch (OplogStartMissingException ex) {
            this.logger.error("Sync source contais no operation on his oplog!");
            throw new StopFetchException(ex);
        } catch (MongoException ex) {
            this.logger.error("Unknown error while trying to fetch last remote operation", ex);
            throw new StopFetchException(ex);
        } catch (OplogOperationUnsupported ex) {
            this.logger.error("Sync source contais an invalid operation!");
            throw new StopFetchException(ex);
        }
    }
}
