package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueue;
import com.github.ddth.queue.IQueueMessage;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import java.io.Closeable;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/github/ddth/queue/impl/DisruptorQueue.class */
/**
 * In-memory {@link IQueue} implementation backed by an LMAX Disruptor {@link RingBuffer}.
 *
 * <p>Messages are published to the ring buffer under {@code LOCK_PUT} and consumed under
 * {@code LOCK_TAKE}. Unless ephemeral storage is disabled, every message handed out by
 * {@link #take()} is also recorded in a map keyed by {@code qId()} until it is finished,
 * requeued, or moved back to the queue.
 *
 * <p>{@link #init()} must be called before the queue is used; the ring buffer and the
 * ephemeral storage are only created there.
 */
public class DisruptorQueue implements IQueue, Closeable, AutoCloseable {
    /** Pre-allocates the empty slots of the ring buffer. */
    private static final EventFactory<Event> EVENT_FACTORY = new EventFactory<Event>() {
        @Override
        public Event newInstance() {
            return new Event();
        }
    };

    /** Messages that have been taken but not yet finished/requeued; {@code null} until {@link #init()} or when disabled. */
    private ConcurrentMap<Object, IQueueMessage> ephemeralStorage;
    private RingBuffer<Event> ringBuffer;
    /** Sequence of the last consumed slot; registered as a gating sequence of the ring buffer. */
    private Sequence consumedSeq;
    /** Highest sequence known to be published; read and written while holding {@code LOCK_TAKE}. */
    private long knownPublishedSeq;
    private boolean ephemeralDisabled = false;
    private int ringSize = 1024;
    private final Lock LOCK_TAKE = new ReentrantLock();
    private final Lock LOCK_PUT = new ReentrantLock();

    /** Ring buffer slot holding a single queue message. */
    public static final class Event {
        private IQueueMessage value;

        private Event() {
        }

        public void set(IQueueMessage iQueueMessage) {
            this.value = iQueueMessage;
        }

        public IQueueMessage get() {
            return this.value;
        }
    }

    /** Creates a queue with the default ring size (1024). */
    public DisruptorQueue() {
    }

    /**
     * Creates a queue with the given ring size.
     *
     * @param i requested ring size; rounded up to a power of two by {@link #setRingSize(int)}
     */
    public DisruptorQueue(int i) {
        setRingSize(i);
    }

    /**
     * @return {@code true} if ephemeral storage is disabled
     */
    public boolean getEphemeralDisabled() {
        return this.ephemeralDisabled;
    }

    /**
     * @return {@code true} if ephemeral storage is disabled
     */
    public boolean isEphemeralDisabled() {
        return this.ephemeralDisabled;
    }

    /**
     * Enables or disables ephemeral storage. The flag is consulted by {@link #init()} to decide
     * whether the storage map is created.
     *
     * @param z {@code true} to disable ephemeral storage
     * @return this queue, for chaining
     */
    public DisruptorQueue setEphemeralDisabled(boolean z) {
        this.ephemeralDisabled = z;
        return this;
    }

    /**
     * @return the configured ring size
     */
    protected int getRingSize() {
        return this.ringSize;
    }

    /**
     * Rounds a value up to a power of two by smearing the highest set bit of {@code i - 1}
     * into all lower bits and adding one.
     *
     * @param i the requested value
     * @return 2 if {@code i < 2}; {@code i} itself if it is already a power of two; otherwise
     *         the next higher power of two
     */
    private static int nextPowerOf2(int i) {
        if (i < 2) {
            return 2;
        }
        int v = i - 1;
        v |= v >> 1;
        v |= v >> 2;
        v |= v >> 4;
        v |= v >> 8;
        v |= v >> 16;
        return v + 1;
    }

    /**
     * Sets the ring size, rounded up to a power of two.
     *
     * @param i requested ring size
     * @return this queue, for chaining
     */
    public DisruptorQueue setRingSize(int i) {
        this.ringSize = nextPowerOf2(i);
        return this;
    }

