package io.mantisrx.client;

import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.NoSuchJobException;
import io.reactivx.mantis.operators.DropOperator;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/client/MantisSSEJob.class */
public class MantisSSEJob implements Closeable {
    /** Name of the JVM system property the constructor sets when the builder carries a positive connect timeout. */
    private static final String ConnectTimeoutSecsPropertyName = "MantisClientConnectTimeoutSecs";
    private static final Logger logger = LoggerFactory.getLogger(MantisSSEJob.class);
    /** Builder this job was created from; its fields are read directly when connecting, submitting and closing. */
    private final Builder builder;
    /** Whether this instance submits a new job or connects to an existing one; checked by connectAndGet/submitAndGet. */
    private final Mode mode;
    /**
     * Stream of per-sink event streams. Assigned once by connectAndGet(), or inside the
     * subscription callback of submitAndGet(); null until then.
     */
    private Observable<Observable<MantisServerSentEvent>> resultsObservable;
    /**
     * Id returned by the master when the job is submitted; null until then.
     * NOTE(review): this is written from the submitAndGet() subscription callback without holding
     * this object's monitor and read by the unsynchronized getJobId() — confirm whether it should be volatile.
     */
    private String jobId;
    /** Partition index passed to getPartitionedResults; -1 unless set by Builder.buildJobConnector(int, int). */
    private int forPartition;
    /** Partition count passed to getPartitionedResults; 0 unless set by Builder.buildJobConnector(int, int). */
    private int totalPartitions;

    /* loaded from: input_file:io/mantisrx/client/MantisSSEJob$Builder.class */
    /**
     * Fluent builder for {@link MantisSSEJob}.
     *
     * <p>Collects the job name, artifact version, parameters, SLA and sink settings, and then
     * produces either a job submitter ({@link #buildJobSubmitter()}) or a connector to an
     * existing named job ({@link #buildJobConnector()} / {@link #buildJobConnector(int, int)}).
     * The built job keeps a reference to this builder and reads its fields later, so the builder
     * should not be mutated after a job has been built from it.
     */
    public static class Builder {
        private final MantisClient mantisClient;
        private final List<Parameter> parameters = new ArrayList<>();
        private String name;
        private String jarVersion;
        private SinkParameters sinkParameters = new SinkParameters.Builder().build();
        private Action1<Throwable> onConnectionReset;
        private boolean ephemeral = false;
        private SchedulingInfo schedulingInfo;
        private JobSla jobSla;
        private long connectTimeoutSecs = 0L;
        private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null;
        private long dataRecvTimeoutSecs = 5L;

        /**
         * Creates a builder whose {@link MantisClient} is constructed from the given properties.
         *
         * @param properties configuration handed to the {@code MantisClient} constructor
         */
        public Builder(Properties properties) {
            this(new MantisClient(properties));
        }

        /**
         * Creates a builder whose {@link MantisClient} is configured with fixed ZooKeeper
         * timeout/retry settings plus connection details taken from environment variables.
         *
         * @throws NullPointerException if any of the environment variables
         *         {@code mantis.zookeeper.connectString}, {@code mantis.zookeeper.root} or
         *         {@code mantis.zookeeper.leader.announcement.path} is not set
         */
        public Builder() {
            this(new MantisClient(zookeeperPropertiesFromEnvironment()));
        }

        /**
         * Creates a builder that uses an already constructed client.
         *
         * @param mantisClient client used for all master and sink interactions
         */
        public Builder(MantisClient mantisClient) {
            this.mantisClient = mantisClient;
        }

        /** Builds the client properties used by the no-arg constructor. */
        private static Properties zookeeperPropertiesFromEnvironment() {
            Properties properties = new Properties();
            properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000");
            properties.setProperty("mantis.zookeeper.connection.retrySleepMs", "500");
            properties.setProperty("mantis.zookeeper.connection.retryCount", "500");
            copyRequiredEnvironmentVariable(properties, "mantis.zookeeper.connectString");
            copyRequiredEnvironmentVariable(properties, "mantis.zookeeper.root");
            copyRequiredEnvironmentVariable(properties, "mantis.zookeeper.leader.announcement.path");
            return properties;
        }

        /**
         * Copies the environment variable named {@code key} into {@code target} under the same key.
         * Properties rejects null values with a message-less NullPointerException, so the missing
         * variable is reported explicitly (same exception type, but with a usable message).
         */
        private static void copyRequiredEnvironmentVariable(Properties target, String key) {
            String value = System.getenv(key);
            if (value == null) {
                throw new NullPointerException("Environment variable '" + key
                        + "' must be set to use the no-arg MantisSSEJob.Builder constructor");
            }
            target.setProperty(key, value);
        }

