package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.BatchProcessingException;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.TrackingException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/DefaultTracker.class */
public class DefaultTracker implements Runnable, Registration {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracker.class);
    private final Tracker tracker;
    private final Consumer<MessageBatch> processor;
    private final TrackingClient trackingClient;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<Thread> thread = new AtomicReference<>();
    private final Duration retryDelay;
    private volatile Long lastProcessedIndex;
    private volatile boolean processing;

    public static Registration start(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration consumerConfiguration, Client client) {
        List list = (List) IntStream.range(0, consumerConfiguration.getThreads()).mapToObj(i -> {
            return new DefaultTracker(consumer, consumerConfiguration, client);
        }).collect(Collectors.toList());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(consumerConfiguration.getThreads());
        Objects.requireNonNull(newFixedThreadPool);
        list.forEach((v1) -> {
            r1.submit(v1);
        });
        return () -> {
            list.forEach((v0) -> {
                v0.cancel();
            });
            newFixedThreadPool.shutdownNow();
        };
    }

    private DefaultTracker(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration consumerConfiguration, Client client) {
        this.tracker = new Tracker(consumerConfiguration.prependApplicationName() ? String.format("%s_%s", client.name(), consumerConfiguration.getName()) : consumerConfiguration.getName(), consumerConfiguration.getTrackerIdFactory().apply(client), consumerConfiguration);
        this.processor = BatchInterceptor.join(consumerConfiguration.getBatchInterceptors()).intercept(messageBatch -> {
            process(messageBatch, consumer);
        }, this.tracker);
        this.trackingClient = client.getTrackingClient(consumerConfiguration.getMessageType());
        this.retryDelay = Duration.ofSeconds(1L);
        this.lastProcessedIndex = consumerConfiguration.getLastIndex();
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.running.compareAndSet(false, true)) {
            Tracker.current.set(this.tracker);
            this.thread.set(Thread.currentThread());
            while (this.running.get()) {
                this.processor.accept(fetch(this.lastProcessedIndex));
            }
            Tracker.current.remove();
        }
    }

    protected MessageBatch fetch(Long l) {
        return (MessageBatch) TimingUtils.retryOnFailure(() -> {
            return this.trackingClient.readAndWait(this.tracker.getName(), this.tracker.getTrackerId(), l, this.tracker.getConfiguration());
        }, this.retryDelay, exc -> {
            return this.running.get();
        });
    }

    protected void process(MessageBatch messageBatch, Consumer<List<SerializedMessage>> consumer) {
        try {
            this.processing = true;
            List<SerializedMessage> messages = messageBatch.getMessages();
            if (messages.isEmpty() || !this.running.get()) {
                return;
            }
            try {
                consumer.accept(messages);
                updatePosition(messageBatch.getLastIndex(), messageBatch.getSegment());
                this.processing = false;
            } catch (BatchProcessingException e) {
                log.error("Consumer {} failed to handle batch of {} messages at index {} and did not handle exception. Consumer will be updated to the last processed index and then stopped.", new Object[]{this.tracker.getName(), Integer.valueOf(messages.size()), e.getMessageIndex()});
                updatePosition((Long) messages.stream().map((v0) -> {
                    return v0.getIndex();
                }).filter(l -> {
                    return (e.getMessageIndex() == null || l == null || l.longValue() >= e.getMessageIndex().longValue()) ? false : true;
                }).max(Comparator.naturalOrder()).orElse(null), messageBatch.getSegment());
                this.processing = false;
                cancel();
                throw e;
            } catch (Exception e2) {
                log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.", new Object[]{this.tracker.getName(), Integer.valueOf(messages.size()), e2});
                this.processing = false;
                cancel();
                throw e2;
            }
        } finally {
            this.processing = false;
        }
    }

    private void updatePosition(Long l, int[] iArr) {
        if (l != null) {
            this.lastProcessedIndex = l;
            TimingUtils.retryOnFailure(() -> {
                try {
                    this.trackingClient.storePosition(this.tracker.getName(), iArr, l.longValue()).await();
                } catch (Exception e) {
                    throw new TrackingException(String.format("Failed to store position of segments %s for tracker %s to index %s", Arrays.toString(iArr), this.tracker, l), e);
                }
            }, this.retryDelay, exc -> {
                return this.running.get();
            });
        }
    }

    public void cancel() {
        if (this.running.compareAndSet(true, false)) {
            try {
            } catch (Exception e) {
                log.warn("Not allowed to cancel tracker {}", this.tracker.getName(), e);
            } finally {
                this.thread.set(null);
            }
            if (!this.processing) {
                this.thread.get().interrupt();
                this.tracker.getConfiguration().getBatchInterceptors().forEach(batchInterceptor -> {
                    try {
                        batchInterceptor.shutdown(this.tracker);
                    } catch (Exception e2) {
                        log.warn("Failed to stop batch interceptor {}", batchInterceptor, e2);
                    }
                });
            }
            while (this.processing) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.tracker.getConfiguration().getBatchInterceptors().forEach(batchInterceptor2 -> {
                try {
                    batchInterceptor2.shutdown(this.tracker);
                } catch (Exception e22) {
                    log.warn("Failed to stop batch interceptor {}", batchInterceptor2, e22);
                }
            });
        }
    }
}
