package io.github.opensabe.common.buffer;

import com.google.common.collect.Lists;
import com.google.errorprone.annotations.ForOverride;
import io.github.opensabe.common.buffer.BufferedElement;
import io.github.opensabe.common.buffer.observation.BatchBufferQueueBatchManipulateObservationDocumentation;
import io.github.opensabe.common.executor.ThreadPoolFactory;
import io.github.opensabe.common.executor.ThreadPoolFactoryGracefulShutDownHandler;
import io.github.opensabe.common.observation.UnifiedObservationFactory;
import io.micrometer.observation.Observation;
import io.micrometer.tracing.TraceContext;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:io/github/opensabe/common/buffer/BatchBufferedQueue.class */
public abstract class BatchBufferedQueue<E extends BufferedElement> {
    private static final Logger log = LogManager.getLogger(BatchBufferedQueue.class);
    /** Default number of internal queues (and worker threads). */
    protected static final int DEFAULT_QUEUE_COUNT = 1;
    /** Default capacity of each internal queue (2^20 elements). */
    protected static final int DEFAULT_QUEUE_SIZE = 1048576;
    /** Default maximum number of elements handed to one batch. */
    protected static final int DEFAULT_BATCH_SIZE = 2048;
    /** Default timeout, in milliseconds, of a single blocking poll on a queue. */
    protected static final long DEFAULT_POLL_WAIT_TIME_IN_MILLIS = 1000;
    /** Default time budget, in milliseconds, for collecting one batch. */
    protected static final long DEFAULT_MAX_WAIT_TIME_IN_MILLIS = 1000;
    /** One queue per worker; created in {@link #init()}, written to by {@link #submit}. */
    private MpscBlockingConsumerArrayQueue<E>[] mpscBlockingConsumerArrayQueues;
    /** One single-thread executor per queue; each runs the consume loop started in {@link #init()}. */
    private ExecutorService[] executorServices;
    /** Round-robin counter used by {@link #submit} to pick a queue when the element has a blank hash key. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Creates the single-thread executors that consume the queues. */
    @Autowired
    private ThreadPoolFactory threadPoolFactory;

    /** Source of the observation registry and of the current observation/trace context. */
    @Autowired
    private UnifiedObservationFactory unifiedObservationFactory;

    /** Queried by the consume loop to detect that the application is shutting down. */
    @Autowired
    private ThreadPoolFactoryGracefulShutDownHandler threadPoolFactoryGracefulShutDownHandler;

    /**
     * Number of internal queues to create; {@link #init()} rounds the value up to a
     * power of two and starts one single-thread executor per queue.
     *
     * @return the requested queue count, {@code 1} unless overridden
     */
    @ForOverride
    protected int queueCount() {
        final int singleQueue = 1;
        return singleQueue;
    }

    /**
     * Capacity requested for each internal queue; {@link #init()} rounds the value up
     * to a power of two before creating the queues.
     *
     * @return the requested per-queue capacity, {@code DEFAULT_QUEUE_SIZE} unless overridden
     */
    @ForOverride
    protected int queueSize() {
        final int capacity = DEFAULT_QUEUE_SIZE;
        return capacity;
    }

    /**
     * Maximum number of elements collected into one batch; {@link #init()} rounds the
     * value up to a power of two.
     *
     * @return the requested batch size, {@code DEFAULT_BATCH_SIZE} unless overridden
     */
    @ForOverride
    protected int batchSize() {
        final int maxElementsPerBatch = DEFAULT_BATCH_SIZE;
        return maxElementsPerBatch;
    }

    /**
     * Timeout of each blocking poll performed by the consume loop while it is
     * collecting a batch.
     *
     * @return the poll timeout in milliseconds, one second unless overridden
     */
    @ForOverride
    protected long pollWaitTimeInMillis() {
        return TimeUnit.SECONDS.toMillis(1);
    }

    /**
     * Time budget for collecting one batch: once an element has been added and more
     * than this many milliseconds have elapsed since collection started, the batch
     * is closed even if it is not full.
     *
     * @return the collection time budget in milliseconds, one second unless overridden
     */
    @ForOverride
    protected long maxWaitTimeInMillis() {
        return TimeUnit.SECONDS.toMillis(1);
    }

    /**
     * Supplies the ordering applied to every batch: the batch is sorted with this
     * comparator before {@link #beforeExecute} and {@link #batchManipulate} are called.
     *
     * @return the comparator used to sort a batch
     */
    protected abstract Comparator<E> comparator();

    /**
     * Processes one batch. Called with a non-empty list that has already been sorted
     * with {@link #comparator()}; anything thrown here routes the batch to
     * {@link #afterBatchError}.
     *
     * @param list the sorted batch to process
     */
    protected abstract void batchManipulate(List<E> list);

    /**
     * Hook invoked with the sorted batch immediately before {@link #batchManipulate}.
     * Does nothing by default.
     *
     * @param list the sorted batch about to be processed
     */
    protected void beforeExecute(List<E> list) {
    }

