package io.reactivex.mantis.network.push;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/reactivex/mantis/network/push/ObservableTrigger.class */
/**
 * Static factories that wrap an RxJava {@link Observable} in a {@link PushTrigger}.
 *
 * <p>Every trigger built here is given two callbacks and a {@link Metrics} instance:
 * <ul>
 *   <li>a first callback that subscribes to the source and writes each emitted item into the
 *       {@link MonitoredQueue} it receives, remembering the resulting {@link Subscription};</li>
 *   <li>a second callback which, per its own log messages, runs when the connections from the
 *       next stage drop to zero. Only the trigger returned by {@link #o} unsubscribes from the
 *       source at that point; all other triggers merely log and keep the subscription.</li>
 * </ul>
 *
 * <p>The metrics are named {@code "ObservableTrigger_" + name} and carry a single gauge,
 * {@code subscriptionActive}, set to 1 on subscribe and 0 on unsubscribe.
 */
public final class ObservableTrigger {
    private static final Logger logger = LoggerFactory.getLogger(ObservableTrigger.class);

    /** Name of the gauge that mirrors whether the source subscription is currently active. */
    private static final String SUBSCRIPTION_ACTIVE_GAUGE = "subscriptionActive";

    /** Width, in milliseconds, of the time window used to batch the values of one group. */
    private static final long GROUP_BUFFER_MILLIS = 250L;

    private ObservableTrigger() {
    }

    /**
     * Builds a trigger that pushes the non-null items of {@code source} into the queue and keeps
     * the source subscription when the stop callback fires.
     *
     * @param name         trigger name, used in the metrics name and in log messages
     * @param source       observable whose items are written to the queue
     * @param doOnComplete optional (nullable) action run after the source completes
     * @param doOnError    optional (nullable) action run with the error if the source fails
     */
    private static <T> PushTrigger<T> trigger(String name, Observable<T> source, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        Action1<MonitoredQueue<T>> onStart =
                pushNonNullItems(name, source, doOnComplete, doOnError, subscriptionActive, subscriptionRef);
        Action1<MonitoredQueue<T>> onStop = keepSubscription(subscriptionRef);
        return new PushTrigger<>(onStart, onStop, metrics);
    }

    /**
     * Same as {@link #trigger} except that the stop callback unsubscribes from the source
     * (the "SSE stage" behaviour named in its log message).
     *
     * @param name         trigger name, used in the metrics name and in log messages
     * @param source       observable whose items are written to the queue
     * @param doOnComplete optional (nullable) action run after the source completes
     * @param doOnError    optional (nullable) action run with the error if the source fails
     */
    private static <T> PushTrigger<T> ssetrigger(String name, Observable<T> source, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        Action1<MonitoredQueue<T>> onStart =
                pushNonNullItems(name, source, doOnComplete, doOnError, subscriptionActive, subscriptionRef);
        Action1<MonitoredQueue<T>> onStop = queue -> {
            // Read the reference once so the null check and the unsubscribe see the same value.
            Subscription active = subscriptionRef.get();
            if (active != null) {
                logger.warn("Connections from next stage has dropped to 0 for SSE stage. propagate unsubscribe");
                active.unsubscribe();
            }
        };
        return new PushTrigger<>(onStart, onStop, metrics);
    }

