package net.sf.sprtool.recordevent.postgres.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/StatelessEventProcessorWorker.class */
public class StatelessEventProcessorWorker extends EventProcessorWorker {
    private static Logger logger = LoggerFactory.getLogger(StatelessEventProcessorWorker.class);
    private volatile boolean fromQuery;
    private volatile long lastEventId;
    private AtomicLong queueSequence;
    private volatile long lastQueueIndex;
    private BlockingQueue<QueueEvent> eventQueue;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/StatelessEventProcessorWorker$QueueEvent.class */
    public static class QueueEvent {
        long index;
        Map event;

        QueueEvent(long j, Map map) {
            this.index = j;
            this.event = map;
        }
    }

    public StatelessEventProcessorWorker(EventProcessorWrapper eventProcessorWrapper, long j) {
        super(eventProcessorWrapper);
        this.lastEventId = 0L;
        this.queueSequence = new AtomicLong(0L);
        this.lastEventId = j;
    }

    @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker
    public void init() {
        super.init();
        this.eventQueue = new ArrayBlockingQueue(this.eventProcessorWrapper.getProcessor().getBatchSize() * 3);
    }

    @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker
    protected void grantedPartitionChanged(boolean z) {
    }

    @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker
    protected void eventProcessed(List<Long> list) {
    }

    @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker
    public boolean notifyEvent(Map map, long j) {
        if (j > 0) {
            this.queueSequence.addAndGet(j);
            return false;
        }
        return this.eventQueue.offer(new QueueEvent(this.queueSequence.incrementAndGet(), map));
    }

    @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker
    protected void processEvents(Consumer<EventProcessorWorker.ProcessResult> consumer) throws Exception {
        if (this.fromQuery) {
            queryEvents(consumer);
        } else {
            consumeQueue(consumer);
        }
    }

    private void queryEvents(Consumer<EventProcessorWorker.ProcessResult> consumer) throws InterruptedException {
        long longValue;
        Set<Integer> allocatedPartitions = getAllocatedPartitions();
        if (allocatedPartitions.size() == 0) {
            consumer.accept(new EventProcessorWorker.ProcessResult(true));
            return;
        }
        List<Map> list = (List) this.eventProcessorWrapper.getJdbc().callStatement(this.eventProcessorWrapper.getSqls().getStatelessQuerySql(), preparedStatement -> {
            preparedStatement.setInt(1, this.eventProcessorWrapper.getId());
            preparedStatement.setArray(2, preparedStatement.getConnection().createArrayOf("int4", allocatedPartitions.toArray(new Integer[0])));
            preparedStatement.setObject(3, Long.valueOf(this.lastEventId));
            this.eventProcessorWrapper.getJdbc();
            return JdbcConnection.resultSetToList(preparedStatement.executeQuery());
        });
        if (list.size() == 0 && this.eventQueue.size() == 0) {
            this.fromQuery = false;
            consumer.accept(new EventProcessorWorker.ProcessResult(true));
            return;
        }
        if (list.size() > 0) {
            doProcessEvents(list, true, consumer);
            this.lastEventId = ((Long) list.get(list.size() - 1).get("id")).longValue();
        }
        do {
            QueueEvent poll = this.eventQueue.poll(0L, TimeUnit.MILLISECONDS);
            if (poll == null || this.destroyed) {
                consumer.accept(new EventProcessorWorker.ProcessResult(true));
                return;
            } else {
                longValue = ((Long) poll.event.get("id")).longValue();
                this.lastQueueIndex = poll.index;
            }
        } while (longValue < this.lastEventId);
        if (longValue != this.lastEventId) {
            consumer.accept(new EventProcessorWorker.ProcessResult(false));
        } else {
            this.fromQuery = false;
            consumer.accept(new EventProcessorWorker.ProcessResult(false));
        }
    }

    private void consumeQueue(Consumer<EventProcessorWorker.ProcessResult> consumer) throws InterruptedException {
        int batchSize = this.eventProcessorWrapper.getProcessor().getBatchSize();
        ArrayList arrayList = new ArrayList(batchSize);
        int i = 0;
        while (true) {
            if (i >= batchSize) {
                break;
            }
            if (this.destroyed) {
                this.fromQuery = true;
                consumer.accept(new EventProcessorWorker.ProcessResult(true));
                return;
            }
            QueueEvent poll = this.eventQueue.poll(0L, TimeUnit.MILLISECONDS);
            if (poll == null) {
                break;
            }
            this.lastQueueIndex++;
            if (poll.index != this.lastQueueIndex) {
                this.fromQuery = true;
                break;
            } else {
                arrayList.add(poll.event);
                i++;
            }
        }
        if (arrayList.size() <= 0) {
            consumer.accept(new EventProcessorWorker.ProcessResult(true));
        } else {
            doProcessEvents(arrayList, true, consumer);
            this.lastEventId = ((Long) arrayList.get(arrayList.size() - 1).get("id")).longValue();
        }
    }
}
