package com.torodb.mongodb.repl.oplogreplier.fetcher;

import com.google.common.base.Preconditions;
import com.google.inject.assistedinject.Assisted;
import com.mongodb.MongoServerException;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.retrier.Retrier;
import com.torodb.core.retrier.RetrierAbortException;
import com.torodb.core.retrier.RetrierGiveUpException;
import com.torodb.core.transaction.RollbackException;
import com.torodb.mongodb.repl.OplogReader;
import com.torodb.mongodb.repl.OplogReaderProvider;
import com.torodb.mongodb.repl.ReplMetrics;
import com.torodb.mongodb.repl.SyncSourceProvider;
import com.torodb.mongodb.repl.oplogreplier.FinishedOplogBatch;
import com.torodb.mongodb.repl.oplogreplier.NormalOplogBatch;
import com.torodb.mongodb.repl.oplogreplier.NotReadyForMoreOplogBatch;
import com.torodb.mongodb.repl.oplogreplier.RollbackReplicationException;
import com.torodb.mongodb.repl.oplogreplier.StopReplicationException;
import com.torodb.mongodb.repl.oplogreplier.batch.OplogBatch;
import com.torodb.mongowp.OpTime;
import com.torodb.mongowp.commands.MongoRuntimeException;
import com.torodb.mongowp.commands.oplog.OplogOperation;
import com.torodb.mongowp.commands.pojos.MongoCursor;
import com.torodb.mongowp.exceptions.MongoException;
import com.torodb.mongowp.exceptions.OplogOperationUnsupported;
import com.torodb.mongowp.exceptions.OplogStartMissingException;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import org.apache.logging.log4j.Logger;

/**
 * An {@link OplogFetcher} that keeps reading operations from the oplog of a remote sync source.
 *
 * <p>Each call to {@link #fetch()} makes sure there is an {@link OplogReader} and an open
 * {@link MongoCursor} positioned at the last operation that was fetched, reads the next batch
 * from that cursor and remembers the hash and optime of the last operation it returned. Failed
 * attempts are repeated through the injected {@link Retrier}.
 *
 * <p>The class is declared {@link NotThreadSafe}.
 */
@NotThreadSafe
public class ContinuousOplogFetcher implements OplogFetcher {
    /** Pause, in milliseconds, before an idle cursor is polled a second time. */
    private static final long EMPTY_BATCH_WAIT_MILLIS = 1000L;

    private final Logger logger;
    private final OplogReaderProvider readerProvider;
    private final SyncSourceProvider syncSourceProvider;
    private final Retrier retrier;
    private final FetcherState state;
    private final ReplMetrics metrics;

    /**
     * Factory of {@link ContinuousOplogFetcher} instances.
     *
     * <p>NOTE(review): presumably implemented through Guice assisted injection, as the
     * constructor takes {@code @Assisted} parameters with the same types - confirm against the
     * module that binds it.
     */
    public interface ContinuousOplogFetcherFactory {
        /**
         * Creates a fetcher that continues after the given operation.
         *
         * @param j      hash of the last operation that was already fetched
         * @param opTime optime of the last operation that was already fetched
         * @return a new fetcher
         */
        ContinuousOplogFetcher createFetcher(long j, OpTime opTime);
    }

    /**
     * Mutable state of the fetcher: the last fetched position plus the reader and cursor that are
     * currently used to read the remote oplog.
     */
    public class FetcherState implements AutoCloseable {
        // NOTE(review): volatile although the class is @NotThreadSafe; presumably close() can be
        // called from a thread other than the one that fetches - confirm against callers.
        private volatile boolean closed;
        private long lastFetchedHash;
        private OpTime lastFetchedOpTime;
        private OplogReader oplogReader;
        private MongoCursor<OplogOperation> cursor;

        private FetcherState(long lastFetchedHash, OpTime lastFetchedOpTime) {
            this.closed = false;
            this.lastFetchedHash = lastFetchedHash;
            this.lastFetchedOpTime = lastFetchedOpTime;
        }

