package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.StageConfig;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.reconciliator.Reconciliator;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/runtime/executor/WorkerConsumerRemoteObservable.class */
public class WorkerConsumerRemoteObservable<T, R> implements WorkerConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(WorkerConsumerRemoteObservable.class);
    private final String name;
    private final EndpointInjector injector;
    private DynamicConnectionSet<T> connectionSet;
    private Reconciliator<T> reconciliator;

    public WorkerConsumerRemoteObservable(String str, EndpointInjector endpointInjector) {
        this.name = str;
        this.injector = endpointInjector;
    }

    @Override // io.mantisrx.runtime.executor.WorkerConsumer
    public Observable<Observable<T>> start(StageConfig<T, ?> stageConfig) {
        if ((stageConfig instanceof KeyToKey) || (stageConfig instanceof KeyToScalar) || (stageConfig instanceof GroupToScalar) || (stageConfig instanceof GroupToGroup)) {
            logger.info("Remote connection to stage " + this.name + " is KeyedStage");
            this.connectionSet = DynamicConnectionSet.createMGO(new ConnectToGroupedObservable.Builder().name(this.name).keyDecoder(stageConfig.getInputKeyCodec()).valueDecoder(stageConfig.getInputCodec()).subscribeAttempts(30));
        } else {
            if (!(stageConfig instanceof ScalarToScalar) && !(stageConfig instanceof ScalarToKey) && !(stageConfig instanceof ScalarToGroup)) {
                throw new RuntimeException("Unsupported stage type: " + stageConfig);
            }
            logger.info("Remote connection to stage " + this.name + " is ScalarStage");
            this.connectionSet = DynamicConnectionSet.create(new ConnectToObservable.Builder().name(this.name).decoder(stageConfig.getInputCodec()).subscribeAttempts(30));
        }
        this.reconciliator = new Reconciliator.Builder().name("worker2worker_" + this.name).connectionSet(this.connectionSet).injector(this.injector).build();
        registerMetrics(this.reconciliator.getMetrics());
        registerMetrics(this.connectionSet.getConnectionMetrics());
        return this.reconciliator.observables();
    }

    private void registerMetrics(Metrics metrics) {
        MetricsRegistry.getInstance().registerAndGet(metrics);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }
}