    /**
     * Hook invoked after {@link #batchManipulate} returned normally and every element's
     * {@code afterElementManipulate()} has been called. Does nothing by default.
     *
     * @param list the sorted batch that was processed
     */
    protected void afterBatchFinish(List<E> list) {
    }

    /**
     * Hook invoked when any step of processing a batch throws, after every element's
     * {@code afterElementManipulateError(th)} has been called. Does nothing by default.
     *
     * @param list the batch being processed when the failure happened; it is the sorted
     *             copy only if the failure occurred after sorting
     * @param th   the failure
     */
    protected void afterBatchError(List<E> list, Throwable th) {
    }

    /**
     * Processes one batch inside its own observation.
     *
     * <p>Steps: notify every element via {@code beforeElementManipulate}, sort a copy of the
     * batch with {@link #comparator()}, call {@link #beforeExecute}, {@link #batchManipulate},
     * every element's {@code afterElementManipulate} and finally {@link #afterBatchFinish}.
     * If any step throws, the failure is logged, every element's
     * {@code afterElementManipulateError} and {@link #afterBatchError} are called, and the
     * error is recorded on the observation; the failure is not rethrown.
     *
     * <p>The passed list itself is never reordered or modified; sorting works on a copy.
     *
     * @param list the batch to process; {@code null} or empty batches are ignored
     */
    private void manipulate(List<E> list) {
        if (CollectionUtils.isEmpty(list)) {
            log.debug("BatchBufferedQueue: empty queue");
            return;
        }
        log.info("BatchBufferedQueue: current batch size: {}", list.size());
        // Create the observation only once we know there is work to observe.
        Observation observation = BatchBufferQueueBatchManipulateObservationDocumentation.DEFAULT_OBSERVATION_DOCUMENTATION
                .observation(this.unifiedObservationFactory.getObservationRegistry());
        List<E> current = list;
        try {
            observation.start();
            TraceContext traceContext = UnifiedObservationFactory.getTraceContext(observation);
            // submit() treats the trace context as nullable, so do the same here instead of
            // failing the whole batch with a NullPointerException.
            // NOTE(review): assumes beforeElementManipulate tolerates a null span id - confirm.
            final String spanId = traceContext == null ? null : traceContext.spanId();
            current.forEach(element -> element.beforeElementManipulate(spanId));
            current = current.stream().sorted(comparator()).collect(Collectors.toList());
            beforeExecute(current);
            batchManipulate(current);
            current.forEach(element -> element.afterElementManipulate());
            afterBatchFinish(current);
        } catch (Throwable th) {
            // The failure is consumed here, so log it; the default afterBatchError is a no-op.
            log.error("BatchBufferedQueue: {} batch manipulate error: {}", getClass().getSimpleName(), th.getMessage(), th);
            current.forEach(element -> element.afterElementManipulateError(th));
            afterBatchError(current, th);
            observation.error(th);
        } finally {
            // Always close the observation, including when an error callback itself throws.
            observation.stop();
        }
    }

    /**
     * Creates the internal queues and starts one single-thread worker per queue.
     *
     * <p>Queue count, queue capacity and batch size are each rounded up to a power of two.
     * Every worker repeatedly collects a batch from its queue and processes it until the
     * thread is interrupted or the graceful-shutdown handler reports shutdown; it then
     * drains whatever is left in its queue before terminating.
     */
    @PostConstruct
    public void init() {
        final int queueTotal = getNearest2Power(queueCount());
        final int capacity = getNearest2Power(queueSize());
        final int maxBatch = getNearest2Power(batchSize());
        final long pollWaitMillis = pollWaitTimeInMillis();
        final long maxWaitMillis = maxWaitTimeInMillis();
        final String name = getClass().getSimpleName();

        // Generic array creation cannot be expressed in Java; the array only ever holds queues of E.
        @SuppressWarnings("unchecked")
        MpscBlockingConsumerArrayQueue<E>[] queues = new MpscBlockingConsumerArrayQueue[queueTotal];
        this.mpscBlockingConsumerArrayQueues = queues;
        this.executorServices = new ExecutorService[queueTotal];
        for (int i = 0; i < queueTotal; i++) {
            final MpscBlockingConsumerArrayQueue<E> queue = new MpscBlockingConsumerArrayQueue<>(capacity);
            queues[i] = queue;
            this.executorServices[i] = this.threadPoolFactory.createSingleThreadPoolExecutor(name + "-" + i);
            this.executorServices[i].submit(() -> {
                consumeUntilShutdown(queue, name, maxBatch, pollWaitMillis, maxWaitMillis);
                drainRemaining(queue, name, maxBatch);
                log.info("BatchBufferedQueue {} shutdown", name);
            });
        }
    }

