package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.reconciliator.ConnectionSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func3;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/DynamicConnectionSet.class */
public class DynamicConnectionSet<T> implements ConnectionSet<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicConnectionSet.class);
    private static final SpscArrayQueue<MantisGroup<?, ?>> inputQueue = new SpscArrayQueue<>(1000);
    private static int MIN_TIME_SEC_DEFAULT = 1;
    private static int MAX_TIME_SEC_DEFAULT = 10;
    private EndpointInjector endpointInjector;
    private PublishSubject<EndpointChange> reconciliatorConnector;
    private Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> toObservableFunc;
    private Metrics connectionMetrics;
    private PublishSubject<Set<Endpoint>> activeConnectionsSubject;
    private Lock activeConnectionsLock;
    private Map<String, Endpoint> currentActiveConnections;
    private int minTimeoutOnUnexpectedTerminateSec;
    private int maxTimeoutOnUnexpectedTerminateSec;
    private Gauge activeConnectionsGauge;
    private Gauge closedConnections;
    private Gauge forceCompletedConnections;
    private Random random;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.reactivex.mantis.remote.observable.DynamicConnectionSet$4, reason: invalid class name */
    /* loaded from: input_file:io/reactivex/mantis/remote/observable/DynamicConnectionSet$4.class */
    public class AnonymousClass4 implements Func1<GroupedObservable<String, EndpointChange>, Observable<Observable<T>>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: io.reactivex.mantis.remote.observable.DynamicConnectionSet$4$1, reason: invalid class name */
        /* loaded from: input_file:io/reactivex/mantis/remote/observable/DynamicConnectionSet$4$1.class */
        public class AnonymousClass1 implements Func1<EndpointChange, Observable<T>> {
            final /* synthetic */ GroupedObservable val$group;
            final /* synthetic */ PublishSubject val$closeConnectionTrigger;

            AnonymousClass1(GroupedObservable groupedObservable, PublishSubject publishSubject) {
                this.val$group = groupedObservable;
                this.val$closeConnectionTrigger = publishSubject;
            }

            public Observable<T> call(final EndpointChange endpointChange) {
                DynamicConnectionSet.logger.info("Received add request, adding connection to active set, " + endpointChange.getEndpoint().getHost() + " port: " + endpointChange.getEndpoint().getPort() + ", with client id: " + endpointChange.getEndpoint().getSlotId());
                DynamicConnectionSet.this.addConnection((String) this.val$group.getKey(), endpointChange.getEndpoint());
                return ((RemoteRxConnection) DynamicConnectionSet.this.toObservableFunc.call(endpointChange.getEndpoint(), new Action0() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.4.1.1
                    public void call() {
                        int nextInt = DynamicConnectionSet.this.random.nextInt((DynamicConnectionSet.this.maxTimeoutOnUnexpectedTerminateSec - DynamicConnectionSet.this.minTimeoutOnUnexpectedTerminateSec) + 1) + DynamicConnectionSet.this.minTimeoutOnUnexpectedTerminateSec;
                        DynamicConnectionSet.logger.info("Connection disconnected, waiting " + nextInt + " seconds before removing from active set of connections: " + endpointChange);
                        Observable.timer(nextInt, TimeUnit.SECONDS).doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.4.1.1.1
                            public void call() {
                                DynamicConnectionSet.logger.warn("Removing connection from active set, " + endpointChange);
                                DynamicConnectionSet.this.closedConnections.increment();
                                DynamicConnectionSet.this.removeConnection((String) AnonymousClass1.this.val$group.getKey(), endpointChange.getEndpoint());
                            }
                        }).subscribe();
                    }
                }, this.val$closeConnectionTrigger)).getObservable().doOnCompleted(endpointChange.getEndpoint().getCompletedCallback()).doOnError(endpointChange.getEndpoint().getErrorCallback());
            }
        }

        AnonymousClass4() {
        }

        public Observable<Observable<T>> call(final GroupedObservable<String, EndpointChange> groupedObservable) {
            final PublishSubject create = PublishSubject.create();
            return groupedObservable.doOnNext(new Action1<EndpointChange>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.4.3
                public void call(EndpointChange endpointChange) {
                    if (EndpointChange.Type.complete == endpointChange.getType() && DynamicConnectionSet.this.activeConnectionsContains((String) groupedObservable.getKey(), endpointChange.getEndpoint())) {
                        DynamicConnectionSet.logger.info("Received complete request, removing connection from active set, " + endpointChange.getEndpoint().getHost() + " port: " + endpointChange.getEndpoint().getPort() + " id: " + endpointChange.getEndpoint().getSlotId());
                        DynamicConnectionSet.this.forceCompletedConnections.increment();
                        DynamicConnectionSet.this.removeConnection((String) groupedObservable.getKey(), endpointChange.getEndpoint());
                        create.onNext(1);
                    }
                }
            }).filter(new Func1<EndpointChange, Boolean>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.4.2
                public Boolean call(EndpointChange endpointChange) {
                    boolean activeConnectionsContains = DynamicConnectionSet.this.activeConnectionsContains((String) groupedObservable.getKey(), endpointChange.getEndpoint());
                    if (activeConnectionsContains) {
                        DynamicConnectionSet.logger.info("Skipping latent add for endpoint, already in active set: " + endpointChange);
                    }
                    return Boolean.valueOf(EndpointChange.Type.add == endpointChange.getType() && !activeConnectionsContains);
                }
            }).map(new AnonymousClass1(groupedObservable, create));
        }
    }

    public DynamicConnectionSet(Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> func3, int i, int i2) {
        this.reconciliatorConnector = PublishSubject.create();
        this.activeConnectionsSubject = PublishSubject.create();
        this.activeConnectionsLock = new ReentrantLock();
        this.currentActiveConnections = new HashMap();
        this.random = new Random();
        this.toObservableFunc = func3;
        this.connectionMetrics = new Metrics.Builder().name("DynamicConnectionSet").addGauge("activeConnections").addGauge("closedConnections").addGauge("forceCompletedConnections").build();
        this.activeConnectionsGauge = this.connectionMetrics.getGauge("activeConnections");
        this.closedConnections = this.connectionMetrics.getGauge("closedConnections");
        this.forceCompletedConnections = this.connectionMetrics.getGauge("forceCompletedConnections");
        this.minTimeoutOnUnexpectedTerminateSec = i;
        this.maxTimeoutOnUnexpectedTerminateSec = i2;
    }

    public DynamicConnectionSet(Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> func3) {
        this(func3, MIN_TIME_SEC_DEFAULT, MAX_TIME_SEC_DEFAULT);
    }

    public static <K, V> DynamicConnectionSet<GroupedObservable<K, V>> create(final ConnectToGroupedObservable.Builder<K, V> builder, int i) {
        return new DynamicConnectionSet<>(new Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<GroupedObservable<K, V>>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.1
            public RemoteRxConnection<GroupedObservable<K, V>> call(Endpoint endpoint, Action0 action0, PublishSubject<Integer> publishSubject) {
                ConnectToGroupedObservable.Builder builder2 = new ConnectToGroupedObservable.Builder(ConnectToGroupedObservable.Builder.this);
                builder2.host(endpoint.getHost()).port(endpoint.getPort()).closeTrigger(publishSubject).connectionDisconnectCallback(action0).slotId(endpoint.getSlotId());
                return RemoteObservable.connect(builder2.build());
            }
        }, MIN_TIME_SEC_DEFAULT, i);
    }

    public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(final ConnectToGroupedObservable.Builder<K, V> builder, int i, final SpscArrayQueue<MantisGroup<?, ?>> spscArrayQueue) {
        return new DynamicConnectionSet<>(new Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<MantisGroup<K, V>>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.2
            public RemoteRxConnection<MantisGroup<K, V>> call(Endpoint endpoint, Action0 action0, PublishSubject<Integer> publishSubject) {
                ConnectToGroupedObservable.Builder builder2 = new ConnectToGroupedObservable.Builder(ConnectToGroupedObservable.Builder.this);
                builder2.host(endpoint.getHost()).port(endpoint.getPort()).closeTrigger(publishSubject).connectionDisconnectCallback(action0).slotId(endpoint.getSlotId());
                return RemoteObservable.connectToMGO(builder2.build(), spscArrayQueue);
            }
        }, MIN_TIME_SEC_DEFAULT, i);
    }

    public static <K, V> DynamicConnectionSet<GroupedObservable<K, V>> create(ConnectToGroupedObservable.Builder<K, V> builder) {
        return create(builder, MAX_TIME_SEC_DEFAULT);
    }

    public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(ConnectToGroupedObservable.Builder<K, V> builder) {
        return createMGO(builder, MAX_TIME_SEC_DEFAULT, inputQueue);
    }

    public static <T> DynamicConnectionSet<T> create(final ConnectToObservable.Builder<T> builder, int i) {
        return new DynamicConnectionSet<>(new Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.3
            public RemoteRxConnection<T> call(Endpoint endpoint, Action0 action0, PublishSubject<Integer> publishSubject) {
                ConnectToObservable.Builder builder2 = new ConnectToObservable.Builder(ConnectToObservable.Builder.this);
                builder2.host(endpoint.getHost()).port(endpoint.getPort()).closeTrigger(publishSubject).connectionDisconnectCallback(action0).slotId(endpoint.getSlotId());
                return RemoteObservable.connect(builder2.build());
            }
        }, MIN_TIME_SEC_DEFAULT, i);
    }

    public static <T> DynamicConnectionSet<T> create(ConnectToObservable.Builder<T> builder) {
        return create(builder, MAX_TIME_SEC_DEFAULT);
    }

    public void setEndpointInjector(EndpointInjector endpointInjector) {
        this.endpointInjector = endpointInjector;
    }

    public Observer<EndpointChange> reconciliatorObserver() {
        return this.reconciliatorConnector;
    }

    public Metrics getConnectionMetrics() {
        return this.connectionMetrics;
    }

    @Override // io.reactivex.mantis.remote.observable.reconciliator.ConnectionSet
    public Observable<Observable<T>> observables() {
        return this.endpointInjector.deltas().doOnCompleted(() -> {
            logger.info("onComplete on injector deltas");
        }).doOnError(th -> {
            logger.error("caught unexpected error {}", th.getMessage(), th);
        }).doOnSubscribe(new Action0() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.6
            public void call() {
                DynamicConnectionSet.logger.info("Subscribing, clearing active connection set");
                DynamicConnectionSet.this.resetActiveConnections();
            }
        }).groupBy(new Func1<EndpointChange, String>() { // from class: io.reactivex.mantis.remote.observable.DynamicConnectionSet.5
            public String call(EndpointChange endpointChange) {
                return Endpoint.uniqueHost(endpointChange.getEndpoint().getHost(), endpointChange.getEndpoint().getPort(), endpointChange.getEndpoint().getSlotId());
            }
        }).flatMap(new AnonymousClass4());
    }

    @Override // io.reactivex.mantis.remote.observable.reconciliator.ConnectionSet
    public Observable<Set<Endpoint>> activeConnections() {
        return this.activeConnectionsSubject;
    }

    public boolean activeConnectionsContains(String str, Endpoint endpoint) {
        try {
            this.activeConnectionsLock.lock();
            boolean containsKey = this.currentActiveConnections.containsKey(str);
            this.activeConnectionsLock.unlock();
            return containsKey;
        } catch (Throwable th) {
            this.activeConnectionsLock.unlock();
            throw th;
        }
    }

    public void resetActiveConnections() {
        try {
            this.activeConnectionsLock.lock();
            this.currentActiveConnections.clear();
            this.activeConnectionsGauge.set(0L);
            this.activeConnectionsSubject.onNext(new HashSet());
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }

    public void addConnection(String str, Endpoint endpoint) {
        try {
            this.activeConnectionsLock.lock();
            if (!this.currentActiveConnections.containsKey(str)) {
                this.currentActiveConnections.put(str, new Endpoint(endpoint.getHost(), endpoint.getPort(), endpoint.getSlotId(), endpoint.getCompletedCallback(), endpoint.getErrorCallback()));
                this.activeConnectionsGauge.increment();
                this.activeConnectionsSubject.onNext(new HashSet(this.currentActiveConnections.values()));
            }
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }

    public void removeConnection(String str, Endpoint endpoint) {
        try {
            this.activeConnectionsLock.lock();
            if (this.currentActiveConnections.containsKey(str)) {
                this.currentActiveConnections.remove(str);
                this.activeConnectionsGauge.decrement();
                this.activeConnectionsSubject.onNext(new HashSet(this.currentActiveConnections.values()));
            }
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }
}
