package io.mantisrx.client;

import com.mantisrx.common.utils.Services;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.Configurations;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.JobSubmitResponse;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

/**
 * Client-side facade over a {@link MasterClientWrapper}.
 *
 * <p>It offers helpers to submit and kill jobs, to query the jobs of a named job, to follow job
 * status and scheduling changes, and to build {@link SinkClient}s for a job id or for the
 * stream of job ids published for a job name.
 *
 * <p>Several methods ({@code submitJob}, {@code killJob}) block the calling thread until the
 * master gateway answers.
 */
public class MantisClient {
    private static final Logger logger = LoggerFactory.getLogger(MantisClient.class);

    /**
     * Property key read by {@link #MantisClient(Properties)}. Its boolean value is stored in
     * {@link #disablePingFiltering} and forwarded to every {@code SinkClientImpl} created here.
     */
    private static final String DISABLE_PING_FILTERING_KEY = "mantis.sse.disablePingFiltering";

    /** Marker value in the job-id stream that is treated as "the named job does not exist". */
    private static final String NO_SUCH_NAMED_JOB = "No_such_named_job";

    private final boolean disablePingFiltering;
    private final MasterClientWrapper clientWrapper;

    /**
     * Locator handed to sink clients. It only dereferences {@link #clientWrapper} when one of
     * its methods is invoked, so initializing it before the constructor body runs is safe.
     */
    private final JobSinkLocator jobSinkLocator = new JobSinkLocator() {
        @Override
        public Observable<EndpointChange> locateSinkForJob(String jobId) {
            return locatePartitionedSinkForJob(jobId, -1, 0);
        }

        // NOTE(review): i and i2 are forwarded untouched to getSinkLocations; presumably the
        // partition index and the partition count - confirm against JobSinkLocator.
        @Override
        public Observable<EndpointChange> locatePartitionedSinkForJob(String jobId, int i, int i2) {
            return MantisClient.this.clientWrapper.getMasterClientApi().flatMap(gateway ->
                    // Only the first reported sink stage number is used per gateway emission.
                    gateway.getSinkStageNum(jobId).take(1).flatMap(sinkStage -> {
                        logger.info("Getting sink locations for {}", jobId);
                        return MantisClient.this.clientWrapper.getSinkLocations(jobId, sinkStage.intValue(), i, i2);
                    }));
        }
    };