        /**
         * Sets the job name used for submission and for connecting to a named job.
         *
         * @param str job name
         * @return this builder
         */
        public Builder name(String str) {
            this.name = str;
            return this;
        }

        /**
         * Sets the artifact version passed to the master on submission.
         *
         * @param str jar version
         * @return this builder
         */
        public Builder jarVersion(String str) {
            this.jarVersion = str;
            return this;
        }

        /**
         * Appends job parameters; repeated calls accumulate.
         *
         * @param parameterArr parameters to add
         * @return this builder
         */
        public Builder parameters(Parameter... parameterArr) {
            this.parameters.addAll(Arrays.asList(parameterArr));
            return this;
        }

        /**
         * Replaces the sink parameters (defaults to an empty {@code SinkParameters}).
         *
         * @param sinkParameters parameters for the sink connection
         * @return this builder
         */
        public Builder sinkParams(SinkParameters sinkParameters) {
            this.sinkParameters = sinkParameters;
            return this;
        }

        /**
         * Marks the job as ephemeral, so that closing a submitter built from this builder
         * sends a kill request for the submitted job.
         *
         * @return this builder
         */
        public Builder onCloseKillJob() {
            this.ephemeral = true;
            return this;
        }

        /**
         * Sets the scheduling information passed to the master on submission.
         *
         * @param schedulingInfo scheduling information
         * @return this builder
         */
        public Builder schedulingInfo(SchedulingInfo schedulingInfo) {
            this.schedulingInfo = schedulingInfo;
            return this;
        }

        /**
         * Sets the job SLA. A non-null SLA also overwrites the ephemeral flag: it becomes true
         * exactly when the SLA's duration type is {@code Transient}.
         *
         * @param jobSla SLA to use, may be null
         * @return this builder
         */
        public Builder jobSla(JobSla jobSla) {
            this.jobSla = jobSla;
            if (jobSla != null) {
                this.ephemeral = jobSla.getDurationType() == MantisJobDurationType.Transient;
            }
            return this;
        }

        /**
         * Sets the connect timeout. A positive value is published as a JVM system property
         * when a job is built from this builder; zero or negative leaves the property untouched.
         *
         * @param j timeout in seconds
         * @return this builder
         */
        public Builder connectTimeoutSecs(long j) {
            this.connectTimeoutSecs = j;
            return this;
        }

        /**
         * Sets the callback handed to the sink connection function for connection resets.
         *
         * @param action1 callback receiving the error
         * @return this builder
         */
        public Builder onConnectionReset(Action1<Throwable> action1) {
            this.onConnectionReset = action1;
            return this;
        }

        /**
         * Sets the observer handed to the client when sink clients are created.
         *
         * @param observer status observer, may be null
         * @return this builder
         */
        public Builder sinkConnectionsStatusObserver(Observer<SinkConnectionsStatus> observer) {
            this.sinkConnectionsStatusObserver = observer;
            return this;
        }

        /**
         * Sets the data receive timeout handed to the client when sink clients are created
         * (defaults to 5).
         *
         * @param j timeout in seconds
         * @return this builder
         */
        public Builder sinkDataRecvTimeoutSecs(long j) {
            this.dataRecvTimeoutSecs = j;
            return this;
        }

        /**
         * Builds a job in submit mode.
         *
         * @return a job on which {@code submitAndGet()} may be called
         */
        public MantisSSEJob buildJobSubmitter() {
            return new MantisSSEJob(this, Mode.Submit);
        }

        /**
         * Builds a job in connect mode that requests one partition of the results.
         *
         * @param i  partition index to request; must be less than {@code i2}
         * @param i2 total number of partitions
         * @return a job on which {@code connectAndGet()} may be called
         * @throws IllegalArgumentException if {@code i >= i2}
         */
        public MantisSSEJob buildJobConnector(int i, int i2) {
            if (i >= i2) {
                throw new IllegalArgumentException("forPartition " + i + " must be less than totalPartitions " + i2);
            }
            MantisSSEJob mantisSSEJob = new MantisSSEJob(this, Mode.Connect);
            mantisSSEJob.forPartition = i;
            mantisSSEJob.totalPartitions = i2;
            return mantisSSEJob;
        }

