package com.torodb.mongodb.repl.oplogreplier;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.dispatch.ExecutionContexts;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.torodb.common.util.Empty;
import com.torodb.core.Shutdowner;
import com.torodb.core.concurrent.ConcurrentToolsFactory;
import com.torodb.core.concurrent.akka.BatchFlow;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.mongodb.repl.OplogManager;
import com.torodb.mongodb.repl.oplogreplier.OplogApplier;
import com.torodb.mongodb.repl.oplogreplier.batch.AnalyzedOplogBatch;
import com.torodb.mongodb.repl.oplogreplier.batch.AnalyzedOplogBatchExecutor;
import com.torodb.mongodb.repl.oplogreplier.batch.BatchAnalyzer;
import com.torodb.mongodb.repl.oplogreplier.batch.OplogBatch;
import com.torodb.mongodb.repl.oplogreplier.batch.OplogBatchChecker;
import com.torodb.mongodb.repl.oplogreplier.batch.OplogBatchFilter;
import com.torodb.mongodb.repl.oplogreplier.fetcher.OplogFetcher;
import com.torodb.mongodb.repl.oplogreplier.offheapbuffer.OffHeapBufferConfig;
import com.torodb.mongodb.repl.oplogreplier.offheapbuffer.OffHeapBufferUtils;
import com.torodb.mongowp.commands.oplog.OplogOperation;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import javax.inject.Inject;
import org.apache.logging.log4j.Logger;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier.class */
public class DefaultOplogApplier implements OplogApplier {
    private final Logger logger;
    private final BatchLimits batchLimits;
    private final AnalyzedOplogBatchExecutor batchExecutor;
    private final OplogManager oplogManager;
    private final BatchAnalyzer.BatchAnalyzerFactory batchAnalyzerFactory;
    private final ActorSystem actorSystem;
    private final ExecutorService executorService;
    private final OplogApplierMetrics metrics;
    private final OplogBatchFilter batchFilter;
    private final OplogBatchChecker batchChecker;
    private final OffHeapBufferConfig offHeapConfig;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement.class */
    public static class AnalyzedStreamElement {
        private final OplogBatch rawBatch;
        private final long startFetchTimestamp;
        private final List<AnalyzedOplogBatch> analyzedBatch;

        AnalyzedStreamElement(RawStreamElement rawStreamElement, List<AnalyzedOplogBatch> list) {
            this.rawBatch = rawStreamElement.rawBatch;
            this.startFetchTimestamp = rawStreamElement.startFetchTimestamp;
            this.analyzedBatch = list;
        }
    }

    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$BatchLimits.class */
    public static class BatchLimits {
        private final int maxSize;
        private final FiniteDuration maxPeriod;

        public BatchLimits(int i, Duration duration) {
            this.maxSize = i;
            this.maxPeriod = new FiniteDuration(duration.toMillis(), TimeUnit.MILLISECONDS);
        }

        public int getMaxSize() {
            return this.maxSize;
        }

        public FiniteDuration getMaxPeriod() {
            return this.maxPeriod;
        }
    }

    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$DefaultApplyingJob.class */
    private static class DefaultApplyingJob extends AbstractApplyingJob {
        private final KillSwitch killSwitch;

        public DefaultApplyingJob(KillSwitch killSwitch, CompletableFuture<Empty> completableFuture) {
            super(completableFuture);
            this.killSwitch = killSwitch;
        }