        /**
         * Makes sure a reader and an open cursor are available.
         *
         * <p>A reader is created when there is none. An existing reader is discarded and replaced
         * when the {@link SyncSourceProvider} reports that the sync source should change. The
         * cursor is (re)created when it is missing or closed.
         *
         * @throws StopReplicationException     if no sync source can be reached or the rollback
         *                                      check decides replication must stop
         * @throws RollbackException            if the cursor cannot be opened
         * @throws RollbackReplicationException if the rollback check detects a divergence
         */
        public void prepareToFetch() throws StopReplicationException, RollbackException, RollbackReplicationException {
            if (oplogReader == null) {
                calculateOplogReader();
            } else if (syncSourceProvider.shouldChangeSyncSource()) {
                logger.info("A better sync source has been detected");
                discardReader();
                calculateOplogReader();
            }
            if (cursor == null || cursor.isClosed()) {
                calculateMongoCursor();
            }
        }

        /**
         * Returns the current reader.
         *
         * @throws IllegalStateException if no reader has been calculated yet
         */
        @Nonnull
        private OplogReader getLastUsedOplogReader() {
            Preconditions.checkState(oplogReader != null, "The oplog reader must be calculated before");
            return oplogReader;
        }

        /**
         * Creates the reader if there is none, asking the {@link SyncSourceProvider} for a sync
         * source based on the last fetched optime.
         *
         * @return the current reader
         * @throws StopReplicationException if the retrier gives up looking for a sync source
         */
        @Nonnull
        private OplogReader calculateOplogReader() throws StopReplicationException {
            if (oplogReader == null) {
                try {
                    logger.debug("Looking for a sync source");
                    oplogReader = (OplogReader) retrier.retry(
                            () -> readerProvider.newReader(syncSourceProvider.newSyncSource(lastFetchedOpTime)),
                            Retrier.Hint.TIME_SENSIBLE, Retrier.Hint.CRITICAL);
                    logger.info("Reading from {}", oplogReader.getSyncSource());
                } catch (RetrierGiveUpException e) {
                    throw new StopReplicationException("It was impossible find a reachable sync source", e);
                }
            }
            return oplogReader;
        }

        /**
         * Returns the current cursor.
         *
         * @throws RestartFetchException if there is no cursor or it is closed
         */
        @Nonnull
        public MongoCursor<OplogOperation> getLastUsedMongoCursor() throws RestartFetchException {
            if (cursor == null || cursor.isClosed()) {
                throw new RestartFetchException("The cursor has not been calculated or it is closed");
            }
            return cursor;
        }

        /**
         * Opens a cursor at the last fetched optime when there is no open cursor, consumes its
         * first operation (if any) and passes it to the rollback check.
         *
         * @return the current cursor
         * @throws StopReplicationException     if the rollback check decides replication must stop
         * @throws RollbackException            if a {@link MongoException} is caught
         * @throws RollbackReplicationException if the rollback check detects a divergence
         */
        private MongoCursor<OplogOperation> calculateMongoCursor() throws StopReplicationException, RollbackException, RollbackReplicationException {
            if (cursor == null || cursor.isClosed()) {
                try {
                    cursor = getLastUsedOplogReader().queryGte(lastFetchedOpTime);
                    // The query is "greater or equal", so the first operation returned is the
                    // one that is compared against our last fetched position.
                    OplogOperation firstOp = cursor.hasNext() ? cursor.next() : null;
                    ContinuousOplogFetcher.this.checkRollback(oplogReader, firstOp);
                } catch (MongoException e) {
                    throw new RollbackException(e);
                }
            }
            return cursor;
        }

        /** Returns whether {@link #close()} has been called. */
        public boolean isClosed() {
            return closed;
        }

        /**
         * Closes the cursor, if any, and marks this state as closed.
         *
         * <p>NOTE(review): the oplog reader is not closed here, only in {@link #discardReader()};
         * confirm whether that is intended or a leaked reader.
         */
        @Override
        public void close() {
            if (cursor != null) {
                cursor.close();
            }
            closed = true;
        }

        /** Closes and forgets the cursor and the reader, so both are recreated on the next fetch. */
        public void discardReader() {
            if (cursor != null) {
                cursor.close();
                cursor = null;
            }
            if (oplogReader != null) {
                oplogReader.close();
                oplogReader = null;
            }
        }

        /**
         * Records the last operation of the given list as the last fetched position and publishes
         * its optime on the metrics. Does nothing when the list is empty.
         *
         * @param list operations that were just fetched
         * @param j    fetch time of the batch; currently unused
         */
        public void updateState(List<OplogOperation> list, long j) {
            if (list.isEmpty()) {
                return;
            }
            OplogOperation lastOp = list.get(list.size() - 1);
            lastFetchedHash = lastOp.getHash();
            lastFetchedOpTime = lastOp.getOpTime();
            metrics.getLastOpTimeFetched().setValue(lastFetchedOpTime.toString());
        }
    }

