package com.torodb.mongodb.repl;

import com.google.inject.assistedinject.Assisted;
import com.torodb.core.concurrent.ConcurrentToolsFactory;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.services.IdleTorodbService;
import com.torodb.mongodb.repl.OplogApplierService;
import com.torodb.mongodb.repl.OplogManager;
import com.torodb.mongodb.repl.oplogreplier.ApplierContext;
import com.torodb.mongodb.repl.oplogreplier.OplogApplier;
import com.torodb.mongodb.repl.oplogreplier.RollbackReplicationException;
import com.torodb.mongodb.repl.oplogreplier.fetcher.ContinuousOplogFetcher;
import com.torodb.mongodb.repl.oplogreplier.fetcher.OplogFetcher;
import com.torodb.mongowp.OpTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.inject.Inject;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/torodb/mongodb/repl/DefaultOplogApplierService.class */
public class DefaultOplogApplierService extends IdleTorodbService implements OplogApplierService {
    private final Logger logger;
    private final OplogApplier oplogApplier;
    private final ContinuousOplogFetcher.ContinuousOplogFetcherFactory oplogFetcherFactory;
    private final OplogManager oplogManager;
    private final OplogApplierService.Callback callback;
    private volatile boolean stopping;
    private OplogFetcher fetcher;
    private OplogApplier.ApplyingJob applyJob;
    private final ExecutorService selfExecutor;
    private CompletableFuture<Void> onFinishFuture;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Inject
    public DefaultOplogApplierService(ThreadFactory threadFactory, OplogApplier oplogApplier, OplogManager oplogManager, ContinuousOplogFetcher.ContinuousOplogFetcherFactory continuousOplogFetcherFactory, LoggerFactory loggerFactory, @Assisted OplogApplierService.Callback callback, ConcurrentToolsFactory concurrentToolsFactory) {
        super(threadFactory);
        this.logger = loggerFactory.apply(getClass());
        this.oplogApplier = oplogApplier;
        this.oplogFetcherFactory = continuousOplogFetcherFactory;
        this.oplogManager = oplogManager;
        this.callback = callback;
        this.selfExecutor = concurrentToolsFactory.createExecutorService("oplog-applier-service", true, 1);
    }

    protected void startUp() throws Exception {
        this.callback.waitUntilStartPermision();
        this.fetcher = createFetcher();
        this.applyJob = this.oplogApplier.apply(this.fetcher, new ApplierContext.Builder().setReapplying(false).setUpdatesAsUpserts(true).build());
        this.onFinishFuture = this.applyJob.onFinish().thenAcceptAsync(tuple2 -> {
            if (this.stopping) {
                return;
            }
            switch ((OplogApplier.ApplyingJobFinishState) tuple2.v1) {
                case FINE:
                case ROLLBACK:
                    this.callback.rollback(this, (RollbackReplicationException) tuple2.v2);
                    return;
                case UNEXPECTED:
                case STOP:
                    this.callback.onError(this, (Throwable) tuple2.v2);
                    return;
                case CANCELLED:
                    this.callback.onError(this, new AssertionError("Unexpected cancellation of the applier while the service is not stopping"));
                    return;
                default:
                    this.callback.onError(this, new AssertionError("Unexpected " + OplogApplier.ApplyingJobFinishState.class.getSimpleName() + " found: " + tuple2.v1 + " with error " + tuple2.v2));
                    return;
            }
        }, (Executor) this.selfExecutor);
    }

    protected void shutDown() throws Exception {
        this.logger.debug("Shutdown requested");
        this.stopping = true;
        if (this.applyJob == null) {
            this.logger.debug(serviceName() + " stopped before it was running?");
        } else if (this.applyJob.onFinish().isDone()) {
            this.logger.trace("Requesting to stop the stream");
            this.applyJob.cancel();
            this.applyJob.onFinish().join();
            this.logger.trace("Applier finished");
        } else {
            this.logger.trace("Applier has been already finished");
        }
        if (this.fetcher != null) {
            this.logger.trace("Closing the fetcher");
            this.fetcher.close();
            this.logger.trace("Fetcher closed");
        } else {
            this.logger.debug(serviceName() + " stopped before it was running?");
        }
        if (this.onFinishFuture != null) {
            this.onFinishFuture.join();
        }
        List<Runnable> shutdownNow = this.selfExecutor.shutdownNow();
        if (!$assertionsDisabled && !shutdownNow.isEmpty()) {
            throw new AssertionError("Oplog applier service shutted down before its task were correctly executed");
        }
        this.callback.onFinish(this);
    }

    private OplogFetcher createFetcher() {
        OplogManager.ReadOplogTransaction createReadTransaction = this.oplogManager.createReadTransaction();
        Throwable th = null;
        try {
            try {
                OpTime lastAppliedOptime = createReadTransaction.getLastAppliedOptime();
                long lastAppliedHash = createReadTransaction.getLastAppliedHash();
                if (createReadTransaction != null) {
                    if (0 != 0) {
                        try {
                            createReadTransaction.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createReadTransaction.close();
                    }
                }
                return this.oplogFetcherFactory.createFetcher(lastAppliedHash, lastAppliedOptime);
            } finally {
            }
        } catch (Throwable th3) {
            if (createReadTransaction != null) {
                if (th != null) {
                    try {
                        createReadTransaction.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createReadTransaction.close();
                }
            }
            throw th3;
        }
    }

    static {
        $assertionsDisabled = !DefaultOplogApplierService.class.desiredAssertionStatus();
    }
}
