package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.AbstractQueue;
import com.github.ddth.queue.utils.QueueException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue.class */
public class DisruptorQueue<ID, DATA> extends AbstractInmemEphemeralQueue<ID, DATA> {
    private RingBuffer<Event<ID, DATA>> ringBuffer;
    private Sequence consumedSeq;
    private long knownPublishedSeq;
    private final EventFactory<Event<ID, DATA>> EVENT_FACTORY = () -> {
        return new Event();
    };
    private final Lock LOCK_TAKE = new ReentrantLock();
    private final Lock LOCK_PUT = new ReentrantLock();
    private int ringSize = 1024;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue$Event.class */
    public static final class Event<ID, DATA> {
        private IQueueMessage<ID, DATA> value;

        private Event() {
        }

        public void set(IQueueMessage<ID, DATA> iQueueMessage) {
            this.value = iQueueMessage;
        }

        public IQueueMessage<ID, DATA> get() {
            return this.value;
        }
    }

    public DisruptorQueue() {
    }

    public DisruptorQueue(int i) {
        setRingSize(i);
    }

    protected int getRingSize() {
        return this.ringSize;
    }

    private static int nextPowerOf2(int i) {
        if (i < 2) {
            return 2;
        }
        int i2 = i - 1;
        int i3 = i2 | (i2 >> 1);
        int i4 = i3 | (i3 >> 2);
        int i5 = i4 | (i4 >> 4);
        int i6 = i5 | (i5 >> 8);
        return (i6 | (i6 >> 16)) + 1;
    }

    public DisruptorQueue<ID, DATA> setRingSize(int i) {
        this.ringSize = nextPowerOf2(i);
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public DisruptorQueue<ID, DATA> init() throws Exception {
        this.ringBuffer = RingBuffer.createSingleProducer(this.EVENT_FACTORY, this.ringSize);
        initEphemeralStorage(this.ringSize);
        this.consumedSeq = new Sequence();
        this.ringBuffer.addGatingSequences(new Sequence[]{this.consumedSeq});
        long cursor = this.ringBuffer.getCursor();
        this.consumedSeq.set(cursor);
        this.knownPublishedSeq = cursor;
        super.init();
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    protected boolean doPutToQueue(IQueueMessage<ID, DATA> iQueueMessage, AbstractQueue.PutToQueueCase putToQueueCase) throws QueueException.QueueIsFull {
        this.LOCK_PUT.lock();
        try {
            if (this.ringBuffer.tryPublishEvent((event, j) -> {
                event.set(iQueueMessage);
                this.knownPublishedSeq = j > this.knownPublishedSeq ? j : this.knownPublishedSeq;
                if (putToQueueCase == null || putToQueueCase == AbstractQueue.PutToQueueCase.NEW) {
                    return;
                }
                doRemoveFromEphemeralStorage(iQueueMessage);
            })) {
                return true;
            }
            throw new QueueException.QueueIsFull(getRingSize());
        } finally {
            this.LOCK_PUT.unlock();
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage<ID, DATA> iQueueMessage) {
        doRemoveFromEphemeralStorage(iQueueMessage);
    }

    protected IQueueMessage<ID, DATA> takeFromRingBuffer() {
        this.LOCK_TAKE.lock();
        try {
            long j = this.consumedSeq.get() + 1;
            if (j > this.knownPublishedSeq) {
                this.knownPublishedSeq = this.ringBuffer.getCursor();
                this.LOCK_TAKE.unlock();
                return null;
            }
            try {
                Event event = (Event) this.ringBuffer.get(j);
                try {
                    IQueueMessage<ID, DATA> iQueueMessage = event.get();
                    event.set(null);
                    this.consumedSeq.incrementAndGet();
                    this.LOCK_TAKE.unlock();
                    return iQueueMessage;
                } catch (Throwable th) {
                    event.set(null);
                    throw th;
                }
            } catch (Throwable th2) {
                this.consumedSeq.incrementAndGet();
                throw th2;
            }
        } catch (Throwable th3) {
            this.LOCK_TAKE.unlock();
            throw th3;
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        ensureEphemeralSize();
        IQueueMessage<ID, DATA> takeFromRingBuffer = takeFromRingBuffer();
        doPutToEphemeralStorage(takeFromRingBuffer);
        return takeFromRingBuffer;
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return (int) (this.ringBuffer.getCursor() - this.consumedSeq.get());
    }
}
