package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.codec.Encoder;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.HashFunctions;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.remote.observable.filter.ServerSideFilters;
import io.reactivex.mantis.remote.observable.slotting.ConsistentHashing;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/ServeGroupedObservable.class */
public class ServeGroupedObservable<K, V> extends ServeConfig<K, Group<String, V>> {
    private static final Logger logger = LoggerFactory.getLogger(ServeGroupedObservable.class);
    private Encoder<String> keyEncoder;
    private Encoder<V> valueEncoder;
    private int groupBufferTimeMSec;
    private long expiryInSecs;
    private Counter groupsExpiredCounter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.reactivex.mantis.remote.observable.ServeGroupedObservable$2, reason: invalid class name */
    /* loaded from: input_file:io/reactivex/mantis/remote/observable/ServeGroupedObservable$2.class */
    public class AnonymousClass2 implements Func1<Observable<GroupedObservable<String, V>>, Observable<List<Group<String, V>>>> {
        AnonymousClass2() {
        }

        public Observable<List<Group<String, V>>> call(Observable<GroupedObservable<String, V>> observable) {
            return observable.flatMap(new Func1<GroupedObservable<String, V>, Observable<List<Group<String, V>>>>() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.2.1
                public Observable<List<Group<String, V>>> call(GroupedObservable<String, V> groupedObservable) {
                    final byte[] encode = ServeGroupedObservable.this.keyEncoder.encode(groupedObservable.getKey());
                    final String str = (String) groupedObservable.getKey();
                    return groupedObservable.doOnUnsubscribe(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.2.1.3
                        public void call() {
                            ServeGroupedObservable.this.groupsExpiredCounter.increment();
                        }
                    }).timeout(ServeGroupedObservable.this.expiryInSecs, TimeUnit.SECONDS, Observable.empty()).materialize().lift(new DisableBackPressureOperator()).buffer(ServeGroupedObservable.this.groupBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<V>>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.2.1.2
                        public Boolean call(List<Notification<V>> list) {
                            return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
                        }
                    }).map(new Func1<List<Notification<V>>, List<Group<String, V>>>() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.2.1.1
                        public List<Group<String, V>> call(List<Notification<V>> list) {
                            ArrayList arrayList = new ArrayList(list.size());
                            Iterator<Notification<V>> it = list.iterator();
                            while (it.hasNext()) {
                                arrayList.add(new Group(str, encode, it.next()));
                            }
                            return arrayList;
                        }
                    });
                }
            });
        }
    }

    /* loaded from: input_file:io/reactivex/mantis/remote/observable/ServeGroupedObservable$Builder.class */
    public static class Builder<K, V> {
        private Observable<Observable<GroupedObservable<String, V>>> observable;
        private Encoder<String> keyEncoder;
        private Encoder<V> valueEncoder;
        private String name;
        private SlottingStrategy<Group<String, V>> slottingStrategy = new ConsistentHashing(this.name, HashFunctions.ketama());
        private Func1<Map<String, String>, Func1<K, Boolean>> filterFunction = ServerSideFilters.noFiltering();
        private int maxWriteAttempts = 3;
        private long expiryTimeInSecs = Long.MAX_VALUE;
        private Observable<Integer> minConnectionsToSubscribe = Observable.just(1);

        public Builder<K, V> name(String str) {
            if (str != null && str.length() > 127) {
                throw new IllegalArgumentException("Observable name must be less than 127 characters");
            }
            this.name = str;
            return this;
        }

        public Builder<K, V> observable(Observable<Observable<GroupedObservable<String, V>>> observable) {
            this.observable = observable;
            return this;
        }

        public Builder<K, V> maxWriteAttempts(int i) {
            this.maxWriteAttempts = i;
            return this;
        }

        public Builder<K, V> withExpirySecs(long j) {
            this.expiryTimeInSecs = j;
            return this;
        }

        public Builder<K, V> minConnectionsToSubscribe(Observable<Integer> observable) {
            this.minConnectionsToSubscribe = observable;
            return this;
        }

        public Builder<K, V> slottingStrategy(SlottingStrategy<Group<String, V>> slottingStrategy) {
            this.slottingStrategy = slottingStrategy;
            return this;
        }

        public Builder<K, V> keyEncoder(Encoder<String> encoder) {
            this.keyEncoder = encoder;
            return this;
        }

        public Builder<K, V> valueEncoder(Encoder<V> encoder) {
            this.valueEncoder = encoder;
            return this;
        }

        public Builder<K, V> serverSideFilter(Func1<Map<String, String>, Func1<K, Boolean>> func1) {
            this.filterFunction = func1;
            return this;
        }

        public ServeGroupedObservable<K, V> build() {
            return new ServeGroupedObservable<>(this);
        }
    }

    public ServeGroupedObservable(Builder<K, V> builder) {
        super(((Builder) builder).name, ((Builder) builder).slottingStrategy, ((Builder) builder).filterFunction, ((Builder) builder).maxWriteAttempts);
        this.groupBufferTimeMSec = 250;
        this.expiryInSecs = Long.MAX_VALUE;
        String stringValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.remoteObservable.groupBufferMSec", "250");
        if (stringValue != null && !stringValue.equals("250")) {
            this.groupBufferTimeMSec = Integer.parseInt(stringValue);
        }
        this.keyEncoder = ((Builder) builder).keyEncoder;
        this.valueEncoder = ((Builder) builder).valueEncoder;
        this.expiryInSecs = ((Builder) builder).expiryTimeInSecs;
        this.groupsExpiredCounter = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().name("ServeGroupedObservable").addCounter("groupsExpiredCounter").build()).getCounter("groupsExpiredCounter");
        applySlottingSideEffectToObservable(((Builder) builder).observable, ((Builder) builder).minConnectionsToSubscribe);
    }

    private void applySlottingSideEffectToObservable(Observable<Observable<GroupedObservable<String, V>>> observable, Observable<Integer> observable2) {
        final AtomicInteger atomicInteger = new AtomicInteger();
        observable2.subscribe(new Action1<Integer>() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.1
            public void call(Integer num) {
                atomicInteger.set(num.intValue());
            }
        });
        final Observable doOnEach = Observable.merge(observable.map(new AnonymousClass2())).doOnEach(new Observer<List<Group<String, V>>>() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.3
            public void onCompleted() {
                ServeGroupedObservable.this.slottingStrategy.completeAllConnections();
            }

            public void onError(Throwable th) {
                th.printStackTrace();
                ServeGroupedObservable.this.slottingStrategy.errorAllConnections(th);
            }

            public void onNext(List<Group<String, V>> list) {
                for (Group<String, V> group : list) {
                    ServeGroupedObservable.this.slottingStrategy.writeOnSlot(group.getKeyBytes(), group);
                }
            }
        });
        final MutableReference mutableReference = new MutableReference();
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.slottingStrategy.registerDoOnEachConnectionAdded(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.4
            public void call() {
                Integer valueOf = Integer.valueOf(atomicInteger.get());
                Integer valueOf2 = Integer.valueOf(atomicInteger2.incrementAndGet());
                if (valueOf2.intValue() < valueOf.intValue()) {
                    ServeGroupedObservable.logger.info("MinConnectionsToSubscribe: " + valueOf + ", has NOT been met, current connection count: " + valueOf2);
                } else if (atomicBoolean.compareAndSet(false, true)) {
                    ServeGroupedObservable.logger.info("MinConnectionsToSubscribe: " + valueOf + ", has been met, subscribing to observable, current connection count: " + valueOf2);
                    mutableReference.setValue(doOnEach.subscribe());
                }
            }
        });
        this.slottingStrategy.registerDoAfterLastConnectionRemoved(new Action0() { // from class: io.reactivex.mantis.remote.observable.ServeGroupedObservable.5
            public void call() {
                ((Subscription) mutableReference.getValue()).unsubscribe();
                ServeGroupedObservable.logger.info("All connections deregistered, unsubscribed to observable, resetting current connection count: 0");
                atomicInteger2.set(0);
                atomicBoolean.set(false);
            }
        });
    }

    public Encoder<String> getKeyEncoder() {
        return this.keyEncoder;
    }

    public Encoder<V> getValueEncoder() {
        return this.valueEncoder;
    }
}
