package io.fluxcapacitor.common;

import io.jooby.buffer.DefaultDataBufferFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/common/Backlog.class */
public class Backlog<T> implements Monitored<List<T>> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Backlog.class);
    private final int maxBatchSize;
    private final Queue<T> queue;
    private final ThrowingFunction<List<T>, CompletableFuture<?>> consumer;
    private final ErrorHandler<List<T>> errorHandler;
    private final ExecutorService executorService;
    private final AtomicBoolean flushing;
    private final AtomicLong insertPosition;
    private final AtomicLong flushPosition;
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> results;
    private final Collection<Consumer<List<T>>> monitors;

    @FunctionalInterface
    /* loaded from: input_file:io/fluxcapacitor/common/Backlog$BatchConsumer.class */
    public interface BatchConsumer<T> {
        CompletableFuture<Void> accept(List<T> list) throws Exception;
    }

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> throwingConsumer) {
        return forConsumer(throwingConsumer, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
    }

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> throwingConsumer, int i) {
        return forConsumer(throwingConsumer, i, (th, list) -> {
            log.error("Consumer {} failed to handle batch of size {}. Continuing with next batch.", throwingConsumer, Integer.valueOf(list.size()), th);
        });
    }

    public static <T> Backlog<T> forConsumer(ThrowingConsumer<List<T>> throwingConsumer, int i, ErrorHandler<List<T>> errorHandler) {
        return new Backlog<>(list -> {
            throwingConsumer.accept(list);
            return null;
        }, i, errorHandler);
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction) {
        return forAsyncConsumer(throwingFunction, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction, int i) {
        return forAsyncConsumer(throwingFunction, i, (th, list) -> {
            log.error("Consumer {} failed to handle batch of size {}. Continuing with next batch.", throwingFunction, Integer.valueOf(list.size()), th);
        });
    }

    public static <T> Backlog<T> forAsyncConsumer(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction, int i, ErrorHandler<List<T>> errorHandler) {
        return new Backlog<>(throwingFunction, i, errorHandler);
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction) {
        this(throwingFunction, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction, int i) {
        this(throwingFunction, i, (th, list) -> {
            log.error("Consumer {} failed to handle batch {}. Continuing with next batch.", throwingFunction, list, th);
        });
    }

    protected Backlog(ThrowingFunction<List<T>, CompletableFuture<?>> throwingFunction, int i, ErrorHandler<List<T>> errorHandler) {
        this.queue = new ConcurrentLinkedQueue();
        this.flushing = new AtomicBoolean();
        this.insertPosition = new AtomicLong();
        this.flushPosition = new AtomicLong();
        this.results = new ConcurrentSkipListMap<>();
        this.monitors = new CopyOnWriteArraySet();
        this.maxBatchSize = i;
        this.consumer = throwingFunction;
        this.executorService = Executors.newSingleThreadExecutor(ObjectUtils.newThreadFactory("Backlog"));
        this.errorHandler = errorHandler;
    }

    @SafeVarargs
    public final CompletableFuture<Void> add(T... tArr) {
        Collections.addAll(this.queue, tArr);
        return tArr.length == 0 ? CompletableFuture.completedFuture(null) : awaitFlush(this.insertPosition.updateAndGet(j -> {
            return j + tArr.length;
        }));
    }

    public CompletableFuture<Void> add(Collection<? extends T> collection) {
        this.queue.addAll(collection);
        return collection.isEmpty() ? CompletableFuture.completedFuture(null) : awaitFlush(this.insertPosition.updateAndGet(j -> {
            return j + collection.size();
        }));
    }

    private CompletableFuture<Void> awaitFlush(long j) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.results.put(Long.valueOf(j), completableFuture);
        flushIfNotFlushing();
        return completableFuture;
    }

    private void flushIfNotFlushing() {
        if (this.flushing.compareAndSet(false, true)) {
            this.executorService.execute(this::flush);
        }
    }

    private void flush() {
        CompletableFuture<?> failedFuture;
        T poll;
        while (!this.queue.isEmpty()) {
            try {
                ArrayList arrayList = new ArrayList(this.maxBatchSize);
                while (arrayList.size() < this.maxBatchSize && (poll = this.queue.poll()) != null) {
                    arrayList.add(poll);
                }
                try {
                    failedFuture = this.consumer.apply(arrayList);
                } catch (Throwable th) {
                    failedFuture = CompletableFuture.failedFuture(th);
                    this.errorHandler.handleError(th, arrayList);
                }
                long addAndGet = this.flushPosition.addAndGet(arrayList.size());
                if (failedFuture == null) {
                    completeResults(addAndGet, null);
                } else {
                    failedFuture.whenComplete((BiConsumer<? super Object, ? super Throwable>) (obj, th2) -> {
                        completeResults(addAndGet, th2);
                    });
                }
                this.monitors.forEach(consumer -> {
                    consumer.accept(arrayList);
                });
            } catch (Throwable th3) {
                log.error("Failed to flush the backlog", th3);
                this.flushing.set(false);
                throw th3;
            }
        }
        this.flushing.set(false);
        if (!this.queue.isEmpty()) {
            flushIfNotFlushing();
        }
    }

    protected void completeResults(long j, Throwable th) {
        ConcurrentNavigableMap<Long, CompletableFuture<Void>> headMap = this.results.headMap((ConcurrentSkipListMap<Long, CompletableFuture<Void>>) Long.valueOf(j), true);
        headMap.forEach((l, completableFuture) -> {
            if (th == null) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(th);
            }
        });
        headMap.clear();
    }

    @Override // io.fluxcapacitor.common.Monitored
    public Registration registerMonitor(Consumer<List<T>> consumer) {
        this.monitors.add(consumer);
        return () -> {
            this.monitors.remove(consumer);
        };
    }

    public void shutDown() {
        try {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.warn("Shutdown of executor was interrupted", (Throwable) e);
                Thread.currentThread().interrupt();
            }
        } catch (Throwable th) {
            log.warn("Failed to shutdown a backlog", th);
        }
    }
}
