package info.bitrich.xchangestream.kucoin;

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.NettyStreamingService;
import info.bitrich.xchangestream.util.Events;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.knowm.xchange.kucoin.KucoinExchange;
import org.knowm.xchange.kucoin.dto.response.WebsocketResponse;

/* loaded from: input_file:info/bitrich/xchangestream/kucoin/KucoinStreamingExchange.class */
public class KucoinStreamingExchange extends KucoinExchange implements StreamingExchange {
    private KucoinStreamingService publicStreamingService;
    private KucoinStreamingService privateStreamingService;
    private KucoinStreamingMarketDataService streamingMarketDataService;
    private KucoinStreamingTradeService streamingTradeService;
    private final List<NettyStreamingService<?>> services = new ArrayList();
    private Runnable onApiCall;

    protected void initServices() {
        super.initServices();
        this.onApiCall = Events.onApiCall(this.exchangeSpecification);
    }

    public Completable connect(ProductSubscription... productSubscriptionArr) {
        ProductSubscription productSubscription = productSubscriptionArr[0];
        Completable complete = Completable.complete();
        this.services.clear();
        if (productSubscription.hasUnauthenticated()) {
            complete = complete.doOnComplete(() -> {
                WebsocketResponse publicWebsocketConnectionDetails = getPublicWebsocketConnectionDetails();
                WebsocketResponse.InstanceServer instanceServer = (WebsocketResponse.InstanceServer) publicWebsocketConnectionDetails.getInstanceServers().get(0);
                this.publicStreamingService = new KucoinStreamingService(instanceServer.getEndpoint() + "?token=" + publicWebsocketConnectionDetails.getToken(), instanceServer.getPingInterval(), false);
                applyStreamingSpecification(getExchangeSpecification(), this.publicStreamingService);
                this.publicStreamingService.connect().doOnError(th -> {
                    this.logger.warn("encountered error while subscribing to public websocket", th);
                }).blockingAwait();
                this.services.add(this.publicStreamingService);
                this.streamingMarketDataService = new KucoinStreamingMarketDataService(this.publicStreamingService, getMarketDataService(), this.onApiCall);
            });
        }
        if (productSubscription.hasAuthenticated()) {
            if (this.exchangeSpecification.getApiKey() == null) {
                throw new IllegalArgumentException("API key required for authenticated streams");
            }
            complete = complete.doOnComplete(() -> {
                WebsocketResponse privateWebsocketConnectionDetails = getPrivateWebsocketConnectionDetails();
                WebsocketResponse.InstanceServer instanceServer = (WebsocketResponse.InstanceServer) privateWebsocketConnectionDetails.getInstanceServers().get(0);
                this.privateStreamingService = new KucoinStreamingService(instanceServer.getEndpoint() + "?token=" + privateWebsocketConnectionDetails.getToken(), instanceServer.getPingInterval(), true);
                applyStreamingSpecification(getExchangeSpecification(), this.privateStreamingService);
                this.privateStreamingService.connect().doOnError(th -> {
                    this.logger.warn("encountered error while subscribing to private websocket", th);
                }).blockingAwait();
                this.services.add(this.privateStreamingService);
                this.streamingTradeService = new KucoinStreamingTradeService(this.privateStreamingService);
            });
        }
        return complete;
    }

    public Completable disconnect() {
        if (this.publicStreamingService != null) {
            this.publicStreamingService = null;
            this.streamingMarketDataService = null;
        }
        if (this.privateStreamingService != null) {
            this.privateStreamingService = null;
        }
        List list = (List) this.services.stream().map((v0) -> {
            return v0.disconnect();
        }).collect(Collectors.toList());
        this.services.clear();
        return Completable.concat(list);
    }

    public boolean isAlive() {
        return this.services.stream().anyMatch((v0) -> {
            return v0.isSocketOpen();
        });
    }

    public Observable<Throwable> reconnectFailure() {
        return Observable.concat((Iterable) this.services.stream().map((v0) -> {
            return v0.subscribeReconnectFailure();
        }).collect(Collectors.toList()));
    }

    public Observable<Object> connectionSuccess() {
        return Observable.concat((Iterable) this.services.stream().map((v0) -> {
            return v0.subscribeConnectionSuccess();
        }).collect(Collectors.toList()));
    }

    /* renamed from: getStreamingMarketDataService, reason: merged with bridge method [inline-methods] */
    public KucoinStreamingMarketDataService m0getStreamingMarketDataService() {
        return this.streamingMarketDataService;
    }

    public StreamingTradeService getStreamingTradeService() {
        return this.streamingTradeService;
    }

    public void useCompressedMessages(boolean z) {
        this.services.forEach(nettyStreamingService -> {
            nettyStreamingService.useCompressedMessages(z);
        });
    }
}
