package org.apache.activemq.artemis.core.paging.cursor.impl;

import io.netty.util.collection.LongObjectHashMap;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.class */
public class PageCursorProviderImpl implements PageCursorProvider {
    /** Class logger; assigned in the static initializer. */
    private static final Logger logger;
    /** Paging store this provider works on; page, lock and executor operations in this class go through it. */
    protected final PagingStore pagingStore;
    /** Storage manager passed to the subscriptions, counters and transactions created by this provider. */
    protected final StorageManager storageManager;
    // The three timeouts below are assigned in the static initializer (30 s, 10 s and 10 s, in nanoseconds)
    // but are not read anywhere in the visible code of this class.
    private static final long PAGE_READ_TIMEOUT_NS;
    private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS;
    private static final long PAGE_READ_PERMISSION_TIMEOUT_NS;
    // Assertion switch computed in the static initializer; cleanup() consults it before its sanity check.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Number of cleanup tasks submitted by scheduleCleanup() that have not finished running yet. */
    protected final AtomicInteger scheduledCleanup = new AtomicInteger(0);
    /** While false, scheduleCleanup() completes its future without running cleanup(). */
    protected volatile boolean cleanupEnabled = true;
    /** Set to false by counterRebuildStarted() and back to true by counterRebuildDone(); cleanup() returns early while false. */
    protected volatile boolean rebuildDone = true;
    /** Subscriptions registered through createSubscription(), keyed by cursor id. */
    private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();

