package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.rx.MonitorOperator;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.sink.Sink;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

/* loaded from: input_file:io/mantisrx/runtime/executor/SinkPublisher.class */
public class SinkPublisher<T> implements WorkerPublisher<T> {
    private static final Logger logger = LoggerFactory.getLogger(SinkPublisher.class);
    private final SinkHolder<T> sinkHolder;
    private final PortSelector portSelector;
    private final Context context;
    private final Action0 observableTerminatedCallback;
    private final Action0 onSubscribeAction;
    private final Action0 onUnsubscribeAction;
    private final Action0 observableOnCompleteCallback;
    private final Action1<Throwable> observableOnErrorCallback;
    private Subscription eagerSubscription;
    private Sink<T> sink;

    public SinkPublisher(SinkHolder<T> sinkHolder, PortSelector portSelector, Context context, Action0 action0, Action0 action02, Action0 action03, Action0 action04, Action1<Throwable> action1) {
        this.sinkHolder = sinkHolder;
        this.portSelector = portSelector;
        this.context = context;
        this.observableTerminatedCallback = action0;
        this.onSubscribeAction = action02;
        this.onUnsubscribeAction = action03;
        this.observableOnCompleteCallback = action04;
        this.observableOnErrorCallback = action1;
    }

    @Override // io.mantisrx.runtime.executor.WorkerPublisher
    public void start(StageConfig<?, T> stageConfig, Observable<Observable<T>> observable) {
        this.sink = this.sinkHolder.getSinkAction();
        int i = -1;
        if (this.sinkHolder.isPortRequested()) {
            i = this.portSelector.acquirePort();
        }
        Observable lift = Observable.merge(observable).lift(new MonitorOperator("worker_sink"));
        Observable share = Observable.create(subscriber -> {
            logger.info("Got sink subscription with onSubscribeAction={}", this.onSubscribeAction);
            lift.doOnCompleted(this.observableOnCompleteCallback).doOnError(this.observableOnErrorCallback).doOnTerminate(this.observableTerminatedCallback).subscribe(subscriber);
            if (this.onSubscribeAction != null) {
                this.onSubscribeAction.call();
            }
        }).doOnCompleted(() -> {
            logger.info("Sink observable subscription completed.");
        }).doOnError(th -> {
            logger.error("Sink observable subscription onError:", th);
        }).doOnTerminate(() -> {
            logger.info("Sink observable subscription termindated.");
        }).doOnUnsubscribe(() -> {
            logger.info("Sink subscriptions clean up, action={}", this.onUnsubscribeAction);
            if (this.onUnsubscribeAction != null) {
                this.onUnsubscribeAction.call();
            }
        }).share();
        if (this.context.getWorkerInfo().getDurationType() == MantisJobDurationType.Perpetual) {
            logger.info("eagerSubscription subscribed for Perpetual job.");
            this.eagerSubscription = share.subscribe();
        }
        this.sink.init(this.context);
        this.sink.call(this.context, new PortRequest(i), share);
    }

    @Override // io.mantisrx.runtime.executor.WorkerPublisher
    public RxMetrics getMetrics() {
        return null;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.sink.close();
        } finally {
            this.sink = null;
            if (this.eagerSubscription != null) {
                this.eagerSubscription.unsubscribe();
                this.eagerSubscription = null;
            }
        }
    }
}
