package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.codec.Encoder;
import io.reactivex.mantis.remote.observable.filter.ServerSideFilters;
import io.reactivex.mantis.remote.observable.slotting.RoundRobin;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import java.util.Map;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/ServeObservable.class */
public class ServeObservable<T> extends ServeConfig<T, T> {
    private Encoder<T> encoder;
    private Observable<T> observable;
    private boolean subscriptionPerConnection;
    private boolean isHotStream;

    /* loaded from: input_file:io/reactivex/mantis/remote/observable/ServeObservable$Builder.class */
    public static class Builder<T> {
        public boolean isHotStream;
        private String name;
        private Observable<T> observable;
        private Encoder<T> encoder;
        private int maxWriteAttempts;
        private boolean subscriptionPerConnection;
        private SlottingStrategy<T> slottingStrategy = new RoundRobin();
        private Func1<Map<String, String>, Func1<T, Boolean>> filterFunction = ServerSideFilters.noFiltering();

        public Builder<T> name(String str) {
            if (str != null && str.length() > 127) {
                throw new IllegalArgumentException("Observable name must be less than 127 characters");
            }
            this.name = str;
            return this;
        }

        public Builder<T> observable(Observable<T> observable) {
            this.observable = observable;
            return this;
        }

        public Builder<T> maxWriteAttempts(Observable<T> observable) {
            this.observable = observable;
            return this;
        }

        public Builder<T> slottingStrategy(SlottingStrategy<T> slottingStrategy) {
            this.slottingStrategy = slottingStrategy;
            return this;
        }

        public Builder<T> encoder(Encoder<T> encoder) {
            this.encoder = encoder;
            return this;
        }

        public Builder<T> serverSideFilter(Func1<Map<String, String>, Func1<T, Boolean>> func1) {
            this.filterFunction = func1;
            return this;
        }

        public ServeObservable<T> build() {
            return new ServeObservable<>(this);
        }

        public Builder<T> subscriptionPerConnection() {
            this.subscriptionPerConnection = true;
            return this;
        }

        public Builder<T> hotStream() {
            this.isHotStream = true;
            return this;
        }
    }

    public ServeObservable(Builder<T> builder) {
        super(((Builder) builder).name, ((Builder) builder).slottingStrategy, ((Builder) builder).filterFunction, ((Builder) builder).maxWriteAttempts);
        this.encoder = ((Builder) builder).encoder;
        this.subscriptionPerConnection = ((Builder) builder).subscriptionPerConnection;
        this.isHotStream = builder.isHotStream;
        this.observable = ((Builder) builder).observable;
        if (((Builder) builder).subscriptionPerConnection) {
            return;
        }
        applySlottingSideEffectToObservable(((Builder) builder).observable);
    }

    public boolean isSubscriptionPerConnection() {
        return this.subscriptionPerConnection;
    }

    public Observable<T> getObservable() {
        return this.observable;
    }

    public boolean isHotStream() {
        return this.isHotStream;
    }

    private void applySlottingSideEffectToObservable(Observable<T> observable) {
        final Observable doOnTerminate = observable.doOnNext(new Action1<T>() { // from class: io.reactivex.mantis.remote.observable.ServeObservable.2
            public void call(T t) {
                ServeObservable.this.slottingStrategy.writeOnSlot(null, t);
            }
        }).doOnTerminate(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeObservable.1
            public void call() {
                ServeObservable.this.slottingStrategy.completeAllConnections();
            }
        });
        final MutableReference mutableReference = new MutableReference();
        this.slottingStrategy.registerDoAfterFirstConnectionAdded(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeObservable.3
            public void call() {
                mutableReference.setValue(doOnTerminate.subscribe());
            }
        });
        this.slottingStrategy.registerDoAfterLastConnectionRemoved(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeObservable.4
            public void call() {
                ((Subscription) mutableReference.getValue()).unsubscribe();
            }
        });
    }

    public Encoder<T> getEncoder() {
        return this.encoder;
    }
}
