package io.mantisrx.master.resourcecluster;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import com.netflix.spectator.api.Tag;
import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.class */
class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
    private static final Logger log = LoggerFactory.getLogger(ResourceClusterGatewayAkkaImpl.class);
    protected final ActorRef resourceClusterManagerActor;
    protected final Duration askTimeout;
    private final Counter registrationCounter;
    private final Counter heartbeatCounter;
    private final Counter disconnectionCounter;
    private final Counter throttledCounter;
    private final RateLimiter rateLimiter;
    private final ScheduledExecutorService semaphoreResetScheduler = Executors.newScheduledThreadPool(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceClusterGatewayAkkaImpl(ActorRef actorRef, Duration duration, Supplier<Integer> supplier) {
        this.resourceClusterManagerActor = actorRef;
        this.askTimeout = duration;
        log.info("Setting maxConcurrentRequestCount for resourceCluster gateway {}", supplier);
        this.rateLimiter = RateLimiter.create(supplier.get().intValue());
        this.semaphoreResetScheduler.scheduleAtFixedRate(() -> {
            int intValue = ((Integer) supplier.get()).intValue();
            log.info("Setting the rate limiter rate to {}", Integer.valueOf(intValue));
            this.rateLimiter.setRate(intValue);
        }, 1L, 1L, TimeUnit.MINUTES);
        Metrics registerAndGet = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().id("ResourceClusterGatewayAkkaImpl", new Tag[0]).addCounter("registrationCounter").addCounter("heartbeatCounter").addCounter("disconnectionCounter").addCounter("throttledCounter").build());
        this.registrationCounter = registerAndGet.getCounter("registrationCounter");
        this.heartbeatCounter = registerAndGet.getCounter("heartbeatCounter");
        this.disconnectionCounter = registerAndGet.getCounter("disconnectionCounter");
        this.throttledCounter = registerAndGet.getCounter("throttledCounter");
    }

    private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> function) {
        return obj -> {
            if (this.rateLimiter.tryAcquire()) {
                return (CompletableFuture) function.apply(obj);
            }
            this.throttledCounter.increment();
            return CompletableFutures.exceptionallyCompletedFuture(new RequestThrottledException("Throttled req: " + obj.getClass().getSimpleName()));
        };
    }

    public CompletableFuture<Ack> registerTaskExecutor(TaskExecutorRegistration taskExecutorRegistration) {
        return (CompletableFuture) withThrottle(this::registerTaskExecutorImpl).apply(taskExecutorRegistration);
    }

    private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration taskExecutorRegistration) {
        this.registrationCounter.increment();
        CompletionStage ask = Patterns.ask(this.resourceClusterManagerActor, taskExecutorRegistration, this.askTimeout);
        Class<Ack> cls = Ack.class;
        Ack.class.getClass();
        return ask.thenApply(cls::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> heartBeatFromTaskExecutor(TaskExecutorHeartbeat taskExecutorHeartbeat) {
        return (CompletableFuture) withThrottle(this::heartBeatFromTaskExecutorImpl).apply(taskExecutorHeartbeat);
    }

    private CompletableFuture<Ack> heartBeatFromTaskExecutorImpl(TaskExecutorHeartbeat taskExecutorHeartbeat) {
        this.heartbeatCounter.increment();
        CompletionStage ask = Patterns.ask(this.resourceClusterManagerActor, taskExecutorHeartbeat, this.askTimeout);
        Class<Ack> cls = Ack.class;
        Ack.class.getClass();
        return ask.thenApply(cls::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> notifyTaskExecutorStatusChange(TaskExecutorStatusChange taskExecutorStatusChange) {
        CompletionStage ask = Patterns.ask(this.resourceClusterManagerActor, taskExecutorStatusChange, this.askTimeout);
        Class<Ack> cls = Ack.class;
        Ack.class.getClass();
        return ask.thenApply(cls::cast).toCompletableFuture();
    }

    public CompletableFuture<Ack> disconnectTaskExecutor(TaskExecutorDisconnection taskExecutorDisconnection) {
        this.disconnectionCounter.increment();
        return (CompletableFuture) withThrottle(this::disconnectTaskExecutorImpl).apply(taskExecutorDisconnection);
    }

    CompletableFuture<Ack> disconnectTaskExecutorImpl(TaskExecutorDisconnection taskExecutorDisconnection) {
        CompletionStage ask = Patterns.ask(this.resourceClusterManagerActor, taskExecutorDisconnection, this.askTimeout);
        Class<Ack> cls = Ack.class;
        Ack.class.getClass();
        return ask.thenApply(cls::cast).toCompletableFuture();
    }
}
