package com.torodb.mongodb.repl.topology;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.torodb.common.util.ListeningFutureToCompletableFuture;
import com.torodb.core.concurrent.ConcurrentToolsFactory;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.mongodb.commands.pojos.ReplicaSetConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.logging.log4j.Logger;

@ThreadSafe
/* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyExecutor.class */
public class TopologyExecutor {
    /** Logger obtained from the injected {@link LoggerFactory} for this class. */
    private final Logger logger;
    /** Coordinator instance handed to every task submitted through the version executors. */
    private final TopologyCoordinator coord;
    /** Executor shared by both version executors; all submitted tasks run on it. */
    private final ListeningScheduledExecutorService executor;
    /** Executor view whose tasks run regardless of configuration version changes. */
    private final OnAnyVersion onAnyVersion;
    /** Executor view whose tasks are cancelled if the version changed since they were submitted. */
    private final OnCurrentVersion onCurrentVersion;
    /**
     * Listeners notified on each version change. The set is backed by a {@link WeakHashMap}, so an
     * entry disappears once its listener is no longer strongly reachable from elsewhere.
     */
    private final Set<VersionChangeListener> versionListeners = Collections.newSetFromMap(new WeakHashMap<>());
    /** Last configuration version seen; stays at -1 until the first version change is reported. */
    private volatile int version = -1;
    /**
     * Listener registered on {@link #coord} by the constructor.
     * NOTE(review): presumably kept in a field so that it stays strongly reachable for the
     * lifetime of this object - confirm against TopologyCoordinator.
     */
    private final VersionChangeListener versionChangeListener = this::onVersionChange;

    /* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyExecutor$OnAnyVersion.class */
    /**
     * {@link VersionExecutor} that runs every task on the given executor without looking at the
     * replication configuration version.
     */
    private static class OnAnyVersion implements VersionExecutor {
        private final ListeningScheduledExecutorService executor;
        private final TopologyCoordinator coord;

        /**
         * @param executorService executor on which every task is run
         * @param coordinator     coordinator passed to every task
         */
        public OnAnyVersion(ListeningScheduledExecutorService executorService, TopologyCoordinator coordinator) {
            executor = executorService;
            coord = coordinator;
        }

        /** Runs the consumer on the executor, passing it the coordinator. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public CompletableFuture<?> consumeAsync(Consumer<TopologyCoordinator> consumer) {
            Runnable task = () -> consumer.accept(coord);
            return CompletableFuture.runAsync(task, executor);
        }

        /** Applies the function to the coordinator on the executor and exposes its result. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <R> CompletableFuture<R> mapAsync(Function<TopologyCoordinator, R> function) {
            return CompletableFuture.supplyAsync(() -> function.apply(coord), executor);
        }

        /** Schedules the consumer to run once after the given delay, with millisecond resolution. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public CompletableFuture<?> scheduleOnce(Consumer<TopologyCoordinator> consumer, Duration duration) {
            Runnable task = () -> consumer.accept(coord);
            long delayMillis = duration.toMillis();
            return ListeningFutureToCompletableFuture.toCompletableFuture(
                    executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
        }

        /** Once the stage completes, passes the coordinator and the stage value to the consumer. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <E> CompletableFuture<?> andThenAcceptAsync(CompletionStage<E> completionStage, BiConsumer<TopologyCoordinator, E> biConsumer) {
            CompletionStage<Void> chained =
                    completionStage.thenAcceptAsync(value -> biConsumer.accept(coord, value), executor);
            return chained.toCompletableFuture();
        }

        /** Once the stage completes, maps the coordinator and the stage value through the function. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <E, U> CompletableFuture<U> andThenApplyAsync(CompletionStage<E> completionStage, BiFunction<TopologyCoordinator, E, U> biFunction) {
            CompletionStage<U> chained =
                    completionStage.thenApplyAsync(value -> biFunction.apply(coord, value), executor);
            return chained.toCompletableFuture();
        }
    }

    /* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyExecutor$OnCurrentVersion.class */
    /**
     * {@link VersionExecutor} whose tasks only run if the version reported by the supplier is still
     * the one that was current when the task was submitted; otherwise the task fails with a
     * {@link CancellationException}.
     */
    private static class OnCurrentVersion implements VersionExecutor {
        private final ListeningScheduledExecutorService executor;
        private final TopologyCoordinator coord;
        private final IntSupplier versionSupplier;

