package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue.class */
/**
 * In-memory queue backed by an LMAX Disruptor {@link RingBuffer}, with optional "ephemeral"
 * storage for messages that have been taken but not yet finished or requeued.
 *
 * <p>Producers are serialized by one lock and consumers by another. The ring buffer is created
 * in single-producer mode, which is valid because every publish happens while the producer lock
 * is held. {@link #init()} must be called before the queue is used.
 *
 * @param <ID>   type of the message id
 * @param <DATA> type of the message payload
 */
public class DisruptorQueue<ID, DATA> extends AbstractEphemeralSupportQueue<ID, DATA> {
    /** Largest power of two that still fits in a positive {@code int}; upper bound for the ring size. */
    private static final int MAX_RING_SIZE = 1 << 30;

    /** Taken-but-unfinished messages keyed by message id; allocated by {@link #init()} only when ephemeral storage is enabled. */
    private ConcurrentMap<Object, IQueueMessage<ID, DATA>> ephemeralStorage;
    private RingBuffer<Event<ID, DATA>> ringBuffer;
    /** Sequence of the last slot handed out to a consumer; registered as the ring buffer's gating sequence. */
    private Sequence consumedSeq;
    /**
     * Cached hint of the highest sequence whose slot is known to be populated.
     *
     * <p>Declared {@code volatile} because it is written by producers (under the put lock) and
     * read/written by consumers (under the take lock): the two locks are distinct, so without
     * {@code volatile} a consumer could observe the new sequence but a stale (null) slot value,
     * or a torn {@code long}. Every value ever stored here is safe to consume up to; a value that
     * lags behind merely triggers a refresh from the ring buffer cursor.
     */
    private volatile long knownPublishedSeq;
    private final EventFactory<Event<ID, DATA>> eventFactory = Event::new;
    private final Lock takeLock = new ReentrantLock();
    private final Lock putLock = new ReentrantLock();
    private int ringSize = 1024;

    /**
     * Mutable ring-buffer slot holding one queue message.
     *
     * <p>Note: the decompiler reported that the access modifier of this class was widened from
     * {@code private}.
     */
    /* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue$Event.class */
    public static final class Event<ID, DATA> {
        private IQueueMessage<ID, DATA> value;

        private Event() {
        }

        /** Stores (or, with {@code null}, clears) the message held by this slot. */
        public void set(IQueueMessage<ID, DATA> iQueueMessage) {
            this.value = iQueueMessage;
        }

        /** Returns the message currently held by this slot, possibly {@code null}. */
        public IQueueMessage<ID, DATA> get() {
            return this.value;
        }
    }

    /** Creates a queue with the default ring size (1024). */
    public DisruptorQueue() {
    }

    /**
     * Creates a queue with the given ring size.
     *
     * @param i requested ring size; normalized as described in {@link #setRingSize(int)}
     */
    public DisruptorQueue(int i) {
        setRingSize(i);
    }

    /** Returns the effective (power-of-two) ring size. */
    protected int getRingSize() {
        return this.ringSize;
    }

    /**
     * Rounds {@code i} up to the nearest power of two.
     *
     * <p>Values below 2 yield 2. Values above 2^30 are clamped to 2^30, because the next power of
     * two would overflow {@code int} and produce a negative size.
     */
    private static int nextPowerOf2(int i) {
        if (i < 2) {
            return 2;
        }
        if (i >= MAX_RING_SIZE) {
            return MAX_RING_SIZE;
        }
        // highestOneBit(i - 1) is the largest power of two strictly below i (or equal to i / 2 when
        // i is itself a power of two), so doubling it gives the smallest power of two >= i.
        return Integer.highestOneBit(i - 1) << 1;
    }

    /**
     * Sets the ring size, rounded up to a power of two (minimum 2, maximum 2^30). Only takes
     * effect for ring buffers created by a subsequent {@link #init()}.
     *
     * @param i requested ring size
     * @return this queue, for chaining
     */
    public DisruptorQueue<ID, DATA> setRingSize(int i) {
        this.ringSize = nextPowerOf2(i);
        return this;
    }

