package info.bitrich.xchangestream.kucoin;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.kucoin.dto.KucoinOrderBookEvent;
import info.bitrich.xchangestream.kucoin.dto.KucoinOrderBookEventData;
import info.bitrich.xchangestream.kucoin.dto.KucoinTickerEvent;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.kucoin.KucoinAdapters;
import org.knowm.xchange.kucoin.KucoinMarketDataService;
import org.knowm.xchange.kucoin.dto.response.OrderBookResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming market data service for Kucoin.
 *
 * <p>Tickers are read from the {@code /market/ticker:<pair>} channel. Order books are built from
 * a full snapshot fetched through {@link KucoinMarketDataService} and then kept current by
 * applying the incremental events of the {@code /market/level2:<pair>} channel. Trades are not
 * implemented.
 */
public class KucoinStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger logger = LoggerFactory.getLogger(KucoinStreamingMarketDataService.class);

    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    /** Shared order book stream per currency pair, created on first request. */
    private final Map<CurrencyPair, Observable<OrderBook>> orderbookSubscriptions = new ConcurrentHashMap<>();

    /** Raw level2 update stream per currency pair, created on first request. */
    private final Map<CurrencyPair, Observable<KucoinOrderBookEventData>> orderBookRawUpdatesSubscriptions = new ConcurrentHashMap<>();

    private final KucoinStreamingService service;
    private final KucoinMarketDataService marketDataService;

    /** Callback run immediately before each snapshot request issued by this class. */
    private final Runnable onApiCall;

    /**
     * Creates the service.
     *
     * @param kucoinStreamingService  source of the websocket channels
     * @param kucoinMarketDataService used to fetch full order book snapshots
     * @param runnable                callback run right before every snapshot request
     */
    public KucoinStreamingMarketDataService(KucoinStreamingService kucoinStreamingService, KucoinMarketDataService kucoinMarketDataService, Runnable runnable) {
        this.service = kucoinStreamingService;
        this.marketDataService = kucoinMarketDataService;
        this.onApiCall = runnable;
    }

    /**
     * Streams tickers for the given pair from the {@code /market/ticker:<pair>} channel.
     *
     * @param currencyPair pair to subscribe to
     * @param objArr       ignored
     * @return tickers decoded from the channel's messages
     */
    @Override
    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        final String channelName = "/market/ticker:" + KucoinAdapters.adaptCurrencyPair(currencyPair);
        return service.subscribeChannel(channelName, new Object[0])
                .doOnError(th -> logger.warn("encountered error while subscribing to channel {}", channelName, th))
                .map(jsonNode -> mapper.treeToValue(jsonNode, KucoinTickerEvent.class))
                .map(KucoinTickerEvent::getTicker);
    }

    /**
     * Returns the order book stream for the given pair, creating it on the first call and
     * returning the same instance afterwards.
     *
     * @param currencyPair pair to subscribe to
     * @param objArr       ignored
     * @return the shared order book stream for the pair
     */
    @Override
    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        return orderbookSubscriptions.computeIfAbsent(currencyPair, this::initOrderBookIfAbsent);
    }

    /**
     * Not implemented for this exchange.
     *
     * @throws NotYetImplementedForExchangeException always
     */
    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        throw new NotYetImplementedForExchangeException();
    }

    /** Ensures the raw update stream for the pair exists, then builds the order book stream on it. */
    private Observable<OrderBook> initOrderBookIfAbsent(CurrencyPair currencyPair) {
        orderBookRawUpdatesSubscriptions.computeIfAbsent(
                currencyPair, pair -> triggerObservableBody(rawOrderBookUpdates(pair)));
        return createOrderBookObservable(currencyPair);
    }

    /** Streams the {@code data} payload of every event on the {@code /market/level2:<pair>} channel. */
    private Observable<KucoinOrderBookEventData> rawOrderBookUpdates(CurrencyPair currencyPair) {
        final String channelName = "/market/level2:" + KucoinAdapters.adaptCurrencyPair(currencyPair);
        return service.subscribeChannel(channelName, new Object[0])
                .doOnError(th -> logger.warn("encountered error while subscribing to channel {}", channelName, th))
                .map(jsonNode -> mapper.treeToValue(jsonNode, KucoinOrderBookEvent.class))
                .map(event -> event.data);
    }

    /**
     * Builds the shared order book stream for a pair on top of its raw update stream.
     *
     * <p>For every raw event the pipeline:
     * <ol>
     *   <li>fetches a snapshot if none is currently valid;</li>
     *   <li>drops the event while no valid snapshot exists;</li>
     *   <li>drops events that end at or before the snapshot's sequence;</li>
     *   <li>accepts the event only if its sequence range contains {@code lastUpdateId + 1};
     *       otherwise the snapshot is invalidated so that the next event triggers a re-fetch;</li>
     *   <li>applies accepted events to the order book and emits it.</li>
     * </ol>
     */
    private Observable<OrderBook> createOrderBookObservable(CurrencyPair currencyPair) {
        final OrderbookSubscription subscription =
                new OrderbookSubscription(orderBookRawUpdatesSubscriptions.get(currencyPair));

        return subscription.stream
                .doOnNext(event -> subscription.initSnapshotIfInvalid(currencyPair))
                .doOnError(th -> logger.warn("encountered error while processing order book event", th))
                .filter(event -> subscription.snapshotLastUpdateId.get() > 0)
                .filter(event -> event.sequenceEnd > subscription.snapshotLastUpdateId.get())
                .filter(event -> {
                    final long last = subscription.lastUpdateId.get();
                    final long expected = last + 1;
                    final boolean contiguous =
                            last == 0 || (event.sequenceStart <= expected && event.sequenceEnd >= expected);
                    if (contiguous) {
                        subscription.lastUpdateId.set(event.sequenceEnd);
                    } else {
                        logger.info(
                                "Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.",
                                currencyPair, last, event.sequenceStart, event.sequenceEnd);
                        subscription.invalidateSnapshot();
                    }
                    return contiguous;
                })
                .map(event -> {
                    event.update(currencyPair, subscription.orderBook);
                    return subscription.orderBook;
                })
                .share();
    }

    /**
     * Subscribes once to the given observable with a no-op item consumer and returns the same
     * observable.
     *
     * <p>An explicit error consumer is supplied: with an item-only subscription RxJava would
     * wrap any stream error in {@code OnErrorNotImplementedException} and hand it to its global
     * error hook instead of letting this class deal with it.
     */
    private <T> Observable<T> triggerObservableBody(Observable<T> observable) {
        observable.subscribe(
                item -> {
                    // Intentionally empty: the items are consumed by the order book pipeline.
                },
                th -> logger.warn("encountered error on order book raw updates subscription", th));
        return observable;
    }

    /**
     * Mutable state of one order book stream: the raw update stream, the sequence of the last
     * applied update, the sequence of the current snapshot ({@code 0} means "no valid snapshot"),
     * and the order book being maintained.
     */
    public final class OrderbookSubscription {
        final Observable<KucoinOrderBookEventData> stream;
        final AtomicLong lastUpdateId = new AtomicLong();
        final AtomicLong snapshotLastUpdateId = new AtomicLong();
        OrderBook orderBook;

        private OrderbookSubscription(Observable<KucoinOrderBookEventData> observable) {
            this.stream = observable;
        }

        /** Marks the snapshot as invalid so that the next event triggers a new fetch. */
        void invalidateSnapshot() {
            snapshotLastUpdateId.set(0L);
        }

        /**
         * Fetches a full order book snapshot unless a valid one is already held.
         *
         * <p>On success both sequence counters are set to the snapshot's sequence and the order
         * book is replaced. On any failure the error is logged and the state is reset, which
         * leaves the snapshot invalid so the next call retries.
         *
         * @param currencyPair pair whose snapshot is fetched
         */
        public void initSnapshotIfInvalid(CurrencyPair currencyPair) {
            if (snapshotLastUpdateId.get() != 0) {
                return;
            }
            try {
                logger.info("Fetching initial orderbook snapshot for {} ", currencyPair);
                onApiCall.run();
                OrderBookResponse snapshot = marketDataService.getKucoinOrderBookFull(currencyPair);
                lastUpdateId.set(Long.parseLong(snapshot.getSequence()));
                snapshotLastUpdateId.set(lastUpdateId.get());
                orderBook = KucoinAdapters.adaptOrderBook(currencyPair, snapshot);
            } catch (Exception e) {
                // Broad catch is deliberate: a failed fetch must not terminate the stream.
                logger.error("Failed to fetch initial order book for {}", currencyPair, e);
                snapshotLastUpdateId.set(0L);
                lastUpdateId.set(0L);
                orderBook = null;
            }
        }
    }
}