        /**
         * Builds a job in connect mode without partitioning (partition index -1, total 0).
         *
         * @return a job on which {@code connectAndGet()} may be called
         */
        public MantisSSEJob buildJobConnector() {
            return new MantisSSEJob(this, Mode.Connect);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/client/MantisSSEJob$Mode.class */
    /** Operating mode of a job instance, fixed at construction time. */
    public enum Mode {
        /** The instance submits a new job and streams its results. */
        Submit,
        /** The instance connects to the sink of an already existing named job. */
        Connect
    }

    private MantisSSEJob(Builder builder, Mode mode) {
        this.jobId = null;
        this.forPartition = -1;
        this.totalPartitions = 0;
        this.builder = builder;
        this.mode = mode;
        if (builder.connectTimeoutSecs > 0) {
            System.setProperty(ConnectTimeoutSecsPropertyName, String.valueOf(builder.connectTimeoutSecs));
        }
    }

    /**
     * Kills the submitted job when this instance is an ephemeral submitter.
     * Does nothing in connect mode or for non-ephemeral jobs; logs a warning when no job id
     * has been recorded yet.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        final boolean ephemeralSubmitter = mode == Mode.Submit && builder.ephemeral;
        if (!ephemeralSubmitter) {
            return;
        }
        if (jobId != null) {
            builder.mantisClient.killJob(jobId);
            logger.info("Sent kill to master for job " + jobId);
            return;
        }
        logger.warn("Unexpected to not have JobId to kill ephemeral job");
    }

    /**
     * Returns the id of the submitted job.
     *
     * @return the job id, or null if no job has been submitted through this instance yet
     */
    public String getJobId() {
        return jobId;
    }

    /**
     * Turns a stream of sink clients into a stream of per-sink event streams, always following
     * the most recent sink client.
     *
     * <p>A sink client that reports an error is mapped to a single synthetic event carrying the
     * error text; otherwise its partitioned results for this job's partition settings are used.
     * Upstream errors are logged and, unless they are {@link NoSuchJobException}s, recorded on
     * the retry helper whose retry logic drives resubscription.
     *
     * @param observable stream of sink clients
     * @return stream of event streams with retry applied
     */
    private Observable<Observable<MantisServerSentEvent>> sinksToObservable(Observable<SinkClient<MantisServerSentEvent>> observable) {
        final ConditionalRetry retryPolicy = new ConditionalRetry((Counter) null, "SinkClient_" + builder.name);
        return observable
                .switchMap(client -> {
                    if (client.hasError()) {
                        return Observable.just(Observable.just(new MantisServerSentEvent(client.getError())));
                    }
                    return client.getPartitionedResults(forPartition, totalPartitions);
                })
                .doOnError(error -> {
                    logger.warn("Error getting sink Observable: " + error.getMessage());
                    if (!(error instanceof NoSuchJobException)) {
                        retryPolicy.setErrorRef(error);
                    }
                })
                .retryWhen(retryPolicy.getRetryLogic());
    }

    /**
     * Connects to the job's sink and flattens all per-sink streams into one event stream.
     *
     * @return merged stream of events from every sink stream
     * @throws IllegalStateException if this instance was not built in connect mode
     * @deprecated use {@link #connectAndGet()} and handle the nested streams directly
     */
    @Deprecated
    public synchronized Observable<MantisServerSentEvent> connectAndGetObservable() throws IllegalStateException {
        final Observable<Observable<MantisServerSentEvent>> nested = connectAndGet();
        return nested.flatMap(inner -> inner);
    }

    /**
     * Connects to the sink of the named job and returns the stream of per-sink event streams.
     *
     * <p>The result is created on the first call and cached. Creation blocks on the first value
     * of the client's job-existence check; if the job does not exist, the result is a single
     * stream holding one synthetic "No such job name" event.
     *
     * @return cached stream of event streams
     * @throws IllegalStateException if this instance was not built in connect mode
     */
    public synchronized Observable<Observable<MantisServerSentEvent>> connectAndGet() throws IllegalStateException {
        if (mode != Mode.Connect) {
            throw new IllegalStateException("Can't call connect to sink");
        }
        if (resultsObservable != null) {
            return resultsObservable;
        }

        final String jobName = builder.name;
        logger.info("Getting sink for job name " + jobName);

        // Blocks until the existence check emits its first value.
        final Boolean jobExists = (Boolean) builder.mantisClient.namedJobExists(jobName).take(1).toBlocking().first();
        if (jobExists.booleanValue()) {
            final SseSinkConnectionFunction connectionFunction =
                    new SseSinkConnectionFunction(true, builder.onConnectionReset, builder.sinkParameters);
            resultsObservable = sinksToObservable(
                    builder.mantisClient.getSinkClientByJobName(
                            jobName,
                            connectionFunction,
                            builder.sinkConnectionsStatusObserver,
                            builder.dataRecvTimeoutSecs))
                    .share();
        } else {
            resultsObservable = Observable.just(Observable.just(new MantisServerSentEvent("No such job name " + jobName)));
        }
        return resultsObservable;
    }

    /**
     * Submits the job and flattens all per-sink streams into one event stream.
     *
     * @return merged stream of events from every sink stream
     * @throws IllegalStateException if this instance was not built in submit mode
     * @deprecated use {@link #submitAndGet()} and handle the nested streams directly
     */
    @Deprecated
    public synchronized Observable<MantisServerSentEvent> submitAndGetObservable() throws IllegalStateException {
        final Observable<Observable<MantisServerSentEvent>> nested = submitAndGet();
        return nested.flatMap(inner -> inner);
    }

    /**
     * Returns a stream that, when subscribed, submits the job and emits its per-sink event streams.
     *
     * <p>Submission happens lazily inside the subscription callback: the job is submitted, its id
     * is recorded, and the sink results of that job are forwarded to the subscriber. Any exception
     * raised while doing so is delivered through {@code onError}. The returned stream is shared,
     * passed through a {@link DropOperator}, and observed on the IO scheduler.
     *
     * <p>Once a subscription has stored the sink results, later calls return that stored stream
     * directly. Calls made before the first subscription each build a new, independent pipeline.
     *
     * @return stream of per-sink event streams
     * @throws IllegalStateException if this instance was not built in submit mode
     */
    public synchronized Observable<Observable<MantisServerSentEvent>> submitAndGet() throws IllegalStateException {
        if (this.mode != Mode.Submit) {
            throw new IllegalStateException("Can't submit job");
        }
        if (this.resultsObservable != null) {
            return this.resultsObservable;
        }
        return Observable.create(new Observable.OnSubscribe<Observable<MantisServerSentEvent>>() {
            @Override
            public void call(Subscriber<? super Observable<MantisServerSentEvent>> subscriber) {
                try {
                    final Builder settings = MantisSSEJob.this.builder;
                    MantisSSEJob.this.jobId = settings.mantisClient.submitJob(
                            settings.name,
                            settings.jarVersion,
                            settings.parameters,
                            effectiveJobSla(),
                            settings.schedulingInfo);
                    logger.info("Submitted job name " + settings.name + " and got jobId: " + MantisSSEJob.this.jobId);
                    // Pass the configured sink parameters, as connectAndGet() does; previously
                    // Builder.sinkParams(...) was silently ignored in submit mode.
                    MantisSSEJob.this.resultsObservable = settings.mantisClient
                            .getSinkClientByJobId(
                                    MantisSSEJob.this.jobId,
                                    new SseSinkConnectionFunction(true, settings.onConnectionReset, settings.sinkParameters),
                                    settings.sinkConnectionsStatusObserver,
                                    settings.dataRecvTimeoutSecs)
                            .getResults();
                    MantisSSEJob.this.resultsObservable.subscribe(subscriber);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        })
                // Log the throwable itself so the stack trace is kept even when the message is null.
                .doOnError(th -> logger.warn(th.getMessage(), th))
                // Explicit type arguments keep the rest of the chain generically typed.
                .lift(new DropOperator<Observable<MantisServerSentEvent>>("client_submit_sse_share"))
                .share()
                .observeOn(Schedulers.io());
    }

    /**
     * Builds the SLA sent with the submission: the builder's SLA values when one was provided,
     * otherwise a lossy SLA with zero limits. In both cases the duration type follows the
     * builder's ephemeral flag.
     */
    private JobSla effectiveJobSla() {
        final MantisJobDurationType durationType =
                this.builder.ephemeral ? MantisJobDurationType.Transient : MantisJobDurationType.Perpetual;
        final JobSla requested = this.builder.jobSla;
        if (requested == null) {
            return new JobSla(0L, 0L, JobSla.StreamSLAType.Lossy, durationType, "");
        }
        return new JobSla(
                requested.getRuntimeLimitSecs(),
                requested.getMinRuntimeSecs(),
                requested.getSlaType(),
                durationType,
                requested.getUserProvidedType());
    }
}
