package org.apache.activemq.artemis.core.paging.cursor.impl;

import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.LongObjectHashMap;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-server-2.36.0.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.class */
public class PageCounterRebuildManager implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final PagingStore pgStore;
    private final PagingManager pagingManager;
    private final StorageManager sm;
    private final Map<Long, PageTransactionInfo> transactions;
    private boolean paging;
    private long limitPageId;
    private int limitMessageNr;
    private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
    private final Set<Long> storedLargeMessages;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.36.0.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager$CopiedConsumedPage.class */
    public static class CopiedConsumedPage implements ConsumedPage {
        boolean done;
        IntObjectHashMap<Boolean> acks;

        private CopiedConsumedPage() {
        }

        @Override // org.apache.activemq.artemis.core.paging.cursor.ConsumedPage
        public long getPageId() {
            throw new RuntimeException("method not implemented");
        }

        @Override // org.apache.activemq.artemis.core.paging.cursor.ConsumedPage
        public void forEachAck(BiConsumer<Integer, PagePosition> biConsumer) {
            throw new RuntimeException("method not implemented");
        }

        @Override // org.apache.activemq.artemis.core.paging.cursor.ConsumedPage
        public boolean isDone() {
            return this.done;
        }

        @Override // org.apache.activemq.artemis.core.paging.cursor.ConsumedPage
        public boolean isAck(int i) {
            if (this.done) {
                return true;
            }
            return (this.acks == null || this.acks.get(i) == null) ? false : true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.36.0.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager$CopiedSubscription.class */
    public static class CopiedSubscription {
        private boolean empty = true;
        LongObjectHashMap<CopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
        PageSubscriptionCounter subscriptionCounter;
        PageSubscription subscription;
        int addUp;
        long sizeUp;

        CopiedSubscription(PageSubscription pageSubscription) {
            this.subscriptionCounter = pageSubscription.getCounter();
            this.subscription = pageSubscription;
        }

        CopiedConsumedPage getPage(long j) {
            return this.consumedPageMap.get(j);
        }
    }

    public PageCounterRebuildManager(PagingManager pagingManager, PagingStore pagingStore, Map<Long, PageTransactionInfo> map, Set<Long> set, AtomicLong atomicLong) {
        initialize(pagingStore);
        this.pagingManager = pagingManager;
        this.pgStore = pagingStore;
        this.sm = pagingStore.getStorageManager();
        this.transactions = map;
        this.storedLargeMessages = set;
    }

    private void initialize(PagingStore pagingStore) {
        pagingStore.lock(-1L);
        try {
            try {
                this.paging = pagingStore.isPaging();
            } catch (Exception e) {
                logger.warn(e.getMessage(), (Throwable) e);
                this.limitPageId = pagingStore.getCurrentWritingPage();
            }
            if (!this.paging) {
                logger.trace("Destination {} was not paging, no need to rebuild counters", pagingStore.getAddress());
                pagingStore.getCursorProvider().forEachSubscription(pageSubscription -> {
                    pageSubscription.getCounter().markRebuilding();
                    pageSubscription.getCounter().finishRebuild();
                });
                pagingStore.getCursorProvider().counterRebuildDone();
                return;
            }
            pagingStore.getCursorProvider().counterRebuildStarted();
            Page currentPage = pagingStore.getCurrentPage();
            this.limitPageId = pagingStore.getCurrentWritingPage();
            this.limitMessageNr = currentPage.getNumberOfMessages();
            if (logger.isTraceEnabled()) {
                logger.trace("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", pagingStore.getStoreName(), Long.valueOf(pagingStore.getCurrentWritingPage()), Long.valueOf(this.limitPageId), Integer.valueOf(this.limitMessageNr));
            }
            logger.trace("Copying page store ack information from address {}", pagingStore.getAddress());
            pagingStore.getCursorProvider().forEachSubscription(pageSubscription2 -> {
                if (logger.isTraceEnabled()) {
                    logger.trace("Copying subscription ID {}", Long.valueOf(pageSubscription2.getId()));
                }
                CopiedSubscription copiedSubscription = new CopiedSubscription(pageSubscription2);
                copiedSubscription.subscriptionCounter.markRebuilding();
                this.copiedSubscriptionMap.put(pageSubscription2.getId(), (long) copiedSubscription);
                pageSubscription2.forEachConsumedPage(consumedPage -> {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Copying page {}", Long.valueOf(consumedPage.getPageId()));
                    }
                    CopiedConsumedPage copiedConsumedPage = new CopiedConsumedPage();
                    copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), (long) copiedConsumedPage);
                    if (!consumedPage.isDone()) {
                        consumedPage.forEachAck((num, pagePosition) -> {
                            if (logger.isTraceEnabled()) {
                                logger.trace("Marking messageNR {} as acked on pageID={} copy", num, Long.valueOf(consumedPage.getPageId()));
                            }
                            if (copiedConsumedPage.acks == null) {
                                copiedConsumedPage.acks = new IntObjectHashMap<>();
                            }
                            copiedConsumedPage.acks.put2(num, (Integer) Boolean.TRUE);
                        });
                        return;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Marking page {} as done on the copy", Long.valueOf(consumedPage.getPageId()));
                    }
                    copiedConsumedPage.done = true;
                });
            });
        } finally {
            pagingStore.unlock();
        }
    }

    private CopiedSubscription getSubscription(long j) {
        return this.copiedSubscriptionMap.get(j);
    }

    private boolean isACK(long j, long j2, int i) {
        CopiedSubscription subscription = getSubscription(j);
        if (subscription == null) {
            return true;
        }
        CopiedConsumedPage page = subscription.getPage(j2);
        if (page == null) {
            return false;
        }
        return page.isAck(i);
    }

    private void done() {
        this.copiedSubscriptionMap.forEach((l, copiedSubscription) -> {
            if (!copiedSubscription.empty) {
                copiedSubscription.subscription.notEmpty();
                try {
                    copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
                } catch (Exception e) {
                    logger.warn(e.getMessage(), (Throwable) e);
                }
            }
            if (!copiedSubscription.empty) {
                copiedSubscription.subscription.notEmpty();
            }
            if (copiedSubscription.subscriptionCounter != null) {
                copiedSubscription.subscriptionCounter.finishRebuild();
            }
        });
        this.pgStore.getCursorProvider().counterRebuildDone();
        this.pgStore.getCursorProvider().scheduleCleanup();
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            rebuild();
        } catch (Exception e) {
            logger.warn(e.getMessage(), (Throwable) e);
        }
    }

    public void rebuild() throws Exception {
        if (this.pgStore == null) {
            logger.trace("Page store is null during rebuildCounters");
            return;
        }
        if (!this.paging) {
            logger.trace("Ignoring call to rebuild pgStore {}", this.pgStore.getAddress());
        }
        logger.debug("Rebuilding page counter for address {}", this.pgStore.getAddress());
        long firstPage = this.pgStore.getFirstPage();
        while (true) {
            long j = firstPage;
            if (j > this.limitPageId) {
                logger.debug("Counter rebuilding done for address {}", this.pgStore.getAddress());
                done();
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.trace("Rebuilding counter on messages from page {} on rebuildCounters for address {}", Long.valueOf(j), this.pgStore.getAddress());
            }
            logger.debug("{} reading paging {} of {}", this.pgStore.getAddress(), Long.valueOf(j), Long.valueOf(this.limitPageId));
            Page newPageObject = this.pgStore.newPageObject(j);
            if (newPageObject.getFile().exists()) {
                newPageObject.open(false);
                LinkedList<PagedMessage> read = newPageObject.read(this.sm);
                newPageObject.close(false, false);
                LinkedListIterator<PagedMessage> it = read.iterator();
                while (true) {
                    try {
                        if (!it.hasNext()) {
                            break;
                        }
                        final PagedMessage next = it.next();
                        if (this.storedLargeMessages != null && next.getMessage().isLargeMessage()) {
                            if (logger.isDebugEnabled()) {
                                logger.trace("removing storedLargeMessage {}", Long.valueOf(next.getMessage().getMessageID()));
                            }
                            this.storedLargeMessages.remove(Long.valueOf(next.getMessage().getMessageID()));
                        }
                        if (this.limitPageId != j || next.getMessageNumber() < this.limitMessageNr) {
                            next.initMessage(this.sm);
                            long[] queueIDs = next.getQueueIDs();
                            if (logger.isTraceEnabled()) {
                                logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", this.pgStore.getAddress(), Long.valueOf(next.getPageNumber()), Integer.valueOf(next.getMessageNumber()), queueIDs, next, queueIDs);
                            }
                            PageTransactionInfo pageTransactionInfo = null;
                            if (next.getTransactionID() > 0) {
                                pageTransactionInfo = this.transactions.get(Long.valueOf(next.getTransactionID()));
                                if (pageTransactionInfo != null) {
                                    pageTransactionInfo.setOrphaned(false);
                                }
                            }
                            Transaction preparedTransaction = pageTransactionInfo == null ? null : pageTransactionInfo.getPreparedTransaction();
                            if (logger.isTraceEnabled()) {
                                logger.trace("lookup on {}, tx={}, preparedTX={}", Long.valueOf(next.getTransactionID()), pageTransactionInfo, preparedTransaction);
                            }
                            for (long j2 : queueIDs) {
                                boolean z = !isACK(j2, next.getPageNumber(), next.getMessageNumber());
                                if (preparedTransaction != null) {
                                    final PageSubscription subscription = this.pgStore.getCursorProvider().getSubscription(j2);
                                    preparedTransaction.addOperation(new TransactionOperationAbstract() { // from class: org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager.1
                                        @Override // org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract, org.apache.activemq.artemis.core.transaction.TransactionOperation
                                        public void afterCommit(Transaction transaction) {
                                            PagingManager pagingManager = PageCounterRebuildManager.this.pagingManager;
                                            PageSubscription pageSubscription = subscription;
                                            PagedMessage pagedMessage = next;
                                            pagingManager.execute(() -> {
                                                try {
                                                    pageSubscription.getCounter().increment(null, 1, pagedMessage.getStoredSize());
                                                } catch (Exception e) {
                                                    PageCounterRebuildManager.logger.warn(e.getMessage(), (Throwable) e);
                                                }
                                            });
                                        }
                                    });
                                } else {
                                    boolean z2 = next.getTransactionID() <= 0 || this.transactions == null || pageTransactionInfo != null;
                                    if (!z2) {
                                        logger.trace("TX is not included for {}", next);
                                    }
                                    if (z && z2) {
                                        if (logger.isTraceEnabled()) {
                                            logger.trace("Message pageNumber={}/{} NOT acked on queue {}", Long.valueOf(next.getPageNumber()), Integer.valueOf(next.getMessageNumber()), Long.valueOf(j2));
                                        }
                                        CopiedSubscription copiedSubscription = this.copiedSubscriptionMap.get(j2);
                                        if (copiedSubscription != null) {
                                            copiedSubscription.empty = false;
                                            copiedSubscription.addUp++;
                                            copiedSubscription.sizeUp += next.getPersistentSize();
                                        }
                                    } else if (logger.isTraceEnabled()) {
                                        logger.trace("Message pageNumber={}/{} IS acked on queue {}", Long.valueOf(next.getPageNumber()), Integer.valueOf(next.getMessageNumber()), Long.valueOf(j2));
                                    }
                                }
                            }
                        } else if (logger.isDebugEnabled()) {
                            logger.trace("Rebuild counting on {} reached the last message at {}-{}", this.pgStore.getAddress(), Long.valueOf(this.limitPageId), Integer.valueOf(this.limitMessageNr));
                        }
                    } catch (Throwable th) {
                        if (it != null) {
                            try {
                                it.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                if (it != null) {
                    it.close();
                }
            } else if (logger.isDebugEnabled()) {
                logger.trace("Skipping page {} on store {}", Long.valueOf(j), this.pgStore.getAddress());
            }
            firstPage = j + 1;
        }
    }
}
