package io.reactivex.mantis.remote.observable.reconciliator;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/reconciliator/Reconciliator.class */
public class Reconciliator<T> {
    private static final Logger logger = LoggerFactory.getLogger(Reconciliator.class);
    private static final AtomicBoolean startedReconciliation = new AtomicBoolean(false);
    private String name;
    private Subscription subscription;
    private DynamicConnectionSet<T> connectionSet;
    private EndpointInjector injector;
    private Metrics metrics;
    private Counter reconciliationCheck;
    private Gauge running;
    private Gauge expectedSetSize;
    private PublishSubject<Set<Endpoint>> currentExpectedSet = PublishSubject.create();
    private PublishSubject<EndpointChange> reconciledChanges = PublishSubject.create();

    /* loaded from: input_file:io/reactivex/mantis/remote/observable/reconciliator/Reconciliator$Builder.class */
    public static class Builder<T> {
        private String name;
        private EndpointInjector injector;
        private DynamicConnectionSet<T> connectionSet;

        public Builder<T> connectionSet(DynamicConnectionSet<T> dynamicConnectionSet) {
            this.connectionSet = dynamicConnectionSet;
            return this;
        }

        public Builder<T> name(String str) {
            this.name = str;
            return this;
        }

        public Builder<T> injector(EndpointInjector endpointInjector) {
            this.injector = endpointInjector;
            return this;
        }

        public Reconciliator<T> build() {
            return new Reconciliator<>(this);
        }
    }

    Reconciliator(Builder<T> builder) {
        this.name = ((Builder) builder).name;
        this.injector = ((Builder) builder).injector;
        this.connectionSet = ((Builder) builder).connectionSet;
        this.metrics = new Metrics.Builder().name("Reconciliator_" + this.name).addCounter("reconciliationCheck").addGauge("expectedSetSize").addGauge("running").build();
        this.reconciliationCheck = this.metrics.getCounter("reconciliationCheck");
        this.running = this.metrics.getGauge("running");
        this.expectedSetSize = this.metrics.getGauge("expectedSetSize");
    }

    public Metrics getMetrics() {
        return this.metrics;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<EndpointChange> deltas() {
        final HashMap hashMap = new HashMap();
        final PublishSubject create = PublishSubject.create();
        return Observable.merge(this.reconciledChanges.takeUntil(create).doOnCompleted(() -> {
            logger.info("onComplete triggered for reconciledChanges");
        }).doOnError(th -> {
            logger.error("caught exception for reconciledChanges {}", th.getMessage(), th);
        }), this.injector.deltas().doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.4
            public void call() {
                Reconciliator.logger.info("Stopping reconciliator, injector completed.");
                create.onNext(1);
                Reconciliator.this.stopReconciliation();
            }
        }).doOnError(th2 -> {
            logger.error("caught exception for injector deltas {}", th2.getMessage(), th2);
        }).doOnNext(new Action1<EndpointChange>() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.3
            public void call(EndpointChange endpointChange) {
                String uniqueHost = Endpoint.uniqueHost(endpointChange.getEndpoint().getHost(), endpointChange.getEndpoint().getPort(), endpointChange.getEndpoint().getSlotId());
                if (hashMap.containsKey(uniqueHost)) {
                    if (endpointChange.getType() == EndpointChange.Type.complete) {
                        Reconciliator.this.expectedSetSize.decrement();
                        hashMap.remove(uniqueHost);
                        Reconciliator.this.currentExpectedSet.onNext(new HashSet(hashMap.values()));
                        return;
                    }
                    return;
                }
                if (endpointChange.getType() == EndpointChange.Type.add) {
                    Reconciliator.this.expectedSetSize.increment();
                    hashMap.put(uniqueHost, new Endpoint(endpointChange.getEndpoint().getHost(), endpointChange.getEndpoint().getPort(), endpointChange.getEndpoint().getSlotId()));
                    Reconciliator.this.currentExpectedSet.onNext(new HashSet(hashMap.values()));
                }
            }
        })).doOnError(th3 -> {
            logger.error("caught error processing reconciliator deltas {}", th3.getMessage(), th3);
        }).doOnSubscribe(new Action0() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.2
            public void call() {
                Reconciliator.logger.info("Subscribed to deltas for {}, clearing active connection set", Reconciliator.this.name);
                Reconciliator.this.connectionSet.resetActiveConnections();
                Reconciliator.this.startReconciliation();
            }
        }).doOnUnsubscribe(new Action0() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.1
            public void call() {
                Reconciliator.logger.info("Unsubscribed from deltas for {}", Reconciliator.this.name);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startReconciliation() {
        if (!startedReconciliation.compareAndSet(false, true)) {
            logger.info("reconciliation already started for {}", this.name);
            return;
        }
        logger.info("Starting reconciliation for name: " + this.name);
        this.running.increment();
        this.subscription = Observable.combineLatest(this.currentExpectedSet, this.connectionSet.activeConnections(), new Func2<Set<Endpoint>, Set<Endpoint>, Void>() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.7
            public Void call(Set<Endpoint> set, Set<Endpoint> set2) {
                Reconciliator.this.reconciliationCheck.increment();
                boolean equals = set.equals(set2);
                Reconciliator.logger.debug("Check result: " + equals + ", size expected: " + set.size() + " actual: " + set2.size() + ", for values expected: " + set + " versus actual: " + set2);
                if (equals) {
                    return null;
                }
                HashSet<Endpoint> hashSet = new HashSet(set);
                hashSet.removeAll(set2);
                if (hashSet.size() > 0) {
                    for (Endpoint endpoint : hashSet) {
                        Reconciliator.logger.info("Connection missing from expected set, adding missing connection: " + endpoint);
                        Reconciliator.this.reconciledChanges.onNext(new EndpointChange(EndpointChange.Type.add, endpoint));
                    }
                }
                HashSet<Endpoint> hashSet2 = new HashSet(set2);
                hashSet2.removeAll(set);
                if (hashSet2.size() <= 0) {
                    return null;
                }
                for (Endpoint endpoint2 : hashSet2) {
                    Reconciliator.logger.info("Unexpected connection in active set, removing connection: " + endpoint2);
                    Reconciliator.this.reconciledChanges.onNext(new EndpointChange(EndpointChange.Type.complete, endpoint2));
                }
                return null;
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.6
            public Observable<? extends Void> call(Throwable th) {
                Reconciliator.logger.error("caught error in Reconciliation for {}", Reconciliator.this.name, th);
                return Observable.empty();
            }
        }).doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.5
            public void call() {
                Reconciliator.logger.error("onComplete in Reconciliation observable chain for {}", Reconciliator.this.name);
                Reconciliator.this.stopReconciliation();
            }
        }).subscribe();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopReconciliation() {
        if (!startedReconciliation.compareAndSet(true, false)) {
            logger.info("reconciliation already stopped for name: " + this.name);
            return;
        }
        logger.info("Stopping reconciliation for name: " + this.name);
        this.running.decrement();
        this.subscription.unsubscribe();
    }

    public Observable<Observable<T>> observables() {
        this.connectionSet.setEndpointInjector(new EndpointInjector() { // from class: io.reactivex.mantis.remote.observable.reconciliator.Reconciliator.8
            @Override // io.reactivex.mantis.remote.observable.EndpointInjector
            public Observable<EndpointChange> deltas() {
                return Reconciliator.this.deltas();
            }
        });
        return this.connectionSet.observables();
    }
}
