package io.mantisrx.master.api.akka.route.handlers;

import akka.actor.ActorRef;
import akka.pattern.PatternsCS;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.JobDiscoveryHeartbeats;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.server.core.JobScalerRuleInfo;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.JobId;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.class */
/**
 * {@link JobDiscoveryRouteHandler} that serves discovery streams by asking the job clusters
 * manager actor and wrapping the subjects it returns into streaming route responses.
 *
 * <p>Actor replies are kept in small, short-lived Caffeine caches keyed by the request object, so
 * requests that compare equal within the cache window reuse the same reply future.
 *
 * <p>Every stream is merged with a periodic heartbeat element. Heartbeats start after
 * {@value #HEARTBEAT_INITIAL_DELAY_SECONDS} seconds and repeat slightly more often than the server
 * idle connection timeout (presumably so the connection is not dropped as idle - confirm against
 * the route that consumes these streams).
 *
 * <p>Only failures raised synchronously while building a response are converted into a
 * {@code SERVER_ERROR} response; a failure of the asynchronous ask itself (for example a timeout)
 * still completes the returned stage exceptionally.
 */
public class JobDiscoveryRouteHandlerAkkaImpl implements JobDiscoveryRouteHandler {
    private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryRouteHandlerAkkaImpl.class);

    /** How long a reply from the job clusters manager actor stays cached. */
    private static final long RESPONSE_CACHE_TTL_SECONDS = 5L;
    /** Upper bound on the number of cached replies per cache. */
    private static final long RESPONSE_CACHE_MAX_ENTRIES = 500L;
    /** Delay before the first heartbeat element is emitted on a stream. */
    private static final long HEARTBEAT_INITIAL_DELAY_SECONDS = 5L;

    private final ActorRef jobClustersManagerActor;
    private final Duration serverIdleConnectionTimeout;
    private final Duration askTimeout;
    private final Counter schedInfoStreamErrors;
    private final Counter lastSubmittedJobIdStreamErrors;
    private final Counter jobScalerRuleInfoStreamErrors;
    private final AsyncLoadingCache<JobClusterManagerProto.GetJobSchedInfoRequest, JobClusterManagerProto.GetJobSchedInfoResponse> schedInfoCache;
    private final AsyncLoadingCache<JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest, JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobIdStreamRespCache;
    private final AsyncLoadingCache<JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest, JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse> jobScalerRuleStreamRespCache;

    /**
     * Creates the handler.
     *
     * @param jobClustersManagerActor actor that is asked for every discovery request
     * @param serverIdleConnectionTimeout idle timeout used to derive the heartbeat period
     */
    public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Duration serverIdleConnectionTimeout) {
        this.jobClustersManagerActor = jobClustersManagerActor;
        this.serverIdleConnectionTimeout = serverIdleConnectionTimeout;
        this.askTimeout = Duration.ofMillis(ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());

        Metrics metrics = new Metrics.Builder()
            .id("JobDiscoveryRouteHandlerAkkaImpl", new Tag[0])
            .addCounter("schedInfoStreamErrors")
            .addCounter("lastSubmittedJobIdStreamErrors")
            .addCounter("jobScalerRuleInfoStreamErrors")
            .build();
        this.schedInfoStreamErrors = metrics.getCounter("schedInfoStreamErrors");
        this.lastSubmittedJobIdStreamErrors = metrics.getCounter("lastSubmittedJobIdStreamErrors");
        this.jobScalerRuleInfoStreamErrors = metrics.getCounter("jobScalerRuleInfoStreamErrors");

        this.schedInfoCache = Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECONDS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::jobSchedInfo);
        this.lastSubmittedJobIdStreamRespCache = Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECONDS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::lastSubmittedJobId);
        this.jobScalerRuleStreamRespCache = Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECONDS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::jobScalerRuleSubject);
    }

    /**
     * Asks the job clusters manager actor and casts its reply to the expected response type.
     *
     * @param request message sent to the actor
     * @param responseType class the reply is cast to; a reply of another type fails the future
     *     with a {@link ClassCastException}
     * @return future completed with the typed reply, or failed if the ask fails
     */
    private <T> CompletableFuture<T> askManager(Object request, Class<T> responseType) {
        return PatternsCS.ask(this.jobClustersManagerActor, request, this.askTimeout)
            .thenApply(responseType::cast)
            .toCompletableFuture();
    }

    /** Cache loader for scheduling info replies; the executor is required by the loader signature but unused. */
    private CompletableFuture<JobClusterManagerProto.GetJobSchedInfoResponse> jobSchedInfo(JobClusterManagerProto.GetJobSchedInfoRequest request, Executor executor) {
        return askManager(request, JobClusterManagerProto.GetJobSchedInfoResponse.class);
    }

    /** Cache loader for last-submitted-job-id replies; the executor is required by the loader signature but unused. */
    private CompletableFuture<JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobId(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, Executor executor) {
        return askManager(request, JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse.class);
    }

    /** Cache loader for job scaler rule subject replies; the executor is required by the loader signature but unused. */
    private CompletableFuture<JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse> jobScalerRuleSubject(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest request, Executor executor) {
        return askManager(request, JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.class);
    }

    /**
     * Builds the tick source that drives heartbeat elements.
     *
     * @param sendHeartbeats when {@code false} the tick stream completes on its first tick without
     *     emitting anything
     * @return ticks starting after the initial delay and repeating one second more often than the
     *     idle connection timeout
     */
    private Observable<Long> heartbeatTicks(boolean sendHeartbeats) {
        // Floor the period at one second: an idle timeout of one second or less would otherwise
        // yield a zero or negative interval period.
        long periodSeconds = Math.max(1L, this.serverIdleConnectionTimeout.getSeconds() - 1);
        return Observable.interval(HEARTBEAT_INITIAL_DELAY_SECONDS, periodSeconds, TimeUnit.SECONDS)
            .takeWhile(tick -> sendHeartbeats);
    }

    /**
     * Returns the scheduling info stream of a job, merged with heartbeats.
     *
     * <p>Once the job's subject completes, subsequent heartbeat ticks emit a scheduling info
     * object for the job with an empty map instead of the regular heartbeat instance.
     *
     * @param request identifies the job whose scheduling info is requested
     * @param sendHeartbeats whether heartbeat elements are merged into the stream
     * @return stage yielding the stream on success, or a response carrying the actor's response
     *     code and message when the actor did not return a subject
     */
    @Override // io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler
    public CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoStream(JobClusterManagerProto.GetJobSchedInfoRequest request, boolean sendHeartbeats) {
        try {
            // Looked up inside the try so a loader that throws synchronously is reported like any
            // other failure, consistent with lastSubmittedJobIdStream.
            CompletableFuture<JobClusterManagerProto.GetJobSchedInfoResponse> reply = this.schedInfoCache.get(request);
            AtomicBoolean jobCompleted = new AtomicBoolean(false);
            JobSchedulingInfo completedJobInfo = new JobSchedulingInfo(request.getJobId().getId(), new HashMap<>());
            return reply.thenApply(resp -> {
                Optional<BehaviorSubject<JobSchedulingInfo>> subject = resp.getJobSchedInfoSubject();
                if (!resp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) || !subject.isPresent()) {
                    logger.info("Failed to get Sched info stream for {}", request.getJobId().getId());
                    this.schedInfoStreamErrors.increment();
                    return new JobDiscoveryRouteProto.SchedInfoResponse(resp.requestId, resp.responseCode, resp.message);
                }
                // Completion of the subject flips the flag so later ticks report the completed job.
                Observable<JobSchedulingInfo> schedInfo = subject.get().doOnCompleted(() -> jobCompleted.set(true));
                Observable<JobSchedulingInfo> heartbeats = heartbeatTicks(sendHeartbeats)
                    .map(tick -> jobCompleted.get() ? completedJobInfo : JobDiscoveryHeartbeats.SCHED_INFO_HB_INSTANCE);
                return new JobDiscoveryRouteProto.SchedInfoResponse(resp.requestId, resp.responseCode, resp.message, Observable.merge(schedInfo, heartbeats));
            });
        } catch (Exception e) {
            logger.error("caught exception fetching sched info stream for {}", request.getJobId().getId(), e);
            this.schedInfoStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.SchedInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get SchedulingInfo stream for jobId " + request.getJobId().getId() + " error: " + e.getMessage()));
        }
    }

    /**
     * Returns the stream of last submitted job ids of a job cluster, merged with heartbeats.
     *
     * @param request identifies the job cluster
     * @param sendHeartbeats whether heartbeat elements are merged into the stream
     * @return stage yielding the stream on success, or a response carrying the actor's response
     *     code and message when the actor did not return a subject
     */
    @Override // io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler
    public CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastSubmittedJobIdStream(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, boolean sendHeartbeats) {
        try {
            CompletableFuture<JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> reply = this.lastSubmittedJobIdStreamRespCache.get(request);
            return reply.thenApply(resp -> {
                Optional<BehaviorSubject<JobId>> subject = resp.getjobIdBehaviorSubject();
                if (!resp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) || !subject.isPresent()) {
                    logger.info("Failed to get lastSubmittedJobId stream for job cluster {}", request.getClusterName());
                    this.lastSubmittedJobIdStreamErrors.increment();
                    return new JobDiscoveryRouteProto.JobClusterInfoResponse(resp.requestId, resp.responseCode, resp.message);
                }
                Observable<JobClusterInfo> jobIds = subject.get()
                    .map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId()));
                Observable<JobClusterInfo> heartbeats = heartbeatTicks(sendHeartbeats)
                    .map(tick -> JobDiscoveryHeartbeats.JOB_CLUSTER_INFO_HB_INSTANCE);
                return new JobDiscoveryRouteProto.JobClusterInfoResponse(resp.requestId, resp.responseCode, resp.message, Observable.merge(jobIds, heartbeats));
            });
        } catch (Exception e) {
            logger.error("caught exception fetching lastSubmittedJobId stream for {}", request.getClusterName(), e);
            this.lastSubmittedJobIdStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last submitted jobId stream for " + request.getClusterName() + " error: " + e.getMessage()));
        }
    }

    /**
     * Returns the scaler rule stream of a job, merged with heartbeats.
     *
     * <p>Once the job's subject completes, subsequent heartbeat ticks emit a placeholder rule info
     * for the job instead of the regular heartbeat instance.
     *
     * @param request identifies the job whose scaler rules are requested
     * @param sendHeartbeats whether heartbeat elements are merged into the stream
     * @return stage yielding the stream on success, or a response without a stream carrying the
     *     actor's response code and message when the actor did not return a subject
     */
    @Override // io.mantisrx.master.api.akka.route.handlers.JobDiscoveryRouteHandler
    public CompletionStage<JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse> jobScalerRuleStream(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest request, boolean sendHeartbeats) {
        try {
            // Looked up inside the try so a loader that throws synchronously is reported like any
            // other failure, consistent with lastSubmittedJobIdStream.
            CompletableFuture<JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse> reply = this.jobScalerRuleStreamRespCache.get(request);
            AtomicBoolean jobCompleted = new AtomicBoolean(false);
            // NOTE(review): the flag and null rule list presumably mark the job as completed with
            // no rules - confirm against JobScalerRuleInfo. The raw cast is kept because the
            // constructor's list element type is not visible from this file.
            JobScalerRuleInfo completedRuleInfo = new JobScalerRuleInfo(request.getJobId().getId(), true, (List) null);
            return reply.thenApply(resp -> {
                BehaviorSubject<JobScalerRuleInfo> subject = resp.getJobScalerRuleStreamBehaviorSubject();
                if (!resp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) || subject == null) {
                    logger.info("Failed to get job scaler rule info stream for {}", request.getJobId());
                    this.jobScalerRuleInfoStreamErrors.increment();
                    return scalerRuleResponseWithoutStream(resp.requestId, resp.responseCode, resp.message);
                }
                // Completion of the subject flips the flag so later ticks report the completed job.
                Observable<JobScalerRuleInfo> rules = subject.doOnCompleted(() -> jobCompleted.set(true));
                Observable<JobScalerRuleInfo> heartbeats = heartbeatTicks(sendHeartbeats)
                    .map(tick -> jobCompleted.get() ? completedRuleInfo : JobDiscoveryHeartbeats.JOB_SCALER_RULES_INFO_HB_INSTANCE);
                JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse streaming = JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse.builder()
                    .requestId(resp.requestId)
                    .responseCode(resp.responseCode)
                    .message(resp.message)
                    .scalerRuleObs(Observable.merge(rules, heartbeats))
                    .build();
                return streaming;
            });
        } catch (Exception e) {
            logger.error("caught exception fetching job scaler rule info stream for {}", request.getJobId(), e);
            this.jobScalerRuleInfoStreamErrors.increment();
            return CompletableFuture.completedFuture(scalerRuleResponseWithoutStream(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get scaler rule info stream for " + request.getJobId() + " error: " + e.getMessage()));
        }
    }

    /** Builds a scaler rule response that carries only the request id, response code and message. */
    private static JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse scalerRuleResponseWithoutStream(long requestId, BaseResponse.ResponseCode responseCode, String message) {
        JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse response = JobClusterScalerRuleProto.GetJobScalerRuleStreamResponse.builder()
            .requestId(requestId)
            .responseCode(responseCode)
            .message(message)
            .build();
        return response;
    }
}