        @Override // com.torodb.mongodb.repl.oplogreplier.OplogApplier.ApplyingJob
        public void cancel() {
            this.killSwitch.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$RawStreamElement.class */
    public static class RawStreamElement {
        private static final RawStreamElement INITIAL_ELEMENT = new RawStreamElement(null, 0);
        private final OplogBatch rawBatch;
        private final long startFetchTimestamp;

        public RawStreamElement(OplogBatch oplogBatch, long j) {
            this.rawBatch = oplogBatch;
            this.startFetchTimestamp = j;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public RawStreamElement concat(OplogBatch oplogBatch) {
            OplogBatch concat;
            long j;
            if (this == INITIAL_ELEMENT) {
                concat = oplogBatch;
                j = System.currentTimeMillis();
            } else {
                concat = this.rawBatch.concat(oplogBatch);
                j = this.startFetchTimestamp;
            }
            return new RawStreamElement(concat, j);
        }
    }

    @Inject
    public DefaultOplogApplier(BatchLimits batchLimits, OplogManager oplogManager, AnalyzedOplogBatchExecutor analyzedOplogBatchExecutor, BatchAnalyzer.BatchAnalyzerFactory batchAnalyzerFactory, ConcurrentToolsFactory concurrentToolsFactory, Shutdowner shutdowner, LoggerFactory loggerFactory, OplogApplierMetrics oplogApplierMetrics, OplogBatchFilter oplogBatchFilter, OplogBatchChecker oplogBatchChecker, OffHeapBufferConfig offHeapBufferConfig) {
        this.logger = loggerFactory.apply(getClass());
        this.batchExecutor = analyzedOplogBatchExecutor;
        this.batchLimits = batchLimits;
        this.oplogManager = oplogManager;
        this.batchAnalyzerFactory = batchAnalyzerFactory;
        this.executorService = concurrentToolsFactory.createExecutorServiceWithMaxThreads("oplog-applier", 3);
        this.actorSystem = ActorSystem.create("oplog-applier", (Config) null, (ClassLoader) null, ExecutionContexts.fromExecutor(this.executorService));
        this.metrics = oplogApplierMetrics;
        this.batchFilter = oplogBatchFilter;
        this.batchChecker = oplogBatchChecker;
        this.offHeapConfig = offHeapBufferConfig;
        shutdowner.addCloseShutdownListener(this);
    }

    @Override // com.torodb.mongodb.repl.oplogreplier.OplogApplier
    public OplogApplier.ApplyingJob apply(OplogFetcher oplogFetcher, ApplierContext applierContext) {
        Pair pair = (Pair) createOplogSource(oplogFetcher).async().via(createOffheapBuffer(this.offHeapConfig)).async().map(this.batchFilter).map(this.batchChecker).via(createBatcherFlow(applierContext)).viaMat(KillSwitches.single(), Keep.right()).async().map(analyzedStreamElement -> {
            Iterator it = analyzedStreamElement.analyzedBatch.iterator();
            while (it.hasNext()) {
                this.batchExecutor.apply((AnalyzedOplogBatch) it.next(), applierContext);
            }
            return analyzedStreamElement;
        }).map(this::metricExecution).toMat(Sink.foreach(this::storeLastAppliedOp), (uniqueKillSwitch, completionStage) -> {
            return new Pair(uniqueKillSwitch, completionStage);
        }).run(ActorMaterializer.create(this.actorSystem));
        UniqueKillSwitch uniqueKillSwitch2 = (UniqueKillSwitch) pair.first();
        return new DefaultApplyingJob(uniqueKillSwitch2, ((CompletionStage) pair.second()).toCompletableFuture().thenApply(done -> {
            return Empty.getInstance();
        }).whenComplete((empty, th) -> {
            oplogFetcher.close();
            if (empty != null) {
                this.logger.trace("Oplog replication stream finished normally");
                return;
            }
            Throwable cause = th instanceof CompletionException ? th.getCause() : th;
            if (cause instanceof CancellationException) {
                this.logger.debug("Oplog replication stream has been cancelled");
                uniqueKillSwitch2.shutdown();
            } else {
                Throwable rootCause = Throwables.getRootCause(cause);
                this.logger.warn("Oplog replication stream finished exceptionally: " + rootCause.getLocalizedMessage(), rootCause);
                uniqueKillSwitch2.shutdown();
            }
        }));
    }

    private Flow<OplogBatch, OplogBatch, NotUsed> createOffheapBuffer(OffHeapBufferConfig offHeapBufferConfig) {
        return OffHeapBufferUtils.createOffheapBuffer(offHeapBufferConfig);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.logger.trace("Waiting until actor system terminates");
        Await.result(this.actorSystem.terminate(), scala.concurrent.duration.Duration.Inf());
        this.logger.trace("Actor system terminated");
        this.executorService.shutdown();
    }

    private Source<OplogBatch, NotUsed> createOplogSource(OplogFetcher oplogFetcher) {
        return Source.unfold(oplogFetcher, oplogFetcher2 -> {
            OplogBatch fetch = oplogFetcher2.fetch();
            return fetch.isLastOne() ? Optional.empty() : Optional.of(new Pair(oplogFetcher2, fetch));
        });
    }

    private Flow<OplogBatch, AnalyzedStreamElement, NotUsed> createBatcherFlow(ApplierContext applierContext) {
        Predicate predicate = oplogBatch -> {
            return !oplogBatch.isReadyForMore();
        };
        ToIntFunction toIntFunction = oplogBatch2 -> {
            return oplogBatch2.count();
        };
        Supplier supplier = () -> {
            return RawStreamElement.INITIAL_ELEMENT;
        };
        BiFunction biFunction = (rawStreamElement, oplogBatch3) -> {
            return rawStreamElement.concat(oplogBatch3);
        };
        BatchAnalyzer createBatchAnalyzer = this.batchAnalyzerFactory.createBatchAnalyzer(applierContext);
        return Flow.of(OplogBatch.class).via(new BatchFlow(this.batchLimits.maxSize, this.batchLimits.maxPeriod, predicate, toIntFunction, supplier, biFunction)).filter(rawStreamElement2 -> {
            return (rawStreamElement2.rawBatch == null || rawStreamElement2.rawBatch.isEmpty()) ? false : true;
        }).map(rawStreamElement3 -> {
            return new AnalyzedStreamElement(rawStreamElement3, createBatchAnalyzer.apply(rawStreamElement3.rawBatch.getOps()));
        });
    }

    private AnalyzedStreamElement storeLastAppliedOp(AnalyzedStreamElement analyzedStreamElement) throws OplogManager.OplogManagerPersistException {
        if (!$assertionsDisabled && analyzedStreamElement.rawBatch.isEmpty()) {
            throw new AssertionError();
        }
        OplogOperation lastOperation = analyzedStreamElement.rawBatch.getLastOperation();
        OplogManager.WriteOplogTransaction createWriteTransaction = this.oplogManager.createWriteTransaction();
        Throwable th = null;
        try {
            try {
                createWriteTransaction.forceNewValue(lastOperation.getHash(), lastOperation.getOpTime());
                if (createWriteTransaction != null) {
                    if (0 != 0) {
                        try {
                            createWriteTransaction.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createWriteTransaction.close();
                    }
                }
                return analyzedStreamElement;
            } finally {
            }
        } catch (Throwable th3) {
            if (createWriteTransaction != null) {
                if (th != null) {
                    try {
                        createWriteTransaction.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriteTransaction.close();
                }
            }
            throw th3;
        }
    }

    private AnalyzedStreamElement metricExecution(AnalyzedStreamElement analyzedStreamElement) {
        long currentTimeMillis = System.currentTimeMillis() - analyzedStreamElement.startFetchTimestamp;
        int count = analyzedStreamElement.rawBatch.count();
        this.metrics.getBatchSize().update(count);
        this.metrics.getApplied().mark(count);
        metricOpsExecutionDelay(count, currentTimeMillis);
        return analyzedStreamElement;
    }

    private void metricOpsExecutionDelay(int i, long j) {
        if (i < 1) {
            return;
        }
        if (j <= 0) {
            this.logger.debug("Unexpected time execution: {}" + j);
        }
        this.metrics.getMaxDelay().update(j);
        this.metrics.getApplicationCost().update((1000 * j) / i);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1866375331:
                if (implMethodName.equals("lambda$createBatcherFlow$e5605256$1")) {
                    z = 5;
                    break;
                }
                break;
            case -1610179768:
                if (implMethodName.equals("metricExecution")) {
                    z = 6;
                    break;
                }
                break;
            case -1009312544:
                if (implMethodName.equals("lambda$apply$e6a147fb$1")) {
                    z = 3;
                    break;
                }
                break;
            case -182091888:
                if (implMethodName.equals("lambda$apply$8484ac0f$1")) {
                    z = 2;
                    break;
                }
                break;
            case 260830698:
                if (implMethodName.equals("lambda$createBatcherFlow$8aa5a12$1")) {
                    z = true;
                    break;
                }
                break;
            case 286233063:
                if (implMethodName.equals("storeLastAppliedOp")) {
                    z = 4;
                    break;
                }
                break;
            case 487373949:
                if (implMethodName.equals("lambda$createOplogSource$4f78d53$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/fetcher/OplogFetcher;)Ljava/util/Optional;")) {
                    return oplogFetcher2 -> {
                        OplogBatch fetch = oplogFetcher2.fetch();
                        return fetch.isLastOne() ? Optional.empty() : Optional.of(new Pair(oplogFetcher2, fetch));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$RawStreamElement;)Z")) {
                    return rawStreamElement2 -> {
                        return (rawStreamElement2.rawBatch == null || rawStreamElement2.rawBatch.isEmpty()) ? false : true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/UniqueKillSwitch;Ljava/util/concurrent/CompletionStage;)Lakka/japi/Pair;")) {
                    return (uniqueKillSwitch, completionStage) -> {
                        return new Pair(uniqueKillSwitch, completionStage);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/ApplierContext;Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;)Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;")) {
                    DefaultOplogApplier defaultOplogApplier = (DefaultOplogApplier) serializedLambda.getCapturedArg(0);
                    ApplierContext applierContext = (ApplierContext) serializedLambda.getCapturedArg(1);
                    return analyzedStreamElement -> {
                        Iterator it = analyzedStreamElement.analyzedBatch.iterator();
                        while (it.hasNext()) {
                            this.batchExecutor.apply((AnalyzedOplogBatch) it.next(), applierContext);
                        }
                        return analyzedStreamElement;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;)Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;")) {
                    DefaultOplogApplier defaultOplogApplier2 = (DefaultOplogApplier) serializedLambda.getCapturedArg(0);
                    return defaultOplogApplier2::storeLastAppliedOp;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/batch/BatchAnalyzer;Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$RawStreamElement;)Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;")) {
                    BatchAnalyzer batchAnalyzer = (BatchAnalyzer) serializedLambda.getCapturedArg(0);
                    return rawStreamElement3 -> {
                        return new AnalyzedStreamElement(rawStreamElement3, batchAnalyzer.apply(rawStreamElement3.rawBatch.getOps()));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;)Lcom/torodb/mongodb/repl/oplogreplier/DefaultOplogApplier$AnalyzedStreamElement;")) {
                    DefaultOplogApplier defaultOplogApplier3 = (DefaultOplogApplier) serializedLambda.getCapturedArg(0);
                    return defaultOplogApplier3::metricExecution;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !DefaultOplogApplier.class.desiredAssertionStatus();
    }
}