    /**
     * Builds a trigger over a stream of {@link GroupedObservable}s. Each group is flattened into
     * time-windowed batches of {@link KeyValuePair}s (see {@link #keyedBatches}) which are then
     * written to the queue one pair at a time. The source is observed on the computation
     * scheduler. The stop callback keeps the subscription.
     *
     * @param name               trigger name, used in the metrics name and in log messages
     * @param source             stream of groups to push
     * @param doOnComplete       optional (nullable) action run after the stream completes
     * @param doOnError          optional (nullable) action run with the error if the stream fails
     * @param groupExpirySeconds per-group timeout in seconds, after which the group is replaced
     *                           by an empty observable
     * @param keyEncoder         converts a group key to the bytes stored in each pair
     * @param hashFunction       hashes the encoded key bytes
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> groupTrigger(String name, Observable<GroupedObservable<K, V>> source, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        Action1<MonitoredQueue<KeyValuePair<K, V>>> onStart = queue -> subscriptionRef.set(
                trackSubscription(source.observeOn(Schedulers.computation()), name, subscriptionActive)
                        .flatMap((GroupedObservable<K, V> group) ->
                                keyedBatches(group, groupExpirySeconds, keyEncoder, hashFunction))
                        .subscribe(
                                batch -> {
                                    for (KeyValuePair<K, V> pair : batch) {
                                        queue.write(pair);
                                    }
                                },
                                errorHandler(name, doOnError),
                                completionHandler(name, doOnComplete)));
        Action1<MonitoredQueue<KeyValuePair<K, V>>> onStop = keepSubscription(subscriptionRef);
        return new PushTrigger<>(onStart, onStop, metrics);
    }

    /**
     * Builds a trigger over a stream of {@link MantisGroup}s. Each group is mapped to a single
     * {@link KeyValuePair} (hashed key, encoded key bytes, value) and written to the queue.
     * The stop callback keeps the subscription.
     *
     * @param name         trigger name, used in the metrics name and in log messages
     * @param source       stream of key/value groups to push
     * @param doOnComplete optional (nullable) action run after the stream completes
     * @param doOnError    optional (nullable) action run with the error if the stream fails
     * @param j            not used by this method; kept so the signature stays unchanged
     * @param keyEncoder   converts a group key to the bytes stored in each pair
     * @param hashFunction hashes the encoded key bytes
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> mantisGroupTrigger(String name, Observable<MantisGroup<K, V>> source, Action0 doOnComplete, Action1<Throwable> doOnError, long j, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        Action1<MonitoredQueue<KeyValuePair<K, V>>> onStart = queue -> subscriptionRef.set(
                trackSubscription(source, name, subscriptionActive)
                        .map((MantisGroup<K, V> group) -> {
                            byte[] keyBytes = keyEncoder.call(group.getKeyValue());
                            return new KeyValuePair<K, V>(hashFunction.computeHash(keyBytes), keyBytes, group.getValue());
                        })
                        .subscribe(
                                pair -> queue.write(pair),
                                errorHandler(name, doOnError),
                                completionHandler(name, doOnComplete)));
        Action1<MonitoredQueue<KeyValuePair<K, V>>> onStop = keepSubscription(subscriptionRef);
        return new PushTrigger<>(onStart, onStop, metrics);
    }

    /** Creates the per-trigger metrics holding the single subscription-active gauge. */
    private static Metrics newMetrics(String name) {
        return new Metrics.Builder()
                .name("ObservableTrigger_" + name)
                .addGauge(SUBSCRIPTION_ACTIVE_GAUGE)
                .build();
    }

    /** Decorates {@code source} so that subscribe/unsubscribe are logged and mirrored into the gauge. */
    private static <T> Observable<T> trackSubscription(Observable<T> source, String name, Gauge subscriptionActive) {
        return source
                .doOnSubscribe(() -> {
                    logger.info("Subscription is ACTIVE for observable trigger with name: " + name);
                    subscriptionActive.set(1L);
                })
                .doOnUnsubscribe(() -> {
                    logger.info("Subscription is INACTIVE for observable trigger with name: " + name);
                    subscriptionActive.set(0L);
                });
    }

    /** Start callback shared by {@link #trigger} and {@link #ssetrigger}: drops nulls, writes the rest to the queue. */
    private static <T> Action1<MonitoredQueue<T>> pushNonNullItems(String name, Observable<T> source, Action0 doOnComplete, Action1<Throwable> doOnError, Gauge subscriptionActive, AtomicReference<Subscription> subscriptionRef) {
        return queue -> subscriptionRef.set(
                trackSubscription(source.filter(item -> item != null), name, subscriptionActive)
                        .subscribe(
                                item -> queue.write(item),
                                errorHandler(name, doOnError),
                                completionHandler(name, doOnComplete)));
    }

    /** Stop callback that only logs (when a subscription exists) and leaves the subscription in place. */
    private static <T> Action1<MonitoredQueue<T>> keepSubscription(AtomicReference<Subscription> subscriptionRef) {
        return queue -> {
            if (subscriptionRef.get() != null) {
                logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe");
            }
        };
    }

    /** Error handler that logs the failure and then forwards it to {@code doOnError} when one was supplied. */
    private static Action1<Throwable> errorHandler(String name, Action1<Throwable> doOnError) {
        return error -> {
            logger.warn("Observable used to push data errored, on server with name: " + name, error);
            if (doOnError != null) {
                doOnError.call(error);
            }
        };
    }

