package info.bitrich.xchangestream.kucoin;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.kucoin.dto.KucoinWebSocketOrderEvent;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/kucoin/KucoinStreamingTradeService.class */
/**
 * {@link StreamingTradeService} implementation that exposes order events received from the
 * {@code /spotMarket/tradeOrders} channel of a {@link KucoinStreamingService}.
 */
public class KucoinStreamingTradeService implements StreamingTradeService {
    private static final Logger logger = LoggerFactory.getLogger(KucoinStreamingTradeService.class);
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    private final KucoinStreamingService service;

    /**
     * Creates a trade service backed by the given streaming service.
     *
     * @param kucoinStreamingService service used to subscribe to the order channel
     */
    public KucoinStreamingTradeService(KucoinStreamingService kucoinStreamingService) {
        this.service = kucoinStreamingService;
    }

    /**
     * Streams order changes, converting each raw event with
     * {@link KucoinStreamingAdapters#adaptOrder}.
     *
     * @param currencyPair pair to restrict the stream to, or {@code null} for every pair
     * @param objArr additional arguments; not used by this implementation
     * @return observable of adapted orders
     */
    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object... objArr) {
        Observable<KucoinWebSocketOrderEvent> rawEvents = getRawOrderChanges(currencyPair);
        return rawEvents.map(KucoinStreamingAdapters::adaptOrder);
    }

    /**
     * Streams raw order events from the {@code /spotMarket/tradeOrders} channel.
     *
     * <p>Errors on the subscription are logged at warn level (they are not consumed here), each
     * incoming JSON node is bound to a {@link KucoinWebSocketOrderEvent}, and the result is
     * filtered by currency pair.
     *
     * @param currencyPair pair to restrict the stream to, or {@code null} for every pair
     * @return observable of raw order events
     */
    public Observable<KucoinWebSocketOrderEvent> getRawOrderChanges(CurrencyPair currencyPair) {
        // The subscription is made without any extra channel arguments.
        Object[] noArgs = new Object[0];
        return service.subscribeChannel("/spotMarket/tradeOrders", noArgs)
                .doOnError(error -> logger.warn("encountered error while subscribing to order changes", error))
                .map(node -> mapper.treeToValue(node, KucoinWebSocketOrderEvent.class))
                .filter(event -> isForPair(currencyPair, event));
    }

    /** Returns true when no pair was requested or the event's pair equals the requested one. */
    private static boolean isForPair(CurrencyPair requestedPair, KucoinWebSocketOrderEvent event) {
        if (requestedPair == null) {
            return true;
        }
        return requestedPair.equals(event.data.getCurrencyPair());
    }
}