        /**
         * @param executorService executor on which every task is run
         * @param coordinator     coordinator passed to every task
         * @param currentVersion  source of the current configuration version
         */
        public OnCurrentVersion(ListeningScheduledExecutorService executorService, TopologyCoordinator coordinator, IntSupplier currentVersion) {
            executor = executorService;
            coord = coordinator;
            versionSupplier = currentVersion;
        }

        /**
         * Verifies that the version has not moved since a task was submitted.
         *
         * @param expected version recorded when the task was submitted
         * @throws CancellationException if the supplier now reports a different version
         */
        private void checkVersion(int expected) {
            final int current = versionSupplier.getAsInt();
            if (expected == current) {
                return;
            }
            throw new CancellationException("Replication configuration changed from " + expected + " to " + current + " since the task was scheduled");
        }

        /** Runs the consumer on the executor unless the version changed in the meantime. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public CompletableFuture<?> consumeAsync(Consumer<TopologyCoordinator> consumer) {
            // The version is captured on the calling thread, before the task is queued.
            final int submittedVersion = versionSupplier.getAsInt();
            Runnable guarded = () -> {
                checkVersion(submittedVersion);
                consumer.accept(coord);
            };
            return CompletableFuture.runAsync(guarded, executor);
        }

        /** Applies the function on the executor unless the version changed in the meantime. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <R> CompletableFuture<R> mapAsync(Function<TopologyCoordinator, R> function) {
            final int submittedVersion = versionSupplier.getAsInt();
            return CompletableFuture.supplyAsync(() -> {
                checkVersion(submittedVersion);
                return function.apply(coord);
            }, executor);
        }

        /** Schedules the consumer once after the delay; it is skipped if the version changed. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public CompletableFuture<?> scheduleOnce(Consumer<TopologyCoordinator> consumer, Duration duration) {
            final int submittedVersion = versionSupplier.getAsInt();
            Runnable guarded = () -> {
                checkVersion(submittedVersion);
                consumer.accept(coord);
            };
            final long delayMillis = duration.toMillis();
            return ListeningFutureToCompletableFuture.toCompletableFuture(
                    executor.schedule(guarded, delayMillis, TimeUnit.MILLISECONDS));
        }

        /** Chains the consumer after the stage; it is skipped if the version changed. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <E> CompletableFuture<?> andThenAcceptAsync(CompletionStage<E> completionStage, BiConsumer<TopologyCoordinator, E> biConsumer) {
            // Captured when the continuation is registered, not when the stage completes.
            final int submittedVersion = versionSupplier.getAsInt();
            CompletionStage<Void> chained = completionStage.thenAcceptAsync(value -> {
                checkVersion(submittedVersion);
                biConsumer.accept(coord, value);
            }, executor);
            return chained.toCompletableFuture();
        }

        /** Chains the function after the stage; it is skipped if the version changed. */
        @Override // com.torodb.mongodb.repl.topology.TopologyExecutor.VersionExecutor
        public <E, U> CompletableFuture<U> andThenApplyAsync(CompletionStage<E> completionStage, BiFunction<TopologyCoordinator, E, U> biFunction) {
            final int submittedVersion = versionSupplier.getAsInt();
            CompletionStage<U> chained = completionStage.thenApplyAsync(value -> {
                checkVersion(submittedVersion);
                return biFunction.apply(coord, value);
            }, executor);
            return chained.toCompletableFuture();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyExecutor$VersionExecutor.class */
    /**
     * Ways of submitting work that receives the {@link TopologyCoordinator}.
     *
     * <p>Both implementations in this file run the submitted work on the enclosing class's
     * executor; they differ only in whether a change of configuration version cancels the work.
     */
    public interface VersionExecutor {
        /**
         * Asynchronously passes the coordinator to the given consumer.
         *
         * @param consumer action to run with the coordinator
         * @return a future tracking the execution of the action
         */
        CompletableFuture<?> consumeAsync(Consumer<TopologyCoordinator> consumer);

        /**
         * Asynchronously applies the given function to the coordinator.
         *
         * @param function computation to run with the coordinator
         * @param <R>      type of the computed value
         * @return a future holding the value returned by the function
         */
        <R> CompletableFuture<R> mapAsync(Function<TopologyCoordinator, R> function);

        /**
         * Schedules the given consumer to be run once after a delay.
         *
         * @param consumer action to run with the coordinator
         * @param duration how long to wait before running the action
         * @return a future tracking the scheduled action
         */
        CompletableFuture<?> scheduleOnce(Consumer<TopologyCoordinator> consumer, Duration duration);

        /**
         * Chains an action after the given stage; the action receives the coordinator and the
         * value the stage completed with.
         *
         * @param completionStage stage to wait for
         * @param biConsumer      action to run with the coordinator and the stage value
         * @param <E>             type of the value produced by the stage
         * @return a future tracking the chained action
         */
        <E> CompletableFuture<?> andThenAcceptAsync(CompletionStage<E> completionStage, BiConsumer<TopologyCoordinator, E> biConsumer);

        /**
         * Chains a function after the given stage; the function receives the coordinator and the
         * value the stage completed with.
         *
         * @param completionStage stage to wait for
         * @param biFunction      function to apply to the coordinator and the stage value
         * @param <E>             type of the value produced by the stage
         * @param <U>             type of the value returned by the function
         * @return a future holding the value returned by the function
         */
        <E, U> CompletableFuture<U> andThenApplyAsync(CompletionStage<E> completionStage, BiFunction<TopologyCoordinator, E, U> biFunction);
    }

    public TopologyExecutor(ConcurrentToolsFactory concurrentToolsFactory, LoggerFactory loggerFactory, Duration duration, Duration duration2) {
        this.logger = loggerFactory.apply(getClass());
        this.executor = MoreExecutors.listeningDecorator(concurrentToolsFactory.createScheduledExecutorServiceWithMaxThreads("topology-executor", 1));
        this.coord = new TopologyCoordinator(duration, duration2, loggerFactory);
        this.coord.addVersionChangeListener(this.versionChangeListener);
        this.onAnyVersion = new OnAnyVersion(this.executor, this.coord);
        this.onCurrentVersion = new OnCurrentVersion(this.executor, this.coord, this::getVersion);
    }

    /**
     * Returns the most recently recorded configuration version.
     *
     * <p>Used as the version supplier of the {@code OnCurrentVersion} executor. The backing field
     * is volatile and holds -1 until a version change has been recorded.
     *
     * @return the cached configuration version
     */
    private int getVersion() {
        return this.version;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a listener to be notified whenever a version change is reported to this object.
     *
     * <p>Listeners are kept in a weak set, so the caller has to keep its own strong reference to
     * the listener for as long as it wants to be notified.
     *
     * @param versionChangeListener the listener to add
     */
    public void addVersionChangeListener(VersionChangeListener versionChangeListener) {
        // The backing WeakHashMap is not synchronized and callers are not required to be on the
        // executor thread, so every access to the set goes through its monitor.
        synchronized (this.versionListeners) {
            this.versionListeners.add(versionChangeListener);
        }
    }

    /**
     * Records the new configuration version and forwards the notification to every registered
     * listener.
     *
     * @param topologyCoordinator coordinator whose configuration changed; the new version is read
     *                            from its current replica set configuration
     * @param replicaSetConfig    configuration passed through unchanged to the listeners
     */
    @GuardedBy("executor")
    private void onVersionChange(TopologyCoordinator topologyCoordinator, ReplicaSetConfig replicaSetConfig) {
        int newVersion = topologyCoordinator.getRsConfig().getConfigVersion();
        this.logger.debug("Changing version from {} to {}", Integer.valueOf(this.version), Integer.valueOf(newVersion));
        this.version = newVersion;

        // Notify from a snapshot: listeners run outside the lock, and a listener that registers
        // another listener (or a concurrent registration) cannot break the iteration.
        VersionChangeListener[] snapshot;
        synchronized (this.versionListeners) {
            snapshot = this.versionListeners.toArray(new VersionChangeListener[0]);
        }
        for (VersionChangeListener listener : snapshot) {
            listener.onVersionChange(topologyCoordinator, replicaSetConfig);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the executor view whose tasks run without any configuration version check.
     *
     * @return the shared {@code OnAnyVersion} instance created by the constructor
     */
    public VersionExecutor onAnyVersion() {
        return this.onAnyVersion;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the executor view whose tasks fail with a
     * {@link java.util.concurrent.CancellationException} if the configuration version changed
     * between submission and execution.
     *
     * @return the shared {@code OnCurrentVersion} instance created by the constructor
     */
    public VersionExecutor onCurrentVersion() {
        return this.onCurrentVersion;
    }
}