    /**
     * Allocates the ring buffer, the ephemeral storage (when enabled) and the consumer sequence.
     *
     * @return this queue, for chaining
     */
    @Override // com.github.ddth.queue.impl.AbstractQueue
    public DisruptorQueue<ID, DATA> init() {
        this.ringBuffer = RingBuffer.createSingleProducer(this.eventFactory, this.ringSize);
        if (!isEphemeralDisabled()) {
            int ephemeralBound = Math.max(0, getEphemeralMaxSize());
            int initialCapacity = ephemeralBound > 0 ? Math.min(ephemeralBound, this.ringSize) : this.ringSize;
            this.ephemeralStorage = new ConcurrentHashMap<>(initialCapacity);
        }
        this.consumedSeq = new Sequence();
        // Gate the producer on the consumer so unread slots are never overwritten.
        this.ringBuffer.addGatingSequences(this.consumedSeq);
        long cursor = this.ringBuffer.getCursor();
        this.consumedSeq.set(cursor);
        this.knownPublishedSeq = cursor;
        return this;
    }

    /** No resources are released here; the method body is intentionally empty. */
    @Override // com.github.ddth.queue.impl.AbstractQueue
    public void destroy() {
    }

    /**
     * Publishes a message to the ring buffer without blocking.
     *
     * @param iQueueMessage message to publish; must not be {@code null}
     * @throws NullPointerException       if {@code iQueueMessage} is {@code null}
     * @throws QueueException.QueueIsFull if the ring buffer has no free slot
     */
    protected void putToRingBuffer(IQueueMessage<ID, DATA> iQueueMessage) throws QueueException.QueueIsFull {
        if (iQueueMessage == null) {
            throw new NullPointerException("Supplied queue message is null!");
        }
        this.putLock.lock();
        try {
            boolean published = this.ringBuffer.tryPublishEvent((event, seq) -> {
                event.set(iQueueMessage);
                // Volatile write after the slot is filled: a consumer that sees this sequence
                // is guaranteed to see the message stored in the slot.
                this.knownPublishedSeq = Math.max(seq, this.knownPublishedSeq);
            });
            if (!published) {
                throw new QueueException.QueueIsFull(getRingSize());
            }
        } finally {
            this.putLock.unlock();
        }
    }

