package org.exploit.finja.listener;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.exploit.finja.core.EventFetcher;
import org.exploit.finja.core.TransactionListener;
import org.exploit.finja.core.constant.Asset;
import org.exploit.finja.core.event.TxnEvent;
import org.exploit.finja.listener.config.PollingLimiterConfig;
import org.exploit.signalix.manager.EventScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/exploit/finja/listener/PollingTxListener.class */
/**
 * {@link TransactionListener} that finds transactions by periodically polling an
 * {@link EventFetcher} for every registered address and forwarding the resulting
 * events to an {@link EventScope}.
 *
 * <p>Addresses are registered with {@link #listen(String, long)} and removed with
 * {@link #removeListener(String)}; polling begins once {@link #start()} is called and
 * ends on {@link #dispose()}.
 */
public class PollingTxListener implements TransactionListener {
    private static final Logger log = LoggerFactory.getLogger(PollingTxListener.class);

    /** Subscription of the running polling pipeline; {@code null} until {@link #start()} is called. */
    protected Disposable pollTask;

    /**
     * Hooks for the addresses currently being polled.
     *
     * <p>Backed by a concurrent set: {@link #listen(String, long)} and
     * {@link #removeListener(String)} mutate it from caller threads while the
     * interval-driven pipeline created in {@link #start()} iterates it, and a plain
     * {@code HashSet} would fail with {@code ConcurrentModificationException} there.
     */
    protected final Set<PollingAddressHook> hooks = ConcurrentHashMap.newKeySet();

    protected final Asset asset;
    protected final EventFetcher eventFetcher;
    protected final PollingEventRestorer restorer;
    protected final PollingLimiterConfig limiterConfig;
    protected final long pollIntervalSeconds;
    protected final EventScope eventScope;

    /**
     * Creates a listener; no polling happens until {@link #start()} is called.
     *
     * @param asset               asset reported by {@link #asset()}
     * @param eventFetcher        source queried for events of each registered address
     * @param limiterConfig       supplies the fetch concurrency ({@code getMaxPerUpdate()}) and the
     *                            delay in milliseconds between fetched batches ({@code getDelay()})
     * @param pollIntervalSeconds period, in seconds, between polling rounds
     * @param eventScope          receiver of every event produced by polling
     */
    public PollingTxListener(Asset asset, EventFetcher eventFetcher, PollingLimiterConfig limiterConfig,
                             long pollIntervalSeconds, EventScope eventScope) {
        this.asset = asset;
        this.eventFetcher = eventFetcher;
        this.restorer = new PollingEventRestorer(eventFetcher);
        this.limiterConfig = limiterConfig;
        this.pollIntervalSeconds = pollIntervalSeconds;
        this.eventScope = eventScope;
    }

    /**
     * Starts the polling pipeline.
     *
     * <p>On every interval tick each registered hook is queried through the event fetcher
     * (at most {@code limiterConfig.getMaxPerUpdate()} fetches in flight), the fetched list is
     * handed to the hook's {@code processEvents}, and every resulting event is passed to
     * {@code eventScope.call}. A failure while polling one address or dispatching one event is
     * logged and skipped so that it does not terminate the whole pipeline.
     *
     * <p>If a previous pipeline is still subscribed it is disposed first, so calling this
     * method again restarts polling instead of leaking a second poll loop.
     */
    @Override // org.exploit.finja.core.TransactionListener
    public void start() {
        // Without this, a second start() would overwrite the only handle to the earlier
        // subscription and leave it polling forever.
        Disposable previous = this.pollTask;
        if (previous != null) {
            previous.dispose();
        }

        this.pollTask = Flux.interval(Duration.ofSeconds(this.pollIntervalSeconds))
                .flatMap(tick -> Flux.fromIterable(this.hooks))
                .flatMap(hook -> this.eventFetcher.events(hook.getAddress(), hook.getTimestamp())
                        .collectList()
                        .map(hook::processEvents)
                        // Isolate failures per address: one bad address must not stop the others.
                        .onErrorResume(th -> {
                            log.error("Couldn't poll from address: {}", hook.getAddress(), th);
                            return Mono.empty();
                        }), this.limiterConfig.getMaxPerUpdate())
                .delayElements(Duration.ofMillis(this.limiterConfig.getDelay()))
                .onBackpressureBuffer()
                .flatMap(Flux::fromIterable)
                .flatMap(event -> Mono.fromRunnable(() -> this.eventScope.call(event))
                        // Isolate failures per event: an error reaching the terminal handler
                        // below would complete the stream and silently end all polling.
                        .onErrorResume(th -> {
                            log.error("Couldn't dispatch event: {}", event, th);
                            return Mono.empty();
                        }))
                // Last-resort handler; reaching it ends the pipeline.
                .onErrorResume(th -> {
                    log.error("Error occurred while polling: ", th);
                    return Mono.empty();
                })
                .subscribe();
    }

    /**
     * Registers an address for polling.
     *
     * @param address   address to poll
     * @param timestamp initial timestamp stored in the hook and passed to the event fetcher
     */
    @Override // org.exploit.finja.core.TransactionListener
    public void listen(String address, long timestamp) {
        this.hooks.add(new PollingAddressHook(address, timestamp));
    }

    /**
     * Removes every hook whose address equals the given one.
     *
     * @param address address to stop polling
     */
    @Override // org.exploit.finja.core.TransactionListener
    public void removeListener(String address) {
        this.hooks.removeIf(hook -> hook.getAddress().equals(address));
    }

    /**
     * Delegates to the {@link PollingEventRestorer}, passing it the current hooks together
     * with the given address and timestamp.
     *
     * @param address   address to restore events for
     * @param timestamp timestamp forwarded to the restorer
     * @return the event stream produced by the restorer
     */
    @Override // org.exploit.finja.core.TransactionListener
    public Flux<TxnEvent> restore(String address, long timestamp) {
        return this.restorer.restore(this.hooks, address, timestamp);
    }

    /** Cancels the polling pipeline if it has been started; otherwise does nothing. */
    @Override // reactor.core.Disposable
    public void dispose() {
        if (this.pollTask != null) {
            this.pollTask.dispose();
        }
    }

    /** @return the asset this listener was created for */
    @Override // org.exploit.finja.core.TransactionListener
    public Asset asset() {
        return this.asset;
    }
}
