package com.torodb.mongodb.repl.oplogreplier.batch;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.base.Supplier;
import com.torodb.common.util.Empty;
import com.torodb.core.concurrent.ConcurrentToolsFactory;
import com.torodb.core.concurrent.StreamExecutor;
import com.torodb.core.exceptions.user.UserException;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.metrics.ToroMetricRegistry;
import com.torodb.core.retrier.Retrier;
import com.torodb.core.transaction.RollbackException;
import com.torodb.mongodb.core.MongodConnection;
import com.torodb.mongodb.core.MongodServer;
import com.torodb.mongodb.repl.OplogManager;
import com.torodb.mongodb.repl.oplogreplier.ApplierContext;
import com.torodb.mongodb.repl.oplogreplier.OplogOperationApplier;
import com.torodb.mongodb.repl.oplogreplier.analyzed.AnalyzedOp;
import com.torodb.mongodb.repl.oplogreplier.batch.AnalyzedOplogBatchExecutor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.inject.Inject;

/* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/batch/ConcurrentOplogBatchExecutor.class */
public class ConcurrentOplogBatchExecutor extends SimpleAnalyzedOplogBatchExecutor {
    private final StreamExecutor streamExecutor;
    private final ConcurrentOplogBatchExecutorMetrics concurrentMetrics;
    private final SubBatchHeuristic subBatchHeuristic;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/batch/ConcurrentOplogBatchExecutor$ConcurrentOplogBatchExecutorMetrics.class */
    public static class ConcurrentOplogBatchExecutorMetrics extends AnalyzedOplogBatchExecutor.AnalyzedOplogBatchExecutorMetrics {
        private final Meter subBatchSizeMeter;
        private final Histogram subBatchSizeHistogram;

        @Inject
        public ConcurrentOplogBatchExecutorMetrics(ToroMetricRegistry toroMetricRegistry) {
            super(toroMetricRegistry);
            this.subBatchSizeMeter = getRegistry().meter("subBatchSizeMeter");
            this.subBatchSizeHistogram = getRegistry().histogram("subBatchSizeHistogram");
        }

        public Meter getSubBatchSizeMeter() {
            return this.subBatchSizeMeter;
        }

        public Histogram getSubBatchSizeHistogram() {
            return this.subBatchSizeHistogram;
        }
    }

    /* loaded from: input_file:com/torodb/mongodb/repl/oplogreplier/batch/ConcurrentOplogBatchExecutor$SubBatchHeuristic.class */
    public interface SubBatchHeuristic {
        @Nonnegative
        int getSubBatchSize(ConcurrentOplogBatchExecutorMetrics concurrentOplogBatchExecutorMetrics);
    }

    @Inject
    public ConcurrentOplogBatchExecutor(OplogOperationApplier oplogOperationApplier, MongodServer mongodServer, Retrier retrier, ConcurrentToolsFactory concurrentToolsFactory, NamespaceJobExecutor namespaceJobExecutor, LoggerFactory loggerFactory, ConcurrentOplogBatchExecutorMetrics concurrentOplogBatchExecutorMetrics, SubBatchHeuristic subBatchHeuristic) {
        super(concurrentOplogBatchExecutorMetrics, oplogOperationApplier, mongodServer, retrier, namespaceJobExecutor);
        this.streamExecutor = concurrentToolsFactory.createStreamExecutor(loggerFactory.apply(getClass()), "concurrent-oplog-batch-executor", true);
        this.concurrentMetrics = concurrentOplogBatchExecutorMetrics;
        this.subBatchHeuristic = subBatchHeuristic;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.torodb.mongodb.repl.oplogreplier.batch.SimpleAnalyzedOplogBatchExecutor
    public void doStart() {
        this.streamExecutor.startAsync();
        this.streamExecutor.awaitRunning();
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.torodb.mongodb.repl.oplogreplier.batch.SimpleAnalyzedOplogBatchExecutor
    public void doStop() {
        this.streamExecutor.stopAsync();
        this.streamExecutor.awaitTerminated();
        super.doStop();
    }

    @Override // com.torodb.mongodb.repl.oplogreplier.batch.SimpleAnalyzedOplogBatchExecutor, com.torodb.mongodb.repl.oplogreplier.batch.AnalyzedOplogBatchExecutor
    public void execute(CudAnalyzedOplogBatch cudAnalyzedOplogBatch, ApplierContext applierContext) throws UserException {
        if (!$assertionsDisabled && !isRunning()) {
            throw new AssertionError("The service is on state " + state() + " instead of RUNNING");
        }
        List list = (List) cudAnalyzedOplogBatch.streamNamespaceJobs().flatMap(this::split).collect(Collectors.toList());
        this.concurrentMetrics.getSubBatchSizeMeter().mark(list.size());
        this.concurrentMetrics.getSubBatchSizeHistogram().update(list.size());
        try {
            this.streamExecutor.execute(list.stream().map(namespaceJob -> {
                return () -> {
                    execute(namespaceJob, applierContext);
                    return Empty.getInstance();
                };
            })).join();
        } catch (CompletionException e) {
            UserException cause = e.getCause();
            if (cause instanceof UserException) {
                throw cause;
            }
            if (!(cause instanceof RollbackException)) {
                throw e;
            }
            throw ((RollbackException) cause);
        }
    }

    private void execute(NamespaceJob namespaceJob, ApplierContext applierContext) throws OplogManager.OplogManagerPersistException, UserException, NamespaceJobExecutionException {
        if (!$assertionsDisabled && !isRunning()) {
            throw new AssertionError("The service is not running");
        }
        MongodConnection openConnection = getServer().openConnection();
        Throwable th = null;
        try {
            try {
                execute(namespaceJob, applierContext, openConnection);
                if (openConnection != null) {
                    if (0 == 0) {
                        openConnection.close();
                        return;
                    }
                    try {
                        openConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (openConnection != null) {
                if (th != null) {
                    try {
                        openConnection.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    openConnection.close();
                }
            }
            throw th4;
        }
    }

    private Stream<NamespaceJob> split(NamespaceJob namespaceJob) {
        Collection<AnalyzedOp> jobs = namespaceJob.getJobs();
        int subBatchSize = this.subBatchHeuristic.getSubBatchSize(this.concurrentMetrics);
        if (!$assertionsDisabled && subBatchSize <= 0) {
            throw new AssertionError("Sub batch size must be positive");
        }
        Supplier supplier = () -> {
            return new ArrayList(subBatchSize);
        };
        ArrayList arrayList = new ArrayList(1 + (jobs.size() / subBatchSize));
        List list = null;
        for (AnalyzedOp analyzedOp : jobs) {
            if (list == null) {
                list = (List) supplier.get();
            }
            list.add(analyzedOp);
            if (list.size() >= subBatchSize) {
                arrayList.add(new NamespaceJob(namespaceJob.getDatabase(), namespaceJob.getCollection(), list));
                list = (List) supplier.get();
            }
            if (!$assertionsDisabled && list.size() > subBatchSize) {
                throw new AssertionError("Created a subatch whose size is " + list.size() + " but heuristic says max subatch size is " + subBatchSize);
            }
        }
        if (list != null) {
            arrayList.add(new NamespaceJob(namespaceJob.getDatabase(), namespaceJob.getCollection(), list));
        }
        return arrayList.stream();
    }

    static {
        $assertionsDisabled = !ConcurrentOplogBatchExecutor.class.desiredAssertionStatus();
    }
}