    /**
     * Creates a cursor provider for one paging store.
     *
     * @param pagingStore    the store whose pages this provider cleans up
     * @param storageManager the storage manager used for subscriptions, counters and transactions
     */
    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager) {
        this.storageManager = storageManager;
        this.pagingStore = pagingStore;
    }

    /**
     * Creates a new page subscription (cursor) and registers it under the given id.
     *
     * @param j      id of the cursor to create
     * @param filter filter handed to the new subscription
     * @param z      flag forwarded unchanged to the subscription and to the counter factory
     * @return the new subscription, already registered as an active cursor
     * @throws IllegalStateException if a cursor with the same id is already registered
     */
    @Override
    public synchronized PageSubscription createSubscription(long j, Filter filter, boolean z) {
        if (logger.isTraceEnabled()) {
            // One placeholder per argument: address, cursor id, filter.
            logger.trace("{} creating subscription {} with filter {}", this.pagingStore.getAddress(), j, filter);
        }
        if (this.activeCursors.containsKey(j)) {
            throw new IllegalStateException("Cursor " + j + " had already been created");
        }
        PageSubscriptionCounter counter = createPageCounter(j, z);
        PageSubscription subscription = new PageSubscriptionImpl(this, this.pagingStore, this.storageManager, filter, j, z, counter);
        this.activeCursors.put(j, subscription);
        return subscription;
    }

    /**
     * Builds the counter for a new subscription.
     *
     * @param j id of the subscription the counter belongs to
     * @param z accepted for the caller's convenience; not used by this method
     * @return a new counter bound to this provider's storage manager
     */
    private PageSubscriptionCounter createPageCounter(long j, boolean z) {
        final PageSubscriptionCounter counter = new PageSubscriptionCounterImpl(this.storageManager, j);
        return counter;
    }

    /**
     * Looks up a registered subscription.
     *
     * @param j id of the cursor
     * @return whatever the active-cursor map holds for that id
     */
    @Override
    public synchronized PageSubscription getSubscription(long j) {
        final PageSubscription registered = this.activeCursors.get(j);
        return registered;
    }

    /**
     * Passes every registered subscription to the given consumer.
     *
     * @param consumer callback invoked once per active cursor
     */
    @Override
    public void forEachSubscription(Consumer<PageSubscription> consumer) {
        // The cursor id is not needed by callers; only the subscription is forwarded.
        this.activeCursors.forEach((id, subscription) -> consumer.accept(subscription));
    }

    /**
     * Wraps a paged message in a reference tied to the given subscription.
     *
     * @param pagedMessage     the message to reference
     * @param pageSubscription the subscription the reference belongs to
     * @return a new {@code PagedReferenceImpl}
     */
    @Override
    public PagedReference newReference(PagedMessage pagedMessage, PageSubscription pageSubscription) {
        final PagedReference reference = new PagedReferenceImpl(pagedMessage, pageSubscription);
        return reference;
    }

    /**
     * Reloads every active cursor, then reloads page info for each page between the store's first page
     * (inclusive) and the smallest first page reported by the cursors (exclusive), and finally runs a cleanup.
     *
     * @throws Exception if a cursor fails while reloading
     */
    @Override
    public void processReload() throws Exception {
        final List<PageSubscription> cursors = this.activeCursors.values();
        for (PageSubscription cursor : cursors) {
            cursor.processReload();
        }

        if (!cursors.isEmpty()) {
            final long minPage = checkMinPage(cursors);
            // checkMinPage returns Long.MAX_VALUE when no cursor reported a non-negative first page.
            if (minPage != Long.MAX_VALUE) {
                for (long page = this.pagingStore.getFirstPage(); page < minPage; page++) {
                    for (PageSubscription cursor : cursors) {
                        cursor.reloadPageInfo(page);
                    }
                }
            }
        }

        cleanup();
    }

    /**
     * Stops every active cursor and traces how many scheduled cleanup tasks are still outstanding.
     */
    @Override
    public void stop() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.stop();
        }
        final int pendingCleanups = this.scheduledCleanup.get();
        if (pendingCleanups > 0) {
            logger.trace("Stopping with {} cleanup tasks to be completed yet", pendingCleanups);
        }
    }

    /**
     * Asks every active cursor to take a counter snapshot.
     */
    @Override
    public void counterSnapshot() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.counterSnapshot();
        }
    }

    /**
     * Delegates to the paging store's {@code flushExecutors()}.
     */
    @Override
    public void flushExecutors() {
        final PagingStore store = this.pagingStore;
        store.flushExecutors();
    }

    /**
     * Unregisters a subscription and schedules a cleanup afterwards.
     *
     * @param pageSubscription the subscription being closed; removed from the active cursors by its id
     */
    @Override
    public void close(PageSubscription pageSubscription) {
        final long cursorId = pageSubscription.getId();
        this.activeCursors.remove(cursorId);
        scheduleCleanup();
    }

    /**
     * Schedules a cleanup on the paging store's executor.
     *
     * <p>When cleanup is disabled, or more than two cleanup tasks are already outstanding, no cleanup is
     * scheduled; the returned future is still completed from a task submitted to the same executor.
     *
     * @return a future that is set to {@code true} once the submitted task has run
     */
    @Override
    public Future<Boolean> scheduleCleanup() {
        // Parameterised (instead of the raw type) so the return needs no unchecked conversion.
        final SimpleFutureImpl<Boolean> done = new SimpleFutureImpl<>();

        if (!this.cleanupEnabled || this.scheduledCleanup.intValue() > 2) {
            this.pagingStore.execute(() -> done.set(true));
            return done;
        }

        this.scheduledCleanup.incrementAndGet();
        this.pagingStore.execute(() -> {
            this.storageManager.setContext(this.storageManager.newSingleThreadContext());
            try {
                // Re-checked here because cleanup may have been disabled after the task was queued.
                if (this.cleanupEnabled) {
                    cleanup();
                }
            } finally {
                this.storageManager.clearContext();
                this.scheduledCleanup.decrementAndGet();
                done.set(true);
            }
        });
        return done;
    }

    /**
     * Notifies every subscription that page mode was cleared, all within one transaction that is committed
     * at the end. A failure on one subscription is logged and does not stop the others; a failing commit is
     * logged as well.
     */
    @Override
    public void onPageModeCleared() {
        final TransactionImpl tx = new TransactionImpl(this.storageManager);

        for (PageSubscription subscription : cloneSubscriptions()) {
            try {
                subscription.onPageModeCleared(tx);
            } catch (Exception failure) {
                final String queueName = subscription.getQueue().getName().toString();
                ActiveMQServerLogger.LOGGER.errorCleaningPagingOnQueue(queueName, failure);
            }
        }

        try {
            tx.commit();
        } catch (Exception commitFailure) {
            ActiveMQServerLogger.LOGGER.errorCleaningPagingDuringCommit(commitFailure);
        }
    }

    /**
     * Turns cleanup off; scheduleCleanup() will not run cleanup() until resumeCleanup() is called.
     */
    @Override
    public void disableCleanup() {
        cleanupEnabled = false;
    }

    /**
     * Turns cleanup back on and immediately schedules one.
     */
    @Override
    public void resumeCleanup() {
        cleanupEnabled = true;
        this.scheduleCleanup();
    }

    /**
     * Returns the largest counter value among the active subscriptions (not their sum), or 0 when there are
     * no subscriptions or none has a positive counter.
     */
    private long getNumberOfMessagesOnSubscriptions() {
        final AtomicLong largest = new AtomicLong();
        this.activeCursors.forEach((id, subscription) ->
            largest.accumulateAndGet(subscription.getCounter().getValue(), Math::max));
        return largest.get();
    }

    /**
     * Hands the largest subscription counter value to the paging store's page-limit check.
     */
    void checkClearPageLimit() {
        final long largestSubscriptionCount = getNumberOfMessagesOnSubscriptions();
        this.pagingStore.checkPageLimit(largestSubscriptionCount);
    }

    /**
     * Removes pages that every subscription has finished with and stops paging when nothing is left.
     *
     * <p>Steps:
     * <ol>
     *   <li>Return immediately while the counters are being rebuilt.</li>
     *   <li>Take the storage manager's read lock, then retry {@code pagingStore.lock(100)} until it succeeds
     *       or the store is no longer started.</li>
     *   <li>Under this provider's monitor: trigger delivery if needed, depage from the start of the stream,
     *       remove fully consumed pages from the middle of the stream, check the page limit, and stop paging
     *       when no page (or only an empty current page) remains.</li>
     *   <li>After all locks are released, delete the collected page files via {@link #finishCleanup}.</li>
     * </ol>
     *
     * <p>The store lock is released exactly once per acquisition and the read lock is always closed,
     * whichever way the method exits. Any {@link Throwable} raised in step 3 is logged and ends the cleanup
     * without deleting the pages collected so far.
     */
    protected void cleanup() {
        if (!this.rebuildDone) {
            logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", this.pagingStore != null ? this.pagingStore.getAddress() : KeyResolver.NULL_KEY_VALUE);
            return;
        }

        ArrayList<Page> depagedPages = new ArrayList<>();
        LongHashSet depagedPageIds = new LongHashSet();

        // try-with-resources closes the read lock on every exit path (and tolerates a null resource).
        try (ArtemisCloseable readLock = this.storageManager.closeableReadLock()) {
            // Keep trying to lock the store, but give up once the store has been stopped.
            while (!this.pagingStore.lock(100L)) {
                if (!this.pagingStore.isStarted()) {
                    return;
                }
            }

            logger.trace(">>>> Cleanup {}", this.pagingStore.getAddress());

            synchronized (this) {
                try {
                    if (!this.pagingStore.isStarted()) {
                        logger.trace("Paging store is not started");
                        return;
                    }
                    if (!this.pagingStore.isPaging()) {
                        logger.trace("Paging Store was not paging, so no reason to retry the cleanup");
                        return;
                    }

                    ArrayList<PageSubscription> cursors = cloneSubscriptions();
                    long minPage = checkMinPage(cursors);
                    long firstPage = this.pagingStore.getFirstPage();
                    deliverIfNecessary(cursors, minPage);

                    if (logger.isTraceEnabled()) {
                        logger.trace("firstPage={}, minPage={}, currentWritingPage={}", firstPage, minPage, this.pagingStore.getCurrentWritingPage());
                    }

                    // First the pages at the beginning of the stream, then fully consumed pages further in.
                    cleanupRegularStream(depagedPages, depagedPageIds, cursors, minPage, firstPage);
                    cleanupMiddleStream(depagedPages, depagedPageIds, cursors, minPage, firstPage);

                    if (this.pagingStore.isPageFull()) {
                        checkClearPageLimit();
                    }

                    // Kept in its explicit form: the class declares $assertionsDisabled itself.
                    if (!$assertionsDisabled && this.pagingStore.getNumberOfPages() < 0) {
                        throw new AssertionError();
                    }

                    if (this.pagingStore.getNumberOfPages() == 0 || (this.pagingStore.getNumberOfPages() == 1 && (this.pagingStore.getCurrentPage() == null || this.pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
                        logger.trace("StopPaging being called on {}", this.pagingStore);
                        this.pagingStore.stopPaging();
                    } else if (logger.isTraceEnabled()) {
                        // The current page may be null here (the check above allows for it); a trace statement
                        // must not raise an NPE that would abort the whole cleanup.
                        Page currentPage = this.pagingStore.getCurrentPage();
                        Object currentPageMessages = currentPage != null ? Integer.valueOf(currentPage.getNumberOfMessages()) : "n/a";
                        logger.trace("Couldn't cleanup page on address {} as numberOfPages == {}  and currentPage.numberOfMessages = {}", this.pagingStore.getAddress(), this.pagingStore.getNumberOfPages(), currentPageMessages);
                    }
                } catch (Throwable problem) {
                    ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(this.pagingStore.getAddress(), problem);
                    logger.warn(problem.getMessage(), problem);
                    return;
                } finally {
                    // Single release point for the store lock taken above.
                    logger.trace("<<<< Cleanup end on {}", this.pagingStore.getAddress());
                    this.pagingStore.unlock();
                }
            }
        }

        // File deletion happens only after the store lock, the monitor and the read lock are released.
        finishCleanup(depagedPages);
    }

    /**
     * Depages completed pages from the beginning of the stream.
     *
     * <p>If the minimum page is the page currently being written, that page holds messages, and every cursor
     * has completed it, {@link #cleanupComplete} is run first. Then pages from {@code j2} up to {@code j}
     * (inclusive) are depaged one by one, stopping at the first page that is not complete for all cursors or
     * when the store has nothing more to depage.
     *
     * @param arrayList   receives every page that was depaged
     * @param longHashSet receives the id of every page that was depaged
     * @param arrayList2  snapshot of the subscriptions to check against
     * @param j           smallest first page among the subscriptions
     * @param j2          first page of the paging store
     * @throws Exception if checking completion or depaging fails
     */
    private void cleanupRegularStream(ArrayList<Page> arrayList, LongHashSet longHashSet, ArrayList<PageSubscription> arrayList2, long j, long j2) throws Exception {
        final Page writingPage = this.pagingStore.getCurrentPage();
        final boolean minIsWritingPage = j == this.pagingStore.getCurrentWritingPage();
        if (minIsWritingPage && writingPage != null && writingPage.getNumberOfMessages() > 0) {
            if (checkPageCompletion(arrayList2, j)) {
                cleanupComplete(arrayList2);
            }
        }

        for (long pageNr = j2; pageNr <= j; pageNr++) {
            if (!checkPageCompletion(arrayList2, pageNr)) {
                break;
            }
            final Page depaged = this.pagingStore.depage();
            if (depaged == null) {
                break;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Depaging page {}", depaged.getPageId());
            }
            longHashSet.add(depaged.getPageId());
            arrayList.add(depaged);
        }
    }

    /**
     * Removes pages from the middle of the stream that every subscription reports as done.
     *
     * <p>For each consumed page that is done, counts how many subscriptions report it. A page is removed
     * from the paging store when its id is greater than both {@code j} and {@code j2}, it is not the page
     * currently being written, at least as many subscriptions as exist report it done, and it was not
     * already collected by the regular depaging step.
     *
     * @param arrayList   receives every page removed here
     * @param longHashSet ids of pages already collected; updated with the pages removed here
     * @param arrayList2  snapshot of the subscriptions to inspect
     * @param j           smallest first page among the subscriptions
     * @param j2          first page of the paging store
     */
    private void cleanupMiddleStream(ArrayList<Page> arrayList, LongHashSet longHashSet, ArrayList<PageSubscription> arrayList2, long j, long j2) {
        final long currentWritingPage = this.pagingStore.getCurrentWritingPage();
        // Typed map: page id -> number of subscriptions that have finished with that page.
        final LongObjectHashMap<AtomicInteger> doneCounts = new LongObjectHashMap<>();
        final int subscriptionCount = arrayList2.size();

        for (PageSubscription subscription : arrayList2) {
            subscription.forEachConsumedPage(consumedPage -> {
                if (consumedPage.isDone()) {
                    AtomicInteger count = doneCounts.get(consumedPage.getPageId());
                    if (count == null) {
                        count = new AtomicInteger(0);
                        doneCounts.put(consumedPage.getPageId(), count);
                    }
                    count.incrementAndGet();
                }
            });
        }

        doneCounts.forEach((pageId, count) -> {
            try {
                boolean beyondRegularStream = pageId.longValue() > j && pageId.longValue() > j2;
                if (beyondRegularStream && pageId.longValue() != currentWritingPage && count.get() >= subscriptionCount && !longHashSet.contains(pageId.longValue())) {
                    // NOTE(review): removePage is called with an int; a page id above Integer.MAX_VALUE
                    // would be truncated here — confirm against the PagingStore contract.
                    Page removed = this.pagingStore.removePage(pageId.intValue());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Removing page {}", pageId);
                    }
                    if (removed != null) {
                        arrayList.add(removed);
                        longHashSet.add(removed.getPageId());
                    }
                }
            } catch (Throwable th) {
                // A failure on one page is logged and the remaining pages are still processed.
                ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(th);
                logger.debug("Error while Issuing cleanupMiddlePages with {}, counter = {}", pageId, count, th);
                arrayList.forEach(page -> logger.debug("page {}", page));
            }
        });
    }

    /**
     * Leaves page mode: forces the store onto another page, bookmarks every subscription on the store's
     * current page after that, then stops paging.
     *
     * @param arrayList subscriptions to bookmark
     * @throws Exception if forcing the page, storing the bookmark or stopping paging fails
     */
    protected void cleanupComplete(ArrayList<PageSubscription> arrayList) throws Exception {
        logger.debug("Address {} is leaving page mode as all messages are consumed and acknowledged from the page store", this.pagingStore.getAddress());

        this.pagingStore.forceAnotherPage();

        final Page bookmarkPage = this.pagingStore.getCurrentPage();
        storeBookmark(arrayList, bookmarkPage);

        this.pagingStore.stopPaging();
    }

    /**
     * Deletes the given depaged pages: each page is opened and read, closed again, deleted together with the
     * messages that were read, and then announced to the subscriptions via {@link #onDeletePage}.
     *
     * <p>The page is closed in a {@code finally} block so that a failing {@code open} or {@code read} does
     * not leave it open. A failure while closing is best-effort: it is logged at debug level and does not
     * stop the deletion. Any other failure is logged and ends the loop; remaining pages are not processed.
     *
     * @param arrayList pages to delete
     */
    protected void finishCleanup(ArrayList<Page> arrayList) {
        logger.trace("this({}) finishing cleanup on {}", this, arrayList);
        try {
            for (Page page : arrayList) {
                LinkedList<PagedMessage> messages;
                try {
                    page.open(false);
                    messages = page.read(this.storageManager, true);
                } finally {
                    try {
                        page.close(false, false);
                    } catch (Exception closeFailure) {
                        // Deliberately not rethrown: closing is best-effort before the file is deleted.
                        logger.debug("Failed to close page {} during cleanup", page, closeFailure);
                    }
                }
                page.delete(messages);
                onDeletePage(page);
            }
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(this.pagingStore.getAddress(), e);
        }
    }

    /**
     * Tells whether every subscription has completed the given page.
     *
     * <p>A page whose file does not exist in the paging store is treated as complete. Otherwise the
     * subscriptions are checked in order and the method returns {@code false} at the first one that is not
     * complete, without querying the rest.
     *
     * @param arrayList subscriptions to check
     * @param j         page number to check
     * @return {@code true} if the page file is missing or all subscriptions are complete for the page
     * @throws Exception if the store or a subscription fails while being queried
     */
    private boolean checkPageCompletion(ArrayList<PageSubscription> arrayList, long j) throws Exception {
        logger.trace("checkPageCompletion({})", j);

        if (!this.pagingStore.checkPageFileExists(j)) {
            logger.trace("store {} did not have an existing file, considering it a complete file then", this.pagingStore.getAddress());
            return true;
        }

        for (PageSubscription cursor : arrayList) {
            if (!cursor.isComplete(j)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Cursor {} was considered incomplete at pageNr={}", cursor, j);
                }
                // One incomplete cursor decides the answer; no need to query the others.
                return false;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Cursor {} was considered **complete** at pageNr={}", cursor, j);
            }
        }
        return true;
    }

    /**
     * Takes a snapshot of the active subscriptions while holding this provider's monitor.
     *
     * @return a new list holding the subscriptions registered at the time of the call
     */
    private synchronized ArrayList<PageSubscription> cloneSubscriptions() {
        final ArrayList<PageSubscription> snapshot = new ArrayList<>(this.activeCursors.values());
        return snapshot;
    }

    /**
     * Notifies a snapshot of the active subscriptions that a page has been deleted.
     *
     * @param page the page that was deleted
     * @throws Exception if a subscription fails while handling the notification
     */
    protected void onDeletePage(Page page) throws Exception {
        for (PageSubscription subscription : cloneSubscriptions()) {
            subscription.onDeletePage(page);
        }
    }

    /**
     * Confirms, on every subscription, a position at message number -1 of the given page, and re-enables
     * auto cleanup on every subscription afterwards, even when a confirmation fails.
     *
     * @param arrayList subscriptions to bookmark
     * @param page      page whose id is used for the bookmark position
     * @throws Exception if confirming the position fails on a subscription
     */
    protected void storeBookmark(ArrayList<PageSubscription> arrayList, Page page) throws Exception {
        try {
            for (PageSubscription subscription : arrayList) {
                subscription.confirmPosition(new PagePositionImpl(page.getPageId(), -1));
            }
        } finally {
            for (PageSubscription subscription : arrayList) {
                subscription.enableAutoCleanup();
            }
        }
    }

    /**
     * Describes this provider by the paging store it works on.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("PageCursorProviderImpl{pagingStore=");
        text.append(this.pagingStore).append("}");
        return text.toString();
    }

    /**
     * Finds the smallest non-negative first page among the given subscriptions.
     *
     * @param collection subscriptions to inspect
     * @return the smallest first page that is {@code >= 0}, or {@code Long.MAX_VALUE} when no subscription
     *         reports one
     */
    private long checkMinPage(Collection<PageSubscription> collection) {
        final boolean tracing = logger.isTraceEnabled();
        if (tracing) {
            // The exception is passed along only so the trace output carries the caller's stack.
            logger.trace("Min page cursorList size {} on {}", collection.size(), this.pagingStore.getAddress(), new Exception("trace"));
        }

        long minPage = Long.MAX_VALUE;
        for (PageSubscription cursor : collection) {
            final long cursorFirstPage = cursor.getFirstPage();
            if (logger.isTraceEnabled()) {
                logger.trace("{} has a cursor {} with first page={}", this.pagingStore.getAddress(), cursor, cursorFirstPage);
            }
            // Negative first pages are ignored.
            if (cursorFirstPage >= 0) {
                minPage = Math.min(minPage, cursorFirstPage);
            }
        }

        if (logger.isTraceEnabled()) {
            logger.trace("checkMinPage({}) will have minPage={}", this.pagingStore.getAddress(), minPage);
        }
        return minPage;
    }

    /**
     * Triggers an asynchronous delivery on the first subscription that sits on the minimum page with an
     * empty queue, unless that page is the one being written and the subscription has already completed it.
     * At most one subscription is triggered per call.
     *
     * @param collection subscriptions to inspect
     * @param j          smallest first page among the subscriptions
     */
    private void deliverIfNecessary(Collection<PageSubscription> collection, long j) {
        final boolean minIsWritingPage = j == this.pagingStore.getCurrentWritingPage();
        for (PageSubscription cursor : collection) {
            final long cursorFirstPage = cursor.getFirstPage();
            if (cursorFirstPage != j) {
                continue;
            }
            if (cursor.getQueue().getMessageCount() != 0) {
                continue;
            }
            if (minIsWritingPage && cursor.isComplete(cursorFirstPage)) {
                continue;
            }
            cursor.getQueue().deliverAsync();
            return;
        }
    }

    /**
     * Marks the counter rebuild as in progress; cleanup() returns early until counterRebuildDone() is called.
     */
    @Override
    public void counterRebuildStarted() {
        rebuildDone = false;
    }

    /**
     * Marks the counter rebuild as finished, which lets cleanup() run again.
     */
    @Override
    public void counterRebuildDone() {
        rebuildDone = true;
    }

    /**
     * @return {@code true} unless a counter rebuild has been started and not yet marked done
     */
    @Override
    public boolean isRebuildDone() {
        final boolean done = this.rebuildDone;
        return done;
    }

    // Assigns the blank static finals declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; cleanup() skips its sanity check in that case.
        $assertionsDisabled = !PageCursorProviderImpl.class.desiredAssertionStatus();
        // lookupClass() on a lookup created here yields this class, so the logger is named after it.
        logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
        // Timeouts expressed in nanoseconds: 30 s, 10 s and 10 s respectively.
        PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30L);
        CONCURRENT_PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10L);
        PAGE_READ_PERMISSION_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10L);
    }
}