    /**
     * Signals that the current fetch attempt cannot continue with the current cursor and has to
     * be started again.
     */
    public static class RestartFetchException extends Exception {
        private static final long serialVersionUID = 1L;

        private RestartFetchException() {
        }

        private RestartFetchException(String str, RetrierGiveUpException retrierGiveUpException) {
            super(str, retrierGiveUpException);
        }

        private RestartFetchException(String str) {
            super(str);
        }
    }

    /**
     * Creates a fetcher that continues after the given operation.
     *
     * @param oplogReaderProvider creates readers for a sync source
     * @param syncSourceProvider  chooses the sync source to read from
     * @param retrier             used to repeat failed attempts
     * @param j                   hash of the last operation that was already fetched
     * @param opTime              optime of the last operation that was already fetched
     * @param replMetrics         metrics on which the last fetched optime is published
     * @param loggerFactory       provides the logger of this instance
     */
    @Inject
    public ContinuousOplogFetcher(OplogReaderProvider oplogReaderProvider, SyncSourceProvider syncSourceProvider, Retrier retrier, @Assisted long j, @Assisted OpTime opTime, ReplMetrics replMetrics, LoggerFactory loggerFactory) {
        this.logger = loggerFactory.apply(getClass());
        this.readerProvider = oplogReaderProvider;
        this.syncSourceProvider = syncSourceProvider;
        this.retrier = retrier;
        this.state = new FetcherState(j, opTime);
        this.metrics = replMetrics;
    }

    /**
     * Fetches the next batch of operations.
     *
     * <p>Returns the finished batch once this fetcher is closed. Otherwise a single attempt (see
     * {@link #fetchOnce()}) is executed through the {@link Retrier}. When the retrier aborts or
     * gives up, the fetcher is closed before the exception is thrown.
     *
     * @return the fetched batch, the "not ready" batch when there is nothing to read yet, or the
     *         finished batch when the fetcher is closed
     * @throws StopReplicationException     if replication cannot continue
     * @throws RollbackReplicationException if the rollback check detects a divergence
     */
    @Override
    public OplogBatch fetch() throws StopReplicationException, RollbackReplicationException {
        if (state.isClosed()) {
            return FinishedOplogBatch.getInstance();
        }
        try {
            return (OplogBatch) retrier.retry(() -> fetchOnce(), Retrier.Hint.CRITICAL, Retrier.Hint.TIME_SENSIBLE);
        } catch (RetrierAbortException e) {
            close();
            // The abort exception only wraps the exception that must reach the caller.
            Throwable cause = e.getCause();
            if (cause instanceof StopReplicationException) {
                throw (StopReplicationException) cause;
            }
            if (cause instanceof RollbackReplicationException) {
                throw (RollbackReplicationException) cause;
            }
            throw new StopReplicationException("Stopping replication after a unknown abort exception", e);
        } catch (RetrierGiveUpException e) {
            close();
            throw new StopReplicationException("Stopping replication after several attepts to fetch the remote oplog", e);
        }
    }

