package io.mantisrx.client;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:io/mantisrx/client/SinkClientImpl.class */
/**
 * {@link SinkClient} implementation that follows the sink endpoints of one job.
 *
 * <p>Endpoint changes are obtained from a {@link JobSinkLocator}; for every connect event a
 * {@link SinkConnection} is created through the supplied {@link SinkConnectionFunc} and kept in
 * an internal registry until the endpoint completes or the result stream is unsubscribed.
 * Connection counts are published through four gauges and, when one was supplied, through an
 * {@code Observer<SinkConnectionsStatus>}.
 */
public class SinkClientImpl<T> implements SinkClient<T> {
    private static final Logger logger = LoggerFactory.getLogger(SinkClientImpl.class);
    // Job whose sinks are located; also used as the "jobId" tag of the metric group.
    final String jobId;
    // Factory invoked once per connected endpoint to create the SinkConnection.
    final SinkConnectionFunc<T> sinkConnectionFunc;
    // Source of EndpointChange events for the job's sinks.
    final JobSinkLocator jobSinkLocator;
    // Set to true by closeAllConnections(); checked before and after registering a connection.
    private final AtomicBoolean nowClosed;
    // Registry of open connections, keyed by the "host-port" name built by toSinkName().
    private final SinkClientImpl<T>.SinkConnections<T> sinkConnections;
    // Names under which the four gauges below are registered in the metric group.
    private final String sinkGuageName = "SinkConnections";
    private final String expectedSinksGaugeName = "ExpectedSinkConnections";
    private final String sinkReceivingDataGaugeName = "sinkRecvngData";
    private final String clientNotConnectedToAllSourcesGaugeName = "clientNotConnectedToAllSources";
    // Incremented/decremented by updateSinkConx().
    private final Gauge sinkGauge;
    // Set to the current numSinkWorkers value on every status update.
    private final Gauge expectedSinksGauge;
    // Incremented/decremented by updateSinkDataReceivingStatus().
    private final Gauge sinkReceivingDataGauge;
    // 1 while expectedSinksGauge differs from sinkReceivingDataGauge, otherwise 0.
    private final Gauge clientNotConnectedToAllSourcesGauge;
    // Latest value emitted by the Observable<Integer> handed to the constructor.
    private final AtomicInteger numSinkWorkers;
    // Optional status listener; every use is guarded by a null check.
    private final Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver;
    // Passed through unchanged to sinkConnectionFunc.
    // NOTE(review): presumably a data-receive timeout in seconds - confirm against SinkConnectionFunc.
    private final long dataRecvTimeoutSecs;
    private final Metrics metrics;
    // Passed through unchanged to sinkConnectionFunc.
    private final boolean disablePingFiltering;

    /**
     * Registry of the sink connections that are currently open, keyed by sink name.
     *
     * <p>The backing map and the closed flag are only touched while holding the map's monitor.
     * After {@link #closeOut(Action1)} has run, {@link #put(String, SinkConnection)} silently
     * ignores further registrations.
     */
    public class SinkConnections<T> {
        private final Map<String, SinkConnection<T>> sinkConnections = new HashMap<>();
        private boolean isClosed = false;

        SinkConnections() {
        }

        /**
         * Registers a connection under the given name unless the registry was already closed.
         *
         * @param str            sink name used as the map key
         * @param sinkConnection connection to register
         */
        public void put(String str, SinkConnection<T> sinkConnection) {
            synchronized (this.sinkConnections) {
                if (!this.isClosed) {
                    this.sinkConnections.put(str, sinkConnection);
                }
            }
        }

        /**
         * Removes the connection registered under the given name.
         *
         * @param str sink name used as the map key
         * @return the removed connection, or {@code null} if none was registered under that name
         */
        public SinkConnection<T> remove(String str) {
            synchronized (this.sinkConnections) {
                return this.sinkConnections.remove(str);
            }
        }

        /**
         * Marks the registry closed and hands every registered connection to {@code action1}.
         *
         * <p>The connections are copied while holding the lock and the callback is invoked on
         * the copy afterwards: iterating the live map without the lock could fail with a
         * {@link java.util.ConcurrentModificationException} when another thread calls
         * {@link #remove(String)}, and running the callback under the lock would block those
         * threads for the duration of every close.
         *
         * @param action1 callback invoked once per registered connection
         */
        public void closeOut(Action1<SinkConnection<T>> action1) {
            final Map<String, SinkConnection<T>> snapshot;
            synchronized (this.sinkConnections) {
                this.isClosed = true;
                snapshot = new HashMap<>(this.sinkConnections);
            }
            for (SinkConnection<T> sinkConnection : snapshot.values()) {
                SinkClientImpl.logger.info("Closing " + sinkConnection.getName());
                action1.call(sinkConnection);
            }
        }

