package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue.class */
public class DisruptorQueue extends AbstractEphemeralSupportQueue {
    private static final EventFactory<Event> EVENT_FACTORY = new EventFactory<Event>() { // from class: com.github.ddth.queue.impl.DisruptorQueue.1
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public Event m2newInstance() {
            return new Event();
        }
    };
    private ConcurrentMap<Object, IQueueMessage> ephemeralStorage;
    private RingBuffer<Event> ringBuffer;
    private Sequence consumedSeq;
    private long knownPublishedSeq;
    private int ringSize = 1024;
    private Lock LOCK_TAKE = new ReentrantLock();
    private Lock LOCK_PUT = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue$Event.class */
    public static final class Event {
        private IQueueMessage value;

        private Event() {
        }

        public void set(IQueueMessage iQueueMessage) {
            this.value = iQueueMessage;
        }

        public IQueueMessage get() {
            return this.value;
        }
    }

    public DisruptorQueue() {
    }

    public DisruptorQueue(int i) {
        setRingSize(i);
    }

    protected int getRingSize() {
        return this.ringSize;
    }

    private static int nextPowerOf2(int i) {
        if (i < 2) {
            return 2;
        }
        int i2 = i - 1;
        int i3 = i2 | (i2 >> 1);
        int i4 = i3 | (i3 >> 2);
        int i5 = i4 | (i4 >> 4);
        int i6 = i5 | (i5 >> 8);
        return (i6 | (i6 >> 16)) + 1;
    }

    public DisruptorQueue setRingSize(int i) {
        this.ringSize = nextPowerOf2(i);
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public DisruptorQueue init() {
        this.ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, this.ringSize);
        if (!isEphemeralDisabled()) {
            int max = Math.max(0, getEphemeralMaxSize());
            this.ephemeralStorage = new ConcurrentHashMap(max > 0 ? Math.min(max, this.ringSize) : this.ringSize);
        }
        this.consumedSeq = new Sequence();
        this.ringBuffer.addGatingSequences(new Sequence[]{this.consumedSeq});
        long cursor = this.ringBuffer.getCursor();
        this.consumedSeq.set(cursor);
        this.knownPublishedSeq = cursor;
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public void destroy() {
    }

    protected void publish(IQueueMessage iQueueMessage, long j) {
        ((Event) this.ringBuffer.get(j)).set(iQueueMessage);
        this.ringBuffer.publish(j);
    }

    protected void putToRingBuffer(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        this.LOCK_PUT.lock();
        try {
            try {
                publish(iQueueMessage, this.ringBuffer.tryNext());
                this.LOCK_PUT.unlock();
            } catch (InsufficientCapacityException e) {
                throw new QueueException.QueueIsFull(getRingSize());
            }
        } catch (Throwable th) {
            this.LOCK_PUT.unlock();
            throw th;
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        Date date = new Date();
        mo5clone.qNumRequeues(0).qOriginalTimestamp(date).qTimestamp(date);
        putToRingBuffer(mo5clone);
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        mo5clone.qIncNumRequeues().qTimestamp(new Date());
        putToRingBuffer(mo5clone);
        if (isEphemeralDisabled()) {
            return true;
        }
        this.ephemeralStorage.remove(mo5clone.qId());
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        putToRingBuffer(mo5clone);
        if (isEphemeralDisabled()) {
            return true;
        }
        this.ephemeralStorage.remove(mo5clone.qId());
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage iQueueMessage) {
        if (isEphemeralDisabled()) {
            return;
        }
        this.ephemeralStorage.remove(iQueueMessage.qId());
    }

    protected void updatePublishedSequence() {
        long cursor = this.ringBuffer.getCursor();
        if (cursor >= this.knownPublishedSeq + 1) {
            long j = cursor;
            long j2 = this.knownPublishedSeq;
            while (true) {
                long j3 = j2 + 1;
                if (j3 > cursor) {
                    break;
                }
                if (!this.ringBuffer.isPublished(j3)) {
                    j = j3 - 1;
                    break;
                }
                j2 = j3;
            }
            this.knownPublishedSeq = j;
        }
    }

    protected IQueueMessage takeFromRingBuffer() {
        this.LOCK_TAKE.lock();
        try {
            long j = this.consumedSeq.get() + 1;
            if (j > this.knownPublishedSeq) {
                updatePublishedSequence();
            }
            if (j > this.knownPublishedSeq) {
                return null;
            }
            IQueueMessage iQueueMessage = ((Event) this.ringBuffer.get(j)).get();
            this.consumedSeq.incrementAndGet();
            this.LOCK_TAKE.unlock();
            return iQueueMessage;
        } finally {
            this.LOCK_TAKE.unlock();
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage take() throws QueueException.EphemeralIsFull {
        int ephemeralMaxSize;
        if (!isEphemeralDisabled() && (ephemeralMaxSize = getEphemeralMaxSize()) > 0 && this.ephemeralStorage.size() >= ephemeralMaxSize) {
            throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
        }
        IQueueMessage takeFromRingBuffer = takeFromRingBuffer();
        if (takeFromRingBuffer != null && !isEphemeralDisabled()) {
            this.ephemeralStorage.putIfAbsent(takeFromRingBuffer.qId(), takeFromRingBuffer);
        }
        return takeFromRingBuffer;
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        if (!isEphemeralDisabled()) {
            return null;
        }
        HashSet hashSet = new HashSet();
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<Object, IQueueMessage>> it = this.ephemeralStorage.entrySet().iterator();
        while (it.hasNext()) {
            IQueueMessage value = it.next().getValue();
            if (value.qOriginalTimestamp().getTime() + j < currentTimeMillis) {
                hashSet.add(value);
            }
        }
        return hashSet;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        IQueueMessage remove;
        if (isEphemeralDisabled() || (remove = this.ephemeralStorage.remove(iQueueMessage.qId())) == null) {
            return true;
        }
        try {
            putToRingBuffer(remove);
            return true;
        } catch (QueueException.QueueIsFull e) {
            this.ephemeralStorage.putIfAbsent(remove.qId(), remove);
            return false;
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return (int) (this.ringBuffer.getCursor() - this.consumedSeq.get());
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        if (isEphemeralDisabled()) {
            return 0;
        }
        return this.ephemeralStorage.size();
    }
}