    /**
     * Executes a single fetch attempt; this is the task that {@link #fetch()} gives to the retrier.
     *
     * <p>Failures are translated as follows: errors that may be solved by trying again are
     * rethrown as {@link RollbackException} (discarding the reader first when another sync source
     * should be chosen), while {@link StopReplicationException} and
     * {@link RollbackReplicationException} are wrapped in a {@link RetrierAbortException}.
     *
     * <p>NOTE(review): an interruption of the sleep propagates as {@link InterruptedException}
     * without restoring the interrupt flag; confirm how the retrier treats it.
     */
    private OplogBatch fetchOnce() throws InterruptedException, RollbackException, RetrierAbortException {
        // NOTE(review): the two try levels keep the handler precedence of the original code; if
        // DeadCursorException or MongoServerException are subtypes of MongoRuntimeException, the
        // inner handler is the one that catches them.
        try {
            try {
                if (state.isClosed()) {
                    return FinishedOplogBatch.getInstance();
                }
                state.prepareToFetch();
                MongoCursor<OplogOperation> cursor = state.getLastUsedMongoCursor();
                // NOTE(review): Batch is used as a raw type, as in the original code;
                // parameterize it if MongoCursor.Batch is generic.
                MongoCursor.Batch batch = cursor.tryFetchBatch();
                if (batch == null || !batch.hasNext()) {
                    // Nothing to read yet: wait a bit and poll once more before reporting it.
                    Thread.sleep(EMPTY_BATCH_WAIT_MILLIS);
                    batch = cursor.tryFetchBatch();
                    if (batch == null || !batch.hasNext()) {
                        return NotReadyForMoreOplogBatch.getInstance();
                    }
                }

                List<OplogOperation> fetchedOps = null;
                long fetchTime = 0;
                boolean successful = false;
                try {
                    fetchedOps = batch.asList();
                    fetchTime = batch.getFetchTime();
                    postBatchChecks(cursor, fetchedOps);
                    OplogBatch result = new NormalOplogBatch(fetchedOps, true);
                    successful = true;
                    return result;
                } finally {
                    // The batch has already been taken from the cursor. On failure the cursor is
                    // closed so that prepareToFetch() opens a new one at the last recorded
                    // position; on success that position is advanced.
                    if (!successful) {
                        cursor.close();
                    } else {
                        assert fetchedOps != null;
                        assert fetchTime != 0;
                        state.updateState(fetchedOps, fetchTime);
                    }
                }
            } catch (MongoException | MongoRuntimeException e) {
                logger.warn("Catched an error while reading the remote oplog: {}", e.getLocalizedMessage());
                state.discardReader();
                throw new RollbackException(e);
            } catch (RollbackReplicationException | StopReplicationException e) {
                throw new RetrierAbortException(e);
            }
        } catch (MongoCursor.DeadCursorException e) {
            throw new RollbackException(e);
        } catch (RestartFetchException e) {
            state.discardReader();
            throw new RollbackException(e);
        } catch (MongoServerException e) {
            logger.debug("Found an unwrapped MongodbServerException");
            state.discardReader();
            throw new RollbackException(e);
        }
    }

    /** Closes the state of this fetcher; later calls to {@link #fetch()} return the finished batch. */
    @Override
    public void close() {
        state.close();
    }

    /**
     * Validates a batch that has just been read.
     *
     * @param mongoCursor cursor the batch was read from
     * @param list        operations contained in the batch
     * @throws RollbackException     if the batch is empty although the cursor has more elements
     * @throws RestartFetchException declared but not thrown by the current implementation
     */
    private void postBatchChecks(MongoCursor<OplogOperation> mongoCursor, @Nonnull List<OplogOperation> list) throws RollbackException, RestartFetchException {
        if (list.isEmpty() && mongoCursor.hasNext()) {
            throw new RollbackException();
        }
    }

    /**
     * Checks that the remote oplog is consistent with the last operation fetched by this instance.
     *
     * <p>When an operation is given, its hash and optime must match the last fetched ones. When
     * it is {@code null}, the last operation of the sync source must not be older than the last
     * fetched optime.
     *
     * @param oplogReader    reader used to ask for the last operation of the sync source
     * @param oplogOperation first operation returned by the sync source, or {@code null} if it
     *                       returned none
     * @throws StopReplicationException     if the last remote operation cannot be read
     * @throws RollbackReplicationException if the positions do not match or this node is ahead
     *                                      of the sync source
     */
    public void checkRollback(OplogReader oplogReader, @Nullable OplogOperation oplogOperation) throws StopReplicationException, RollbackReplicationException {
        if (oplogOperation != null) {
            boolean sameHash = oplogOperation.getHash() == state.lastFetchedHash;
            if (!sameHash || !oplogOperation.getOpTime().equals(state.lastFetchedOpTime)) {
                throw new RollbackReplicationException("Rolling back: Our last fetched = [" + state.lastFetchedOpTime + ", " + state.lastFetchedHash + "]. Source = [" + oplogOperation.getOpTime() + ", " + oplogOperation.getHash() + "]");
            }
            return;
        }
        try {
            if (oplogReader.getLastOp().getOpTime().compareTo(state.lastFetchedOpTime) < 0) {
                throw new RollbackReplicationException("We are ahead of the sync source. Rolling back");
            }
        } catch (OplogOperationUnsupported e) {
            throw new StopReplicationException("Sync source contais an invalid operation!", e);
        } catch (OplogStartMissingException e) {
            // Keep the cause so the original failure is not lost.
            throw new StopReplicationException("Sync source contais no operation on his oplog!", e);
        } catch (MongoException e) {
            throw new StopReplicationException("Unknown error while trying to fetch last remote operation", e);
        }
    }
}