    /**
     * Builds a client from configuration properties: creates and starts the high-availability
     * services, wraps their master client API, and reads the
     * {@code mantis.sse.disablePingFiltering} flag (absent or unparsable means {@code false}).
     *
     * @param properties configuration used for {@link CoreConfiguration} and the ping flag
     */
    public MantisClient(Properties properties) {
        HighAvailabilityServices haServices = HighAvailabilityServicesUtil.createHAServices(
                (CoreConfiguration) Configurations.frmProperties(properties, CoreConfiguration.class));
        Services.startAndWait(haServices);
        this.clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi());
        this.disablePingFiltering = Boolean.parseBoolean(properties.getProperty(DISABLE_PING_FILTERING_KEY));
    }

    /**
     * Builds a client around an existing wrapper.
     *
     * @param masterClientWrapper wrapper used for every master interaction
     * @param z value of the disable-ping-filtering flag forwarded to created sink clients
     */
    public MantisClient(MasterClientWrapper masterClientWrapper, boolean z) {
        this.disablePingFiltering = z;
        this.clientWrapper = masterClientWrapper;
    }

    /**
     * Builds a client around an existing wrapper with the disable-ping-filtering flag off.
     *
     * @param masterClientWrapper wrapper used for every master interaction
     */
    public MantisClient(MasterClientWrapper masterClientWrapper) {
        this(masterClientWrapper, false);
    }

    /** @return the locator that resolves sink endpoints through this client's wrapper */
    public JobSinkLocator getSinkLocator() {
        return this.jobSinkLocator;
    }

    /** @return the wrapper this client delegates to */
    public MasterClientWrapper getClientWrapper() {
        return this.clientWrapper;
    }

    /** Blocks until the wrapper emits its first master gateway and returns it. */
    private MantisMasterGateway blockAndGetMasterApi() {
        return this.clientWrapper.getMasterClientApi().toBlocking().first();
    }

    /**
     * Delegates to {@link MasterClientWrapper#namedJobExists(String)}.
     *
     * @param str name of the job to look up
     * @return the wrapper's answer stream
     */
    public Observable<Boolean> namedJobExists(String str) {
        return this.clientWrapper.namedJobExists(str);
    }

    /**
     * Same as {@link #getSinkClientByJobName(String, SinkConnectionFunc, Observer, long)} with
     * {@code 5L} as the last argument.
     */
    public <T> Observable<SinkClient<T>> getSinkClientByJobName(String str, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> observer) {
        return getSinkClientByJobName(str, sinkConnectionFunc, observer, 5L);
    }

    /**
     * Emits a new {@link SinkClient} each time the named job reports a job id that passes
     * {@link #newJobIdIsGreater(String, String)} against the last accepted id.
     *
     * <p>When the stream reports the "no such named job" marker, a sink client whose
     * {@code hasError()} is {@code true} is emitted instead. The remembered job id is reset when
     * the returned observable is unsubscribed.
     *
     * @param str job name whose job ids are followed
     * @param sinkConnectionFunc connection function passed to each sink client
     * @param observer observer passed to each sink client
     * @param j passed through to {@code SinkClientImpl}; NOTE(review): presumably a
     *     data-receive timeout - confirm against SinkClientImpl
     * @param <T> element type produced by the sink clients
     * @return stream of sink clients, one per accepted job id
     */
    public <T> Observable<SinkClient<T>> getSinkClientByJobName(String str, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> observer, long j) {
        final AtomicReference<String> lastJobId = new AtomicReference<>();
        return this.clientWrapper.getNamedJobsIds(str)
                .doOnUnsubscribe(() -> lastJobId.set(null))
                .filter(newJobId -> {
                    logger.info("Got job cluster's new jobId={}", newJobId);
                    return newJobIdIsGreater(lastJobId.get(), newJobId);
                })
                .map(newJobId -> {
                    if (NO_SUCH_NAMED_JOB.equals(newJobId)) {
                        return this.<T>getErrorSinkClient(newJobId);
                    }
                    lastJobId.set(newJobId);
                    logger.info("Connecting to job {} with new jobId={}", str, newJobId);
                    return getSinkClientByJobId(newJobId, sinkConnectionFunc, observer, j);
                });
    }

    /**
     * Decides whether a newly reported job id should replace the current one.
     *
     * <p>The text after the last {@code '-'} of each id is compared as an {@code int}. Whenever
     * a comparison is impossible (no current id, a missing dash, a non-numeric suffix) the new
     * id is accepted.
     *
     * @param currentJobId last accepted job id, or {@code null} if none yet
     * @param newJobId job id just reported
     * @return {@code false} only when both suffixes parse and the new one is not greater
     */
    private static boolean newJobIdIsGreater(String currentJobId, String newJobId) {
        if (currentJobId == null) {
            return true;
        }
        int currentDash = currentJobId.lastIndexOf('-');
        if (currentDash < 0) {
            return true;
        }
        int newDash = newJobId.lastIndexOf('-');
        if (newDash < 0) {
            return true;
        }
        try {
            int newNumber = Integer.parseInt(newJobId.substring(newDash + 1));
            int currentNumber = Integer.parseInt(currentJobId.substring(currentDash + 1));
            return newNumber > currentNumber;
        } catch (NumberFormatException ignored) {
            // Suffix is not a plain int: fall back to accepting the new id.
            return true;
        }
    }

    /**
     * Creates a placeholder {@link SinkClient} that only carries an error.
     *
     * <p>Both result accessors return {@code null}; callers are expected to check
     * {@code hasError()} first.
     *
     * @param str text returned by {@code getError()}
     */
    private <T> SinkClient<T> getErrorSinkClient(final String str) {
        return new SinkClient<T>() {
            @Override
            public boolean hasError() {
                return true;
            }

            @Override
            public String getError() {
                return str;
            }

            @Override
            public Observable<Observable<T>> getResults() {
                return null;
            }

            @Override
            public Observable<Observable<T>> getPartitionedResults(int i, int i2) {
                return null;
            }
        };
    }

    /**
     * Same as {@link #getSinkClientByJobId(String, SinkConnectionFunc, Observer, long)} with
     * {@code 5L} as the last argument.
     */
    public <T> SinkClient<T> getSinkClientByJobId(String str, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> observer) {
        return getSinkClientByJobId(str, sinkConnectionFunc, observer, 5L);
    }

    /**
     * Creates a {@link SinkClient} for one job id.
     *
     * <p>A fresh subject is registered with the wrapper as a sink-worker-count observer, and
     * only the updates whose job id equals {@code str} are passed on to the sink client.
     * NOTE(review): the subject is never unregistered here, so every call adds one more
     * observer to the wrapper - confirm whether the wrapper offers a removal hook.
     *
     * @param str job id to connect to; must not be {@code null} once updates arrive
     * @param sinkConnectionFunc connection function passed to the sink client
     * @param observer observer passed to the sink client
     * @param j passed through to {@code SinkClientImpl}; NOTE(review): presumably a
     *     data-receive timeout - confirm against SinkClientImpl
     * @param <T> element type produced by the sink client
     * @return the new sink client
     */
    public <T> SinkClient<T> getSinkClientByJobId(String str, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> observer, long j) {
        PublishSubject<MasterClientWrapper.JobSinkNumWorkers> sinkWorkerUpdates = PublishSubject.create();
        this.clientWrapper.addNumSinkWorkersObserver(sinkWorkerUpdates);
        Observable<MasterClientWrapper.JobSinkNumWorkers> updatesForJob =
                sinkWorkerUpdates.filter(update -> str.equals(update.getJobId()));
        return new SinkClientImpl<T>(str, sinkConnectionFunc, getSinkLocator(), updatesForJob, observer, j, this.disablePingFiltering);
    }

    /**
     * Submits a job through the master gateway and blocks for the resulting job id.
     *
     * <p>A failing submission is logged and dropped, so the blocking read then finds no
     * response and fails with {@link java.util.NoSuchElementException}.
     *
     * @return the job id from the first {@link JobSubmitResponse}
     * @throws Exception declared for callers; see above for the failure mode
     */
    public String submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, SchedulingInfo schedulingInfo) throws Exception {
        return firstJobId(this.clientWrapper.getMasterClientApi().flatMap(gateway ->
                logAndDropErrors(gateway.submitJob(str, str2, list, jobSla, schedulingInfo))));
    }

    /**
     * Variant of {@link #submitJob(String, String, List, JobSla, SchedulingInfo)} that also
     * forwards {@code j} to the gateway.
     *
     * @return the job id from the first {@link JobSubmitResponse}
     * @throws Exception declared for callers; a dropped failure surfaces as
     *     {@link java.util.NoSuchElementException}
     */
    public String submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo) throws Exception {
        return firstJobId(this.clientWrapper.getMasterClientApi().flatMap(gateway ->
                logAndDropErrors(gateway.submitJob(str, str2, list, jobSla, j, schedulingInfo))));
    }

    /**
     * Variant of {@link #submitJob(String, String, List, JobSla, SchedulingInfo)} that also
     * forwards {@code j} and {@code z} to the gateway.
     *
     * @return the job id from the first {@link JobSubmitResponse}
     * @throws Exception declared for callers; a dropped failure surfaces as
     *     {@link java.util.NoSuchElementException}
     */
    public String submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z) throws Exception {
        return firstJobId(this.clientWrapper.getMasterClientApi().flatMap(gateway ->
                logAndDropErrors(gateway.submitJob(str, str2, list, jobSla, j, schedulingInfo, z))));
    }

    /**
     * Asks the master gateway to kill a job and blocks until the first reply.
     *
     * <p>A failing call is logged and dropped, after which the blocking read fails with
     * {@link java.util.NoSuchElementException} because nothing was emitted.
     *
     * @param str id of the job to kill
     */
    public void killJob(String str) {
        this.clientWrapper.getMasterClientApi()
                .flatMap(gateway -> logAndDropErrors(gateway.killJob(str)))
                .take(1)
                .toBlocking()
                .first();
    }

    /**
     * Queries the gateway for the jobs of a named job and keeps only the first emission.
     *
     * @param str job name
     * @param metaState state filter handed to the gateway
     * @return observable of the first gateway reply
     */
    public Observable<String> getJobsOfNamedJob(String str, MantisJobState.MetaState metaState) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(gateway -> gateway.getJobsOfNamedJob(str, metaState))
                .first();
    }

    /**
     * @param str job id
     * @return the gateway's status stream for that job
     */
    public Observable<String> getJobStatusObservable(String str) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(gateway -> gateway.getJobStatusObservable(str));
    }

    /**
     * @param str job id
     * @return the gateway's scheduling-change stream for that job
     */
    public Observable<JobSchedulingInfo> getSchedulingChanges(String str) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(gateway -> gateway.schedulingChanges(str));
    }

    /**
     * Streams scheduling changes for whichever job id of a job cluster is current.
     *
     * <p>Each reported job id that passes {@link #newJobIdIsGreater(String, String)} switches
     * the output to that job's scheduling changes. The "no such named job" marker makes the
     * stream fail with an {@link Exception}. The remembered job id is reset on unsubscribe.
     *
     * @param str job cluster name
     * @return scheduling changes of the most recently accepted job id
     */
    public Observable<JobSchedulingInfo> jobClusterDiscoveryInfoStream(String str) {
        final AtomicReference<String> lastJobId = new AtomicReference<>();
        return this.clientWrapper.getNamedJobsIds(str)
                .doOnUnsubscribe(() -> lastJobId.set(null))
                .filter(newJobId -> {
                    logger.info("Got job cluster {}'s new jobId : {}", str, newJobId);
                    return newJobIdIsGreater(lastJobId.get(), newJobId);
                })
                .switchMap(newJobId -> {
                    if (NO_SUCH_NAMED_JOB.equals(newJobId)) {
                        return Observable.<JobSchedulingInfo>error(new Exception("No such job cluster " + str));
                    }
                    lastJobId.set(newJobId);
                    logger.info("[{}] switched to streaming discovery info for {}", str, newJobId);
                    return getSchedulingChanges(newJobId);
                });
    }

    /** Blocks for the first submit response of the stream and returns its job id. */
    private static String firstJobId(Observable<JobSubmitResponse> responses) {
        return responses.take(1).toBlocking().first().getJobId();
    }

    /**
     * Turns an error from {@code source} into a logged warning (message plus stack trace)
     * followed by normal completion without items.
     */
    private static <R> Observable<R> logAndDropErrors(Observable<R> source) {
        return source.onErrorResumeNext(error -> {
            logger.warn(error.getMessage(), error);
            return Observable.empty();
        });
    }
}