    /**
     * Creates the ring buffer, the ephemeral storage (unless disabled) and the consumer
     * sequence, and aligns the consumer sequence with the ring buffer's current cursor.
     *
     * @return this queue, for chaining
     */
    public DisruptorQueue init() {
        this.ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, this.ringSize);
        if (!this.ephemeralDisabled) {
            this.ephemeralStorage = new ConcurrentHashMap<>(this.ringSize);
        }
        this.consumedSeq = new Sequence();
        this.ringBuffer.addGatingSequences(this.consumedSeq);
        long cursor = this.ringBuffer.getCursor();
        this.consumedSeq.set(cursor);
        this.knownPublishedSeq = cursor;
        return this;
    }

    /** Releases resources held by this queue. Currently a no-op. */
    public void destroy() {
    }

    /** Delegates to {@link #destroy()}. */
    @Override
    public void close() {
        destroy();
    }

    /**
     * Stores a message in the slot at the given sequence and publishes that sequence.
     *
     * @param iQueueMessage the message to store
     * @param j             a sequence previously claimed from the ring buffer
     */
    protected void publish(IQueueMessage iQueueMessage, long j) {
        this.ringBuffer.get(j).set(iQueueMessage);
        this.ringBuffer.publish(j);
    }

    /**
     * Claims the next slot and publishes the message into it, serialized by {@code LOCK_PUT}.
     *
     * @param iQueueMessage the message to publish
     * @return {@code true} if published; {@code false} if the ring buffer had no free slot
     */
    protected boolean putToRingBuffer(IQueueMessage iQueueMessage) {
        this.LOCK_PUT.lock();
        try {
            publish(iQueueMessage, this.ringBuffer.tryNext());
            return true;
        } catch (InsufficientCapacityException e) {
            // Ring buffer is full: report failure instead of blocking.
            return false;
        } finally {
            // Release exactly once; unlocking a ReentrantLock the thread no longer holds
            // throws IllegalMonitorStateException.
            this.LOCK_PUT.unlock();
        }
    }

    /**
     * Queues a copy of the message with its requeue count reset to 0 and both timestamps set
     * to the current time.
     *
     * @return {@code true} if the message was put to the ring buffer
     */
    @Override
    public boolean queue(IQueueMessage iQueueMessage) {
        IQueueMessage copy = iQueueMessage.mo2clone();
        Date now = new Date();
        copy.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
        return putToRingBuffer(copy);
    }

    /**
     * Requeues a copy of the message with its requeue count incremented and its timestamp set
     * to the current time. On success the message is removed from ephemeral storage.
     *
     * @return {@code true} if the message was put to the ring buffer
     */
    @Override
    public boolean requeue(IQueueMessage iQueueMessage) {
        IQueueMessage copy = iQueueMessage.mo2clone();
        copy.qIncNumRequeues().qTimestamp(new Date());
        if (!putToRingBuffer(copy)) {
            return false;
        }
        if (!this.ephemeralDisabled) {
            this.ephemeralStorage.remove(copy.qId());
        }
        return true;
    }

    /**
     * Requeues the message as-is (no copy, no counter or timestamp update). On success the
     * message is removed from ephemeral storage.
     *
     * @return {@code true} if the message was put to the ring buffer
     */
    @Override
    public boolean requeueSilent(IQueueMessage iQueueMessage) {
        if (!putToRingBuffer(iQueueMessage)) {
            return false;
        }
        if (!this.ephemeralDisabled) {
            this.ephemeralStorage.remove(iQueueMessage.qId());
        }
        return true;
    }

    /** Removes the message from ephemeral storage (no-op when ephemeral storage is disabled). */
    @Override
    public void finish(IQueueMessage iQueueMessage) {
        if (!this.ephemeralDisabled) {
            this.ephemeralStorage.remove(iQueueMessage.qId());
        }
    }

    /**
     * Advances {@code knownPublishedSeq} to the highest sequence, up to the ring buffer's
     * cursor, such that every sequence after the previous known value is published. Stops just
     * before the first unpublished sequence.
     */
    protected void updatePublishedSequence() {
        long cursor = this.ringBuffer.getCursor();
        if (cursor <= this.knownPublishedSeq) {
            return;
        }
        long highestPublished = cursor;
        for (long seq = this.knownPublishedSeq + 1; seq <= cursor; seq++) {
            if (!this.ringBuffer.isPublished(seq)) {
                highestPublished = seq - 1;
                break;
            }
        }
        this.knownPublishedSeq = highestPublished;
    }

    /**
     * Takes the next published message from the ring buffer, serialized by {@code LOCK_TAKE}.
     *
     * @return the next message, or {@code null} if nothing is available
     */
    protected IQueueMessage takeFromRingBuffer() {
        this.LOCK_TAKE.lock();
        try {
            long next = this.consumedSeq.get() + 1;
            if (next > this.knownPublishedSeq) {
                // Cached view is exhausted; refresh it from the ring buffer's cursor.
                updatePublishedSequence();
            }
            if (next > this.knownPublishedSeq) {
                return null;
            }
            IQueueMessage msg = this.ringBuffer.get(next).get();
            this.consumedSeq.incrementAndGet();
            return msg;
        } finally {
            // Release exactly once; unlocking a ReentrantLock the thread no longer holds
            // throws IllegalMonitorStateException.
            this.LOCK_TAKE.unlock();
        }
    }

    /**
     * Takes the next message and, unless ephemeral storage is disabled, records it there under
     * its {@code qId()} if no entry with that id exists yet.
     *
     * @return the next message, or {@code null} if the queue has nothing available
     */
    @Override
    public IQueueMessage take() {
        IQueueMessage msg = takeFromRingBuffer();
        if (msg != null && !this.ephemeralDisabled) {
            this.ephemeralStorage.putIfAbsent(msg.qId(), msg);
        }
        return msg;
    }

    /**
     * Collects the messages in ephemeral storage whose original timestamp plus {@code j}
     * milliseconds lies before the current time.
     *
     * @param j age threshold in milliseconds
     * @return the matching messages, or {@code null} if ephemeral storage is disabled
     */
    @Override
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        if (this.ephemeralDisabled) {
            return null;
        }
        Collection<IQueueMessage> orphans = new HashSet<>();
        long now = System.currentTimeMillis();
        for (IQueueMessage msg : this.ephemeralStorage.values()) {
            if (msg.qOriginalTimestamp().getTime() + j < now) {
                orphans.add(msg);
            }
        }
        return orphans;
    }

    /**
     * Removes the message with the same {@code qId()} from ephemeral storage and puts the
     * stored instance back to the ring buffer. If the ring buffer is full, the message is
     * restored to ephemeral storage.
     *
     * @return {@code true} if the message was moved; {@code false} if ephemeral storage is
     *         disabled, the message was not found, or the ring buffer was full
     */
    @Override
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        if (this.ephemeralDisabled) {
            return false;
        }
        IQueueMessage stored = this.ephemeralStorage.remove(iQueueMessage.qId());
        if (stored == null) {
            return false;
        }
        if (putToRingBuffer(stored)) {
            return true;
        }
        // Could not requeue: put it back so the message is not lost.
        this.ephemeralStorage.putIfAbsent(stored.qId(), stored);
        return false;
    }

    /**
     * @return the difference between the ring buffer's cursor and the consumed sequence
     */
    @Override
    public int queueSize() {
        return (int) (this.ringBuffer.getCursor() - this.consumedSeq.get());
    }

    /**
     * @return the number of entries in ephemeral storage, or {@code -1} if it is disabled
     */
    @Override
    public int ephemeralSize() {
        if (this.ephemeralDisabled) {
            return -1;
        }
        return this.ephemeralStorage.size();
    }
}