    /**
     * Queues a copy of the message with its requeue counter reset to 0 and both timestamps set to
     * the current time.
     *
     * @param iQueueMessage message to queue
     * @return always {@code true}
     * @throws QueueException.QueueIsFull if the ring buffer has no free slot
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage<ID, DATA> iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage<ID, DATA> copy = iQueueMessage.mo0clone();
        Date now = new Date();
        copy.qNumRequeues2(0).qOriginalTimestamp2(now).qTimestamp2(now);
        putToRingBuffer(copy);
        return true;
    }

    /**
     * Re-queues a copy of the message, incrementing its requeue counter and refreshing its
     * timestamp, then drops it from ephemeral storage (when enabled).
     *
     * @param iQueueMessage message to requeue
     * @return always {@code true}
     * @throws QueueException.QueueIsFull if the ring buffer has no free slot; the ephemeral entry
     *                                    is left untouched in that case
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage<ID, DATA> iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage<ID, DATA> copy = iQueueMessage.mo0clone();
        copy.qIncNumRequeues2().qTimestamp2(new Date());
        putToRingBuffer(copy);
        if (!isEphemeralDisabled()) {
            this.ephemeralStorage.remove(copy.qId());
        }
        return true;
    }

    /**
     * Re-queues a copy of the message without touching its requeue counter or timestamp, then
     * drops it from ephemeral storage (when enabled).
     *
     * @param iQueueMessage message to requeue
     * @return always {@code true}
     * @throws QueueException.QueueIsFull if the ring buffer has no free slot; the ephemeral entry
     *                                    is left untouched in that case
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage<ID, DATA> iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage<ID, DATA> copy = iQueueMessage.mo0clone();
        putToRingBuffer(copy);
        if (!isEphemeralDisabled()) {
            this.ephemeralStorage.remove(copy.qId());
        }
        return true;
    }

    /**
     * Marks a message as done by removing it from ephemeral storage; a no-op when ephemeral
     * storage is disabled.
     *
     * @param iQueueMessage message that has been processed
     */
    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage<ID, DATA> iQueueMessage) {
        if (!isEphemeralDisabled()) {
            this.ephemeralStorage.remove(iQueueMessage.qId());
        }
    }

    /**
     * Removes and returns the next message from the ring buffer without blocking.
     *
     * @return the next message, or {@code null} if none is available
     */
    protected IQueueMessage<ID, DATA> takeFromRingBuffer() {
        this.takeLock.lock();
        try {
            long next = this.consumedSeq.get() + 1;
            if (next > this.knownPublishedSeq) {
                // The cached hint may lag behind; refresh it from the authoritative cursor and
                // re-check, so an already-published message is not reported as "queue empty".
                this.knownPublishedSeq = this.ringBuffer.getCursor();
                if (next > this.knownPublishedSeq) {
                    return null;
                }
            }
            try {
                Event<ID, DATA> slot = this.ringBuffer.get(next);
                try {
                    return slot.get();
                } finally {
                    // Clear the slot so the ring buffer does not keep the message reachable.
                    slot.set(null);
                }
            } finally {
                // Always advance, even on failure, so a bad slot cannot wedge the consumer.
                this.consumedSeq.incrementAndGet();
            }
        } finally {
            this.takeLock.unlock();
        }
    }

    /**
     * Takes the next message and, when ephemeral storage is enabled, records it there until it is
     * finished or requeued.
     *
     * @return the next message, or {@code null} if the queue is empty
     * @throws QueueException.EphemeralIsFull if ephemeral storage is enabled, bounded, and already
     *                                        holds at least the maximum number of messages
     */
    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        boolean ephemeralEnabled = !isEphemeralDisabled();
        if (ephemeralEnabled) {
            int ephemeralMaxSize = getEphemeralMaxSize();
            if (ephemeralMaxSize > 0 && this.ephemeralStorage.size() >= ephemeralMaxSize) {
                throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
            }
        }
        IQueueMessage<ID, DATA> msg = takeFromRingBuffer();
        if (msg != null && !isEphemeralDisabled()) {
            this.ephemeralStorage.putIfAbsent(msg.qId(), msg);
        }
        return msg;
    }

    /**
     * Collects messages that have stayed in ephemeral storage longer than the given threshold.
     *
     * @param j age threshold in milliseconds, compared against each message's queue timestamp
     * @return the orphan messages (possibly empty), or {@code null} when ephemeral storage is
     *         disabled
     */
    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long j) {
        if (isEphemeralDisabled()) {
            return null;
        }
        Collection<IQueueMessage<ID, DATA>> orphans = new HashSet<>();
        long now = System.currentTimeMillis();
        for (IQueueMessage<ID, DATA> msg : this.ephemeralStorage.values()) {
            // Compare ages rather than computing "timestamp + threshold", which overflows for
            // very large thresholds and would wrongly flag every message as orphaned.
            if (now - msg.qTimestamp().getTime() > j) {
                orphans.add(msg);
            }
        }
        return orphans;
    }

    /**
     * Moves a message from ephemeral storage back to the ring buffer.
     *
     * @param iQueueMessage message whose id identifies the ephemeral entry
     * @return {@code false} only when the ring buffer is full (the message is then put back into
     *         ephemeral storage); {@code true} otherwise, including when ephemeral storage is
     *         disabled or holds no entry for the message
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage<ID, DATA> iQueueMessage) {
        if (isEphemeralDisabled()) {
            return true;
        }
        IQueueMessage<ID, DATA> orphan = this.ephemeralStorage.remove(iQueueMessage.qId());
        if (orphan == null) {
            return true;
        }
        try {
            putToRingBuffer(orphan);
            return true;
        } catch (QueueException.QueueIsFull e) {
            // Could not move it: restore the ephemeral entry so the message is not lost.
            this.ephemeralStorage.putIfAbsent(orphan.qId(), orphan);
            return false;
        }
    }

    /**
     * Returns the number of messages published but not yet taken.
     *
     * <p>Clamped at 0: a consumer may read a slot the producer has filled but not yet published,
     * which briefly puts the consumer sequence ahead of the cursor.
     */
    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        long pending = this.ringBuffer.getCursor() - this.consumedSeq.get();
        return (int) Math.max(0L, pending);
    }

    /** Returns the number of messages in ephemeral storage, or 0 when it is disabled. */
    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        return isEphemeralDisabled() ? 0 : this.ephemeralStorage.size();
    }
}
