package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.Interceptor;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/Tracker.class */
/**
 * Polls a {@link ConsumerService} for message batches and hands them to a consumer.
 *
 * <p>{@link #run()} loops until {@link #cancel()} is invoked or a batch fails. Every fetched
 * batch is passed to the consumer (wrapped by the batch interceptors of the
 * {@link TrackingConfiguration}) in chunks of at most
 * {@code configuration.getMaxConsumerBatchSize()} messages. After a chunk has been consumed,
 * the index of its last message is stored through the consumer service.
 *
 * <p>The running state is kept in an {@link AtomicBoolean}, so {@link #cancel()} may be invoked
 * from a thread other than the one executing {@link #run()}.
 */
public class Tracker implements Runnable, Registration {
    private static final Logger log = LoggerFactory.getLogger(Tracker.class);
    private final String name;
    private final int channel;
    private final TrackingConfiguration configuration;
    private final Consumer<List<Message>> consumer;
    private final ConsumerService consumerService;
    private final AtomicBoolean running = new AtomicBoolean();

    /**
     * Creates a tracker that is not yet running.
     *
     * @param name            name passed to the consumer service when reading and when storing positions
     * @param channel         channel passed to the consumer service when reading
     * @param configuration   supplies batch sizes, wait duration, retry delay and batch interceptors
     * @param consumer        receives the messages; it is wrapped by the configured batch interceptors
     * @param consumerService service used to read messages and to store the tracker position
     */
    public Tracker(String name, int channel, TrackingConfiguration configuration,
                   Consumer<List<Message>> consumer, ConsumerService consumerService) {
        this.name = name;
        this.channel = channel;
        this.configuration = configuration;
        this.consumer = Interceptor.join(configuration.getBatchInterceptors()).intercept(consumer);
        this.consumerService = consumerService;
    }

    /**
     * Fetches and processes batches until the tracker is cancelled.
     *
     * <p>Does nothing if the tracker is already running: only the caller that flips the running
     * flag from {@code false} to {@code true} enters the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (!running.compareAndSet(false, true)) {
            return;
        }
        while (running.get()) {
            MessageBatch batch = fetch();
            process(batch.getMessages(), batch.getSegment());
        }
    }

    /**
     * Requests the tracker to stop.
     *
     * @return {@code true} if this call switched the tracker from running to stopped,
     *         {@code false} if it was not running
     */
    public boolean cancel() {
        return running.compareAndSet(true, false);
    }

    /**
     * Reads the next batch from the consumer service.
     *
     * <p>The read is delegated to {@code TimingUtils.retryOnFailure} with the configured retry
     * delay; the predicate handed to it reports whether the tracker is still running.
     *
     * @return the batch returned by the consumer service
     */
    protected MessageBatch fetch() {
        // Block-bodied lambda with an explicit return keeps the task unambiguously value-returning.
        return (MessageBatch) TimingUtils.retryOnFailure(() -> {
            return consumerService.read(name, channel, configuration.getMaxFetchBatchSize(),
                                        configuration.getMaxWaitDuration());
        }, configuration.getRetryDelay(), error -> running.get());
    }

    /**
     * Dispatches a fetched list of messages to {@link #processBatch(List, int[])}, splitting it
     * into chunks when it exceeds {@code configuration.getMaxConsumerBatchSize()}.
     *
     * <p>Nothing is processed when the list is empty or the tracker is not running. When the
     * list is split, the running flag is re-checked before every chunk so that a
     * {@link #cancel()} issued while a large fetch is being handled takes effect at the next
     * chunk boundary. No position is stored for the skipped chunks.
     *
     * @param messages messages of the fetched batch
     * @param segment  segment of the fetched batch, forwarded when the position is stored
     */
    protected void process(List<Message> messages, int[] segment) {
        if (messages.isEmpty() || !running.get()) {
            return;
        }
        int maxBatchSize = configuration.getMaxConsumerBatchSize();
        if (messages.size() <= maxBatchSize) {
            processBatch(messages, segment);
            return;
        }
        for (int from = 0; from < messages.size() && running.get(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, messages.size());
            processBatch(messages.subList(from, to), segment);
        }
    }

    /**
     * Hands one chunk to the consumer and then stores the index of its last message.
     *
     * <p>Storing the position is delegated to {@code TimingUtils.retryOnFailure} with the
     * configured retry delay and a predicate that reports whether the tracker is still running.
     * If the consumer or the position update throws an exception, the failure is logged, the
     * tracker is cancelled and the exception is rethrown.
     *
     * @param messages non-empty chunk of messages to consume
     * @param segment  segment forwarded to the consumer service together with the position
     */
    protected void processBatch(List<Message> messages, int[] segment) {
        try {
            consumer.accept(messages);
            TimingUtils.retryOnFailure(() -> {
                long lastIndex = messages.get(messages.size() - 1).getIndex().longValue();
                consumerService.storePosition(name, segment, lastIndex);
                return null;
            }, configuration.getRetryDelay(), error -> running.get());
        } catch (Exception e) {
            // The trailing throwable is picked up by SLF4J as the exception to log.
            log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.", name, messages.size(), e);
            cancel();
            throw e;
        }
    }
}