    /** Completion handler that logs and then runs {@code doOnComplete} when one was supplied. */
    private static Action0 completionHandler(String name, Action0 doOnComplete) {
        return () -> {
            logger.info("Observable used to push data completed, on server with name: " + name);
            if (doOnComplete != null) {
                doOnComplete.call();
            }
        };
    }

    /**
     * Turns one group into a stream of non-empty batches of {@link KeyValuePair}s.
     * The key is encoded and hashed once per group; values are collected in
     * {@link #GROUP_BUFFER_MILLIS}-millisecond windows with back pressure disabled, and the group
     * is switched to an empty observable when the timeout of {@code groupExpirySeconds} fires.
     */
    private static <K, V> Observable<List<KeyValuePair<K, V>>> keyedBatches(GroupedObservable<K, V> group, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        byte[] keyBytes = keyEncoder.call(group.getKey());
        long keyHash = hashFunction.computeHash(keyBytes);
        return group
                .timeout(groupExpirySeconds, TimeUnit.SECONDS, Observable.<V>empty())
                .lift(new DisableBackPressureOperator<V>())
                .buffer(GROUP_BUFFER_MILLIS, TimeUnit.MILLISECONDS)
                .filter(values -> values != null && !values.isEmpty())
                .map(values -> {
                    List<KeyValuePair<K, V>> pairs = new ArrayList<>(values.size());
                    for (V value : values) {
                        pairs.add(new KeyValuePair<K, V>(keyHash, keyBytes, value));
                    }
                    return pairs;
                });
    }

    /**
     * Trigger for a plain observable. Unlike the other factories, the returned trigger
     * unsubscribes from {@code observable} when its stop callback fires.
     *
     * @param str        trigger name, used in the metrics name and in log messages
     * @param observable source of items to push
     * @param action0    optional (nullable) action run after the source completes
     * @param action1    optional (nullable) action run with the error if the source fails
     */
    public static <T> PushTrigger<T> o(String str, Observable<T> observable, Action0 action0, Action1<Throwable> action1) {
        return ssetrigger(str, observable, action0, action1);
    }

    /**
     * Trigger for an observable of observables; the inner observables are merged into one stream.
     *
     * @param str        trigger name, used in the metrics name and in log messages
     * @param observable nested source whose inner items are pushed
     * @param action0    optional (nullable) action run after the merged stream completes
     * @param action1    optional (nullable) action run with the error if the merged stream fails
     */
    public static <T> PushTrigger<T> oo(String str, Observable<Observable<T>> observable, Action0 action0, Action1<Throwable> action1) {
        return trigger(str, Observable.merge(observable), action0, action1);
    }

    /**
     * Trigger for an observable of observables of {@link GroupedObservable}s; the outer level is
     * merged and each group is pushed as {@link KeyValuePair}s.
     *
     * @param str          trigger name, used in the metrics name and in log messages
     * @param observable   nested source of groups
     * @param action0      optional (nullable) action run after the merged stream completes
     * @param action1      optional (nullable) action run with the error if the merged stream fails
     * @param j            per-group timeout in seconds
     * @param func1        converts a group key to bytes
     * @param hashFunction hashes the encoded key bytes
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oogo(String str, Observable<Observable<GroupedObservable<K, V>>> observable, Action0 action0, Action1<Throwable> action1, long j, Func1<K, byte[]> func1, HashFunction hashFunction) {
        return groupTrigger(str, Observable.merge(observable), action0, action1, j, func1, hashFunction);
    }

    /**
     * Trigger for an observable of observables of {@link MantisGroup}s; the outer level is merged
     * and each group is pushed as one {@link KeyValuePair}.
     *
     * @param str          trigger name, used in the metrics name and in log messages
     * @param observable   nested source of key/value groups
     * @param action0      optional (nullable) action run after the merged stream completes
     * @param action1      optional (nullable) action run with the error if the merged stream fails
     * @param j            forwarded to the underlying factory, which does not use it
     * @param func1        converts a group key to bytes
     * @param hashFunction hashes the encoded key bytes
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oomgo(String str, Observable<Observable<MantisGroup<K, V>>> observable, Action0 action0, Action1<Throwable> action1, long j, Func1<K, byte[]> func1, HashFunction hashFunction) {
        return mantisGroupTrigger(str, Observable.merge(observable), action0, action1, j, func1, hashFunction);
    }
}