    /**
     * Collects and processes batches from {@code queue} until the worker thread is
     * interrupted or shutdown has been signalled.
     */
    private void consumeUntilShutdown(MpscBlockingConsumerArrayQueue<E> queue, String name, int maxBatch,
                                      long pollWaitMillis, long maxWaitMillis) {
        final Thread worker = Thread.currentThread();
        while (!worker.isInterrupted() && !this.threadPoolFactoryGracefulShutDownHandler.isShuttingDown()) {
            final List<E> batch = new ArrayList<>();
            try {
                final long startedAt = System.currentTimeMillis();
                E element;
                while (batch.size() < maxBatch
                        && (element = queue.poll(pollWaitMillis, TimeUnit.MILLISECONDS)) != null) {
                    batch.add(element);
                    log.info("BatchBufferedQueue: {} origin traceId: {} spanId: {} add to batch", name, element.traceId(), element.spanId());
                    if (System.currentTimeMillis() - startedAt > maxWaitMillis) {
                        break;
                    }
                }
            } catch (InterruptedException interrupted) {
                // poll() clears the interrupt flag when it throws; restore it so the loop
                // condition sees the interruption and the worker proceeds to the drain phase.
                worker.interrupt();
                log.warn("BatchBufferedQueue: {} interrupted while polling, processing {} collected element(s)", name, batch.size());
            } catch (Throwable th) {
                log.fatal("BatchBufferedQueue: {} error: {}", name, th.getMessage(), th);
            }
            // Elements already taken off the queue must not be lost, even if collection was cut short.
            manipulateSafely(batch, name);
        }
    }

    /**
     * Empties {@code queue} without blocking, processing the remaining elements in
     * batches of at most {@code maxBatch}.
     */
    private void drainRemaining(MpscBlockingConsumerArrayQueue<E> queue, String name, int maxBatch) {
        log.info("BatchBufferedQueue: {} before shutting down, drain all remaining: {}", name, queue.size());
        List<E> batch = new ArrayList<>();
        E element;
        while ((element = queue.relaxedPoll()) != null) {
            batch.add(element);
            log.info("BatchBufferedQueue: {} origin traceId: {} spanId: {} add to batch", name, element.traceId(), element.spanId());
            if (batch.size() >= maxBatch) {
                manipulateSafely(batch, name);
                // Start a fresh list instead of clearing one that was handed to callbacks.
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            manipulateSafely(batch, name);
        }
    }

    /**
     * Processes {@code batch} and logs, rather than propagates, anything that escapes,
     * so one failing batch cannot stop the worker or abort the drain.
     */
    private void manipulateSafely(List<E> batch, String name) {
        try {
            manipulate(batch);
        } catch (Throwable th) {
            log.fatal("BatchBufferedQueue: {} error: {}", name, th.getMessage(), th);
        }
    }

    /**
     * Enqueues an element for batched processing.
     *
     * <p>The current trace and span ids (or {@code null} when no trace context is available)
     * are recorded on the element first. Elements with a non-blank hash key always go to the
     * queue selected by that key's hash; other elements are spread round-robin over all queues.
     * If the selected queue is full the element is not enqueued and an error is logged.
     *
     * @param e the element to enqueue
     */
    public void submit(E e) {
        TraceContext traceContext = UnifiedObservationFactory.getTraceContext(
                this.unifiedObservationFactory.getCurrentOrCreateEmptyObservation());
        String traceId = traceContext == null ? null : traceContext.traceId();
        String spanId = traceContext == null ? null : traceContext.spanId();
        e.setSubmitInfo(traceId, spanId);

        String name = getClass().getSimpleName();
        String hashKey = e.hashKey();
        log.info("BatchBufferedQueue-sumbit: {} -> {}", name, hashKey);

        // The queue count is a power of two, so masking with (length - 1) always yields a
        // valid non-negative index, even for negative hash codes or a wrapped-around counter.
        int mask = this.mpscBlockingConsumerArrayQueues.length - 1;
        int index = StringUtils.isNotBlank(hashKey)
                ? hashKey.hashCode() & mask
                : this.counter.incrementAndGet() & mask;
        if (!this.mpscBlockingConsumerArrayQueues[index].offer(e)) {
            // offer() returns false when the queue is full; make the drop visible.
            log.error("BatchBufferedQueue: {} queue {} is full, element with hashKey {} was rejected", name, index, hashKey);
        }
    }

    /**
     * Rounds a positive value up to the nearest power of two.
     *
     * @param i the value to round; must be in the range {@code 1..2^30}
     * @return {@code i} itself when it already is a power of two, otherwise the next larger one
     * @throws IllegalArgumentException if {@code i} is not positive, or is larger than
     *         {@code 2^30} (the largest power of two an {@code int} can hold)
     */
    private static int getNearest2Power(int i) {
        final int largestPowerOfTwo = 1 << 30;
        if (i <= 0) {
            // 0 and negatives used to yield 0 (or Integer.MIN_VALUE), which breaks index masking.
            throw new IllegalArgumentException("value must be positive: " + i);
        }
        if (i > largestPowerOfTwo) {
            // Rounding up would overflow to a negative number.
            throw new IllegalArgumentException("value must not exceed " + largestPowerOfTwo + ": " + i);
        }
        if ((i & (i - 1)) == 0) {
            return i;
        }
        return Integer.highestOneBit(i) << 1;
    }
}