        /** Renders the registered connections and the closed flag, read under the lock. */
        @Override
        public String toString() {
            synchronized (this.sinkConnections) {
                return "SinkClientImpl.SinkConnections(sinkConnections=" + this.sinkConnections + ", isClosed=" + this.isClosed + ")";
            }
        }
    }

    /**
     * Creates a client that leaves ping filtering enabled: delegates to the seven-argument
     * constructor with its trailing {@code boolean} argument set to {@code false}.
     */
    SinkClientImpl(String str, SinkConnectionFunc<T> sinkConnectionFunc, JobSinkLocator jobSinkLocator, Observable<Integer> observable, Observer<SinkConnectionsStatus> observer, long j) {
        this(
                str,
                sinkConnectionFunc,
                jobSinkLocator,
                observable,
                observer,
                j,
                false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SinkClientImpl(String str, SinkConnectionFunc<T> sinkConnectionFunc, JobSinkLocator jobSinkLocator, Observable<Integer> observable, Observer<SinkConnectionsStatus> observer, long j, boolean z) {
        this.nowClosed = new AtomicBoolean(false);
        this.sinkConnections = new SinkConnections<>();
        this.sinkGuageName = "SinkConnections";
        this.expectedSinksGaugeName = "ExpectedSinkConnections";
        this.sinkReceivingDataGaugeName = "sinkRecvngData";
        this.clientNotConnectedToAllSourcesGaugeName = "clientNotConnectedToAllSources";
        this.numSinkWorkers = new AtomicInteger();
        this.jobId = str;
        this.sinkConnectionFunc = sinkConnectionFunc;
        this.jobSinkLocator = jobSinkLocator;
        this.metrics = new Metrics.Builder().id(new MetricGroupId(SinkClientImpl.class.getCanonicalName(), new Tag[]{new BasicTag("jobId", (String) Optional.ofNullable(str).orElse("NullJobId"))})).addGauge("SinkConnections").addGauge("ExpectedSinkConnections").addGauge("sinkRecvngData").addGauge("clientNotConnectedToAllSources").build();
        this.sinkGauge = this.metrics.getGauge("SinkConnections");
        this.expectedSinksGauge = this.metrics.getGauge("ExpectedSinkConnections");
        this.sinkReceivingDataGauge = this.metrics.getGauge("sinkRecvngData");
        this.clientNotConnectedToAllSourcesGauge = this.metrics.getGauge("clientNotConnectedToAllSources");
        observable.doOnNext(num -> {
            this.numSinkWorkers.set(num.intValue());
        }).takeWhile(num2 -> {
            return Boolean.valueOf(!this.nowClosed.get());
        }).subscribe();
        this.sinkConnectionsStatusObserver = observer;
        this.dataRecvTimeoutSecs = j;
        this.disablePingFiltering = z;
    }

    /**
     * Builds the registry key for a sink endpoint in the form {@code <host>-<port>}.
     *
     * @param str host part of the key
     * @param i   port part of the key
     * @return the two parts joined by a single dash
     */
    private String toSinkName(String str, int i) {
        final StringBuilder name = new StringBuilder();
        name.append(str);
        name.append("-");
        name.append(i);
        return name.toString();
    }

    /**
     * Reports whether this client is in an error state.
     *
     * @return always {@code false}; this implementation does not record errors
     */
    @Override // io.mantisrx.client.SinkClient
    public boolean hasError() {
        return false;
    }

    /**
     * Returns the error message of this client.
     *
     * @return always {@code null}; this implementation does not record errors
     */
    @Override // io.mantisrx.client.SinkClient
    public String getError() {
        return null;
    }

    /**
     * Returns the result streams of the job's sinks, one inner observable per endpoint change.
     *
     * <p>Equivalent to {@code getPartitionedResults(-1, 0)}.
     * NOTE(review): -1 / 0 presumably mean "no partitioning" - confirm against JobSinkLocator.
     *
     * @return an observable emitting one observable per endpoint change
     */
    @Override // io.mantisrx.client.SinkClient
    public Observable<Observable<T>> getResults() {
        return getPartitionedResults(-1, 0);
    }

    /**
     * Returns the result streams for the sinks selected by the two partition arguments.
     *
     * <p>Both arguments are forwarded unchanged to
     * {@code JobSinkLocator.locatePartitionedSinkForJob} via {@code internalGetResults}.
     * NOTE(review): the parameter names were lost in decompilation; they look like a partition
     * index and a total partition count - confirm against the SinkClient interface.
     *
     * @param i  first partition argument passed to the locator
     * @param i2 second partition argument passed to the locator
     * @return an observable emitting one observable per endpoint change
     */
    @Override // io.mantisrx.client.SinkClient
    public Observable<Observable<T>> getPartitionedResults(int i, int i2) {
        return internalGetResults(i, i2);
    }

    /**
     * Maps every endpoint change of the job's sinks to a stream of results.
     *
     * <p>For each {@link EndpointChange} emitted by the locator:
     * <ul>
     *   <li>an empty observable is emitted once the client has been closed;</li>
     *   <li>a change of type {@code complete} closes the matching connection;</li>
     *   <li>any other change opens a new connection and emits its data stream.</li>
     * </ul>
     * When the shared stream is unsubscribed from upstream, all open connections are closed.
     *
     * <p>The method deliberately uses the class type parameter {@code T}; declaring a
     * method-level {@code <T>} would shadow it and detach the result type from the client's
     * element type.
     *
     * @param i  first partition argument passed to the locator
     * @param i2 second partition argument passed to the locator
     * @return a shared observable emitting one observable per endpoint change
     */
    private Observable<Observable<T>> internalGetResults(int i, int i2) {
        return this.jobSinkLocator
                .locatePartitionedSinkForJob(this.jobId, i, i2)
                .map(new Func1<EndpointChange, Observable<T>>() {
                    @Override
                    public Observable<T> call(EndpointChange endpointChange) {
                        if (SinkClientImpl.this.nowClosed.get()) {
                            return Observable.empty();
                        }
                        if (endpointChange.getType() == EndpointChange.Type.complete) {
                            return SinkClientImpl.this.handleEndpointClose(endpointChange);
                        }
                        return SinkClientImpl.this.handleEndpointConnect(endpointChange);
                    }
                })
                .doOnUnsubscribe(() -> {
                    try {
                        logger.warn("Closing connections to sink of job {}", this.jobId);
                        closeAllConnections();
                    } catch (Exception e) {
                        // Best effort: a failure while closing must not break unsubscription.
                        logger.warn("Error closing all connections to sink of job {}", this.jobId, e);
                    }
                })
                .share();
    }

    /**
     * Opens a connection to the sink endpoint described by {@code endpointChange} and returns
     * its data stream.
     *
     * <p>The closed flag is checked both before and after the connection is registered, because
     * the client may be closed concurrently. In either case the new connection is closed, it is
     * not left in the registry, and an empty observable is returned. A failure while closing is
     * logged and never causes the stream of a closed client to be opened.
     *
     * <p>NOTE(review): the method-level {@code <T>} shadows the class type parameter; it is kept
     * so that existing callers continue to compile unchanged.
     *
     * @param endpointChange the connect event; its endpoint supplies host and port
     * @return the connection's data stream, cut off once the client is closed, or an empty
     *         observable if the client is already closed
     */
    public <T> Observable<T> handleEndpointConnect(EndpointChange endpointChange) {
        logger.info("Opening connection to sink at {}", endpointChange);
        final String unwrappedHost = MasterClientWrapper.getUnwrappedHost(endpointChange.getEndpoint().getHost());
        final int port = endpointChange.getEndpoint().getPort();
        final String sinkName = toSinkName(unwrappedHost, port);
        final SinkConnection<T> connection = this.sinkConnectionFunc.call(
                unwrappedHost,
                Integer.valueOf(port),
                this::updateSinkConx,
                this::updateSinkDataReceivingStatus,
                this.dataRecvTimeoutSecs,
                this.disablePingFiltering);

        if (this.nowClosed.get()) {
            // Closed before the connection was registered: nothing to unregister.
            closeRejectedConnection(connection);
            return Observable.empty();
        }

        this.sinkConnections.put(sinkName, connection);
        if (this.nowClosed.get()) {
            // Closed while registering: always undo the registration and bail out, even when
            // close() throws.
            closeRejectedConnection(connection);
            this.sinkConnections.remove(sinkName);
            return Observable.empty();
        }

        @SuppressWarnings("unchecked")
        final Observable<T> results = (Observable<T>) connection.call();
        return results.takeWhile(item -> !this.nowClosed.get());
    }

    /** Closes a connection that will not be used, logging (not propagating) any failure. */
    private void closeRejectedConnection(SinkConnection<?> connection) {
        try {
            connection.close();
        } catch (Exception e) {
            logger.warn("Error closing sink connection {} - {}", connection.getName(), e.getMessage(), e);
        }
    }

    /**
     * Callback reporting that a sink connection started or stopped receiving data.
     *
     * @param bool {@code true} increments the receiving-data gauge, {@code false} decrements it
     */
    public void updateSinkDataReceivingStatus(Boolean bool) {
        if (bool.booleanValue()) {
            this.sinkReceivingDataGauge.increment();
        } else {
            this.sinkReceivingDataGauge.decrement();
        }
        publishConnectionStatus();
    }

    /**
     * Callback reporting that a sink connection was established or lost.
     *
     * @param bool {@code true} increments the connection gauge, {@code false} decrements it
     */
    public void updateSinkConx(Boolean bool) {
        if (bool.booleanValue()) {
            this.sinkGauge.increment();
        } else {
            this.sinkGauge.decrement();
        }
        publishConnectionStatus();
    }

    /**
     * Refreshes the derived gauges and notifies the status observer, if one was supplied.
     *
     * <p>The expected-sinks gauge is set to the latest worker count, and the
     * "not connected to all sources" gauge is set to 1 while that count differs from the
     * number of sinks receiving data, otherwise to 0.
     */
    private void publishConnectionStatus() {
        this.expectedSinksGauge.set(this.numSinkWorkers.get());
        final boolean missingSources = this.expectedSinksGauge.value() != this.sinkReceivingDataGauge.value();
        this.clientNotConnectedToAllSourcesGauge.set(missingSources ? 1L : 0L);

        if (this.sinkConnectionsStatusObserver != null) {
            // Both callbacks end up here, so onNext calls are serialized on the observer.
            synchronized (this.sinkConnectionsStatusObserver) {
                this.sinkConnectionsStatusObserver.onNext(new SinkConnectionsStatus(this.sinkReceivingDataGauge.value(), this.sinkGauge.value(), this.numSinkWorkers.get()));
            }
        }
    }

    /**
     * Handles a completed endpoint: unregisters the matching connection and closes it.
     *
     * <p>A missing registry entry and a failing {@code close()} are both logged as errors; in
     * every case the returned observable is empty.
     *
     * @param endpointChange the completion event; its endpoint supplies host and port
     * @return always an empty observable
     */
    public <T> Observable<T> handleEndpointClose(EndpointChange endpointChange) {
        logger.info("Closed connection to sink at " + endpointChange.toString());
        final String unwrappedHost = MasterClientWrapper.getUnwrappedHost(endpointChange.getEndpoint().getHost());
        final String sinkName = toSinkName(unwrappedHost, endpointChange.getEndpoint().getPort());
        final SinkConnection<?> registered = this.sinkConnections.remove(sinkName);

        if (registered == null) {
            logger.error("SinkConnections does not contain endpoint to be removed. host: {}, sinkConnections: {}", unwrappedHost, this.sinkConnections);
            return Observable.empty();
        }

        try {
            registered.close();
        } catch (Exception closeFailure) {
            logger.error("Unexpected exception on closing sinkConnection: " + closeFailure.getMessage(), closeFailure);
        }
        return Observable.empty();
    }

    /**
     * Marks the client closed and closes every registered sink connection.
     *
     * <p>A failure while closing one connection is logged as a warning and does not prevent the
     * remaining connections from being closed.
     *
     * @throws Exception declared for callers; close failures themselves are caught and logged
     */
    private void closeAllConnections() throws Exception {
        this.nowClosed.set(true);
        final Action1<SinkConnection<T>> closer = connection -> {
            try {
                connection.close();
            } catch (Exception closeFailure) {
                logger.warn("Error closing sink connection " + connection.getName() + " - " + closeFailure.getMessage(), closeFailure);
            }
        };
        this.sinkConnections.closeOut(closer);
    }
}
