package org.apache.activemq.artemis.core.server.impl;

import com.sun.xml.bind.v2.runtime.reflect.opt.Const;
import java.lang.invoke.MethodHandles;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.CoreLargeServerMessage;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-server-2.32.0.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.class */
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Consumer id supplied to the constructor; exposed through getID().
    private final long id;
    // Id generated from the server's storage manager at construction; exposed through sequentialID().
    private final long sequentialID;
    // Queue obtained from the binding passed to the constructor.
    protected final Queue messageQueue;
    // Optional filter; when non-null, handle() answers NO_MATCH for messages it rejects.
    private final Filter filter;
    private final int priority;
    // Taken from the session at construction; caps the chunk size used by CoreLargeMessageDeliverer.
    private final int minLargeMessageSize;
    private ServerSession session;
    // Monitor taken by handle(), debug(), getDeliveringMessages() and the large-message code paths.
    protected final Object lock;
    private final boolean supportLargeMessage;
    private Object protocolData;
    private Object protocolContext;
    private final ActiveMQServer server;
    private SlowConsumerDetectionListener slowConsumerListener;
    // Counted up when a reference is handled / a large-message step starts, counted down when it ends.
    private final ReusableLatch pendingDelivery;
    // Remaining flow-control credits; set to null when the constructor receives -1, in which case
    // the credit checks in handle() and CoreLargeMessageDeliverer are skipped.
    private volatile AtomicInteger availableCredits;
    // handle() answers BUSY while this is false.
    private boolean started;
    // Non-null while a large message is being streamed; handle() answers BUSY in that state.
    private volatile CoreLargeMessageDeliverer largeMessageDeliverer;
    private final boolean browseOnly;
    // Created only when the consumer is constructed as browse-only.
    protected BrowserDeliverer browserDeliverer;
    private final boolean strictUpdateDeliveryCount;
    private final StorageManager storageManager;
    // References handed out by handle() for a non-browsing, non-pre-acknowledging consumer.
    private final Deque<MessageReference> deliveringRefs;
    private SessionCallback callback;
    private boolean preAcknowledge;
    private final ManagementService managementService;
    private final Binding binding;
    // handle() answers BUSY while this is true.
    private boolean transferring;
    private final long creationTime;
    private AtomicLong consumerRateCheckTime;
    private AtomicLong messageConsumedSnapshot;
    // Both flags below are only ever raised in the constructor, for core connections that carry
    // JMS session metadata and report a channel version below 129.
    private boolean requiresLegacyPrefix;
    private boolean anycast;
    private boolean isClosed;
    ServerConsumerMetrics metrics;
    // Continues an in-flight large-message delivery while holding the lock.
    private final Runnable resumeLargeMessageRunnable;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:artemis-server-2.32.0.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl$BrowserDeliverer.class */
    /**
     * Delivery task of a browse-only consumer. Each run walks the queue's browser iterator and
     * pushes every reference through the enclosing consumer's {@code handle} and
     * {@code proceedDeliver}, stopping when the iterator is exhausted, when a reference comes
     * back {@link HandleStatus#BUSY}, or when an exception is raised.
     */
    public class BrowserDeliverer implements Runnable {
        /** Reference that was BUSY on an earlier run and has to be retried before moving on. */
        protected MessageReference current = null;
        public final LinkedListIterator<MessageReference> iterator;

        public BrowserDeliverer(LinkedListIterator<MessageReference> linkedListIterator) {
            this.iterator = linkedListIterator;
        }

        /** Closes the underlying browser iterator. */
        public synchronized void close() {
            iterator.close();
        }

        @Override
        public synchronized void run() {
            // A reference left over from a previous run goes first; give up if it is still stuck.
            if (current != null && !retryPending()) {
                return;
            }
            MessageReference ref = null;
            try {
                for (;;) {
                    ref = null;
                    // NOTE(review): delivery happens while the queue monitor is held, exactly as before.
                    synchronized (messageQueue) {
                        if (!iterator.hasNext()) {
                            logger.trace("browser finished");
                            callback.browserFinished(ServerConsumerImpl.this);
                            return;
                        }
                        ref = iterator.next();
                        logger.trace("Receiving {}", ref.getMessage());
                        final HandleStatus status = handle(ref);
                        if (status == HandleStatus.BUSY) {
                            // remember it so the next run starts with this reference
                            current = ref;
                            return;
                        }
                        if (status == HandleStatus.HANDLED) {
                            proceedDeliver(ref);
                        }
                    }
                }
            } catch (Exception failure) {
                ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(ref, failure);
            }
        }

        /**
         * Re-offers {@link #current} to the consumer.
         *
         * @return {@code true} when the pending reference was dealt with and cleared,
         *         {@code false} when it is still BUSY or handling it threw (the error is logged)
         */
        private boolean retryPending() {
            try {
                final HandleStatus status = handle(current);
                if (status == HandleStatus.BUSY) {
                    return false;
                }
                if (status == HandleStatus.HANDLED) {
                    proceedDeliver(current);
                }
                current = null;
                return true;
            } catch (Exception failure) {
                ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(current, failure);
                return false;
            }
        }

        /**
         * Asks the queue for an asynchronous delivery and reports whether the iterator is exhausted.
         */
        public boolean isBrowsed() {
            messageQueue.deliverAsync();
            final boolean exhausted = !iterator.hasNext();
            return exhausted;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-server-2.32.0.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl$CoreLargeMessageDeliverer.class */
    /**
     * Streams one large message to the client: an initial packet followed by body chunks of at
     * most {@code minLargeMessageSize} bytes. Each call to {@link #deliver()} sends at most one
     * packet; further packets are requested through {@code resumeLargeMessage()} of the enclosing
     * consumer.
     */
    public final class CoreLargeMessageDeliverer {
        /** Total body size as reported by the body reader when the first packet is sent. */
        private long sizePendingLargeMessage;
        /** Message being streamed; set to {@code null} by {@link #finish()}. */
        private LargeServerMessage largeMessage;
        private final MessageReference ref;
        private boolean sentInitialPacket = false;
        /** Number of body bytes already sent. */
        private long positionPendingLargeMessage;
        private LargeBodyReader context;
        /** Reusable heap buffer for a chunk; dropped whenever delivery pauses or ends. */
        private ByteBuffer chunkBytes;

        private CoreLargeMessageDeliverer(MessageReference messageReference) {
            this.ref = messageReference;
            this.largeMessage = (LargeServerMessage) messageReference.getMessage();
            // balanced by usageDown() in finish()
            this.largeMessage.toMessage().usageUp();
            this.chunkBytes = null;
        }

        @Override
        public String toString() {
            return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + this.ref + "]]";
        }

        /** Returns a cleared heap buffer of exactly {@code i} bytes, reusing the cached one if it fits. */
        private ByteBuffer acquireHeapBodyBuffer(int i) {
            if (this.chunkBytes == null || this.chunkBytes.capacity() != i) {
                this.chunkBytes = ByteBuffer.allocate(i);
            } else {
                this.chunkBytes.clear();
            }
            return this.chunkBytes;
        }

        private void releaseHeapBodyBuffer() {
            this.chunkBytes = null;
        }

        /**
         * Sends the next packet of the large message, if the consumer is started and has credits.
         *
         * @return {@code true} when there is nothing (more) to send for this message,
         *         {@code false} when delivery is paused or more packets are still pending
         * @throws Exception if reading the body or sending a packet fails
         */
        public boolean deliver() throws Exception {
            ServerConsumerImpl.this.pendingDelivery.countUp();
            try {
                if (!ServerConsumerImpl.this.started) {
                    return false;
                }
                final LargeServerMessage currentLargeMessage = this.largeMessage;
                if (currentLargeMessage == null) {
                    // finish() already ran
                    return true;
                }
                if (ServerConsumerImpl.this.availableCredits != null && ServerConsumerImpl.this.availableCredits.get() <= 0) {
                    ServerConsumerImpl.logger.trace("{}::FlowControl::delivery largeMessage interrupting as there are no more credits, available={}", this, ServerConsumerImpl.this.availableCredits);
                    releaseHeapBodyBuffer();
                    return false;
                }
                if (!this.sentInitialPacket) {
                    this.context = currentLargeMessage.getLargeBodyReader();
                    this.sizePendingLargeMessage = this.context.getSize();
                    this.context.open();
                    this.sentInitialPacket = true;
                    int packetSize = ServerConsumerImpl.this.callback.sendLargeMessage(this.ref, ServerConsumerImpl.this, this.context.getSize(), this.ref.getDeliveryCount());
                    if (ServerConsumerImpl.this.availableCredits != null) {
                        if (ServerConsumerImpl.this.availableCredits.addAndGet(-packetSize) <= 0) {
                            releaseHeapBodyBuffer();
                        }
                        if (ServerConsumerImpl.logger.isTraceEnabled()) {
                            ServerConsumerImpl.logger.trace("{}::FlowControl:: deliver initialpackage with {} delivered, available now = {}", this, Integer.valueOf(packetSize), ServerConsumerImpl.this.availableCredits);
                        }
                    }
                    // the body chunks are sent by later deliver() calls
                    ServerConsumerImpl.this.resumeLargeMessage();
                    return false;
                }
                if (ServerConsumerImpl.this.availableCredits != null && ServerConsumerImpl.this.availableCredits.get() <= 0) {
                    ServerConsumerImpl.logger.trace("{}::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available={}", this, ServerConsumerImpl.this.availableCredits);
                    releaseHeapBodyBuffer();
                    return false;
                }
                final int localChunkLen = (int) Math.min(this.sizePendingLargeMessage - this.positionPendingLargeMessage, ServerConsumerImpl.this.minLargeMessageSize);
                final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
                assert bodyBuffer.remaining() == localChunkLen;
                final int readBytes = this.context.readInto(bodyBuffer);
                assert readBytes == localChunkLen : "readBytes = " + readBytes + ", localChunkLen=" + localChunkLen + " on large message " + currentLargeMessage.getMessageID() + ", hash = " + System.identityHashCode(currentLargeMessage);
                final byte[] body = bodyBuffer.array();
                assert body.length == readBytes;
                final boolean moreToCome = this.positionPendingLargeMessage + ((long) localChunkLen) < this.sizePendingLargeMessage;
                int packetSize = ServerConsumerImpl.this.callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, moreToCome, false);
                int chunkLen = body.length;
                if (ServerConsumerImpl.this.availableCredits != null) {
                    if (ServerConsumerImpl.this.availableCredits.addAndGet(-packetSize) <= 0) {
                        releaseHeapBodyBuffer();
                    }
                    if (ServerConsumerImpl.logger.isTraceEnabled()) {
                        ServerConsumerImpl.logger.trace("{}::FlowControl::largeMessage deliver continuation, packetSize={} available now={}", this, Integer.valueOf(packetSize), ServerConsumerImpl.this.availableCredits);
                    }
                }
                this.positionPendingLargeMessage += chunkLen;
                if (this.positionPendingLargeMessage < this.sizePendingLargeMessage) {
                    ServerConsumerImpl.this.resumeLargeMessage();
                    return false;
                }
                ServerConsumerImpl.logger.trace("Finished deliverLargeMessage");
                finish();
                return true;
            } finally {
                // Single release point: pairs with the countUp() at the top on every exit path,
                // including exceptional ones. Counting down anywhere else would release twice.
                ServerConsumerImpl.this.pendingDelivery.countDown();
            }
        }

        /**
         * Releases everything held for the message (chunk buffer, body reader, usage count) and
         * detaches this deliverer from the consumer. Does nothing beyond dropping the chunk buffer
         * when it has already run.
         *
         * @throws Exception if closing the body reader or releasing message resources fails
         */
        public void finish() throws Exception {
            synchronized (ServerConsumerImpl.this.lock) {
                releaseHeapBodyBuffer();
                if (this.largeMessage == null) {
                    return;
                }
                if (this.context != null) {
                    this.context.close();
                    this.context = null;
                }
                this.largeMessage.releaseResources(false, false);
                this.largeMessage.toMessage().usageDown();
                ServerConsumerImpl.this.largeMessageDeliverer = null;
                this.largeMessage = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:artemis-server-2.32.0.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl$ServerConsumerMetrics.class */
    /**
     * Per-consumer delivery and acknowledgement counters. The instance also acts as a transaction
     * operation so that the "acknowledged, awaiting commit" counter is reset once the transaction
     * commits or rolls back.
     */
    public static class ServerConsumerMetrics extends TransactionOperationAbstract {
        private volatile long messagesInTransitSize = 0;
        private volatile int messagesAcknowledgedAwaitingCommit = 0;
        private volatile long messagesDeliveredSize = 0;
        private volatile long lastDeliveredTime = 0;
        private volatile long lastAcknowledgedTime = 0;
        private volatile long messagesDelivered = 0;
        private volatile long messagesAcknowledged = 0;
        private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesInTransitSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesInTransitSize");
        private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> messagesAcknowledgedAwaitingCommitUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledgedAwaitingCommit");
        private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesDeliveredSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDeliveredSize");
        private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesDeliveredUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDelivered");
        private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesAcknowledgedUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledged");

        ServerConsumerMetrics() {
        }

        /** @return encoded size of messages delivered but not yet acknowledged */
        public long getMessagesInTransitSize() {
            return messagesInTransitSizeUpdater.get(this);
        }

        /** @return cumulative encoded size of all delivered messages */
        public long getMessagesDeliveredSize() {
            return messagesDeliveredSizeUpdater.get(this);
        }

        /** @return {@code System.currentTimeMillis()} of the last delivery, or 0 if none */
        public long getLastDeliveredTime() {
            return lastDeliveredTime;
        }

        /** @return {@code System.currentTimeMillis()} of the last acknowledgement, or 0 if none */
        public long getLastAcknowledgedTime() {
            return lastAcknowledgedTime;
        }

        /** @return number of messages delivered */
        public long getMessagesDelivered() {
            return messagesDeliveredUpdater.get(this);
        }

        /** @return number of messages acknowledged */
        public long getMessagesAcknowledged() {
            return messagesAcknowledgedUpdater.get(this);
        }

        /** @return acknowledgements recorded inside a transaction since the last commit/rollback */
        public int getMessagesAcknowledgedAwaitingCommit() {
            return messagesAcknowledgedAwaitingCommitUpdater.get(this);
        }

        /**
         * Records the delivery of one message.
         *
         * @param i encoded size of the delivered message
         */
        public void addMessage(int i) {
            messagesInTransitSizeUpdater.addAndGet(this, i);
            messagesDeliveredSizeUpdater.addAndGet(this, i);
            messagesDeliveredUpdater.incrementAndGet(this);
            lastDeliveredTime = System.currentTimeMillis();
        }

        /**
         * Records the acknowledgement of one message.
         *
         * @param i           encoded size of the acknowledged message
         * @param transaction transaction the acknowledgement belongs to, or {@code null} when
         *                    it is not transactional
         */
        public void addAcknowledge(int i, Transaction transaction) {
            messagesInTransitSizeUpdater.addAndGet(this, -i);
            messagesAcknowledgedUpdater.incrementAndGet(this);
            lastAcknowledgedTime = System.currentTimeMillis();
            if (transaction == null) {
                return;
            }
            addOperation(transaction);
            messagesAcknowledgedAwaitingCommitUpdater.incrementAndGet(this);
        }

        @Override
        public void afterCommit(Transaction transaction) {
            messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
        }

        @Override
        public void afterRollback(Transaction transaction) {
            messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
        }

        /**
         * Registers this object as an operation of {@code transaction}, unless transaction
         * property slot 10 is already occupied; the slot is filled on registration so that a
         * second call for the same transaction is a no-op.
         */
        public void addOperation(Transaction transaction) {
            if (transaction.getProperty(10) != null) {
                return;
            }
            transaction.putProperty(10, this);
            transaction.addOperation(this);
        }
    }

    /**
     * Builds a short diagnostic string: this consumer's {@code toString()} followed by the number
     * of references currently in {@code deliveringRefs}, read under the consumer lock.
     */
    @Override
    public String debug() {
        final String prefix = toString() + "::Delivering ";
        synchronized (lock) {
            return prefix + deliveringRefs.size();
        }
    }

    /** @return the current value of the {@code isClosed} flag */
    @Override
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Convenience constructor: large-message support enabled ({@code true}) and no explicit
     * initial credits ({@code null}); everything else is forwarded unchanged.
     */
    public ServerConsumerImpl(long j, ServerSession serverSession, QueueBinding queueBinding, Filter filter, boolean z, boolean z2, StorageManager storageManager, SessionCallback sessionCallback, boolean z3, boolean z4, ManagementService managementService, ActiveMQServer activeMQServer) throws Exception {
        this(
            j,
            serverSession,
            queueBinding,
            filter,
            z,
            z2,
            storageManager,
            sessionCallback,
            z3,
            z4,
            managementService,
            true,
            null,
            activeMQServer);
    }

    /**
     * Convenience constructor that uses
     * {@link ActiveMQDefaultConfiguration#getDefaultConsumerPriority()} as the consumer priority
     * and forwards every other argument unchanged.
     */
    public ServerConsumerImpl(long j, ServerSession serverSession, QueueBinding queueBinding, Filter filter, boolean z, boolean z2, StorageManager storageManager, SessionCallback sessionCallback, boolean z3, boolean z4, ManagementService managementService, boolean z5, Integer num, ActiveMQServer activeMQServer) throws Exception {
        this(
            j,
            serverSession,
            queueBinding,
            filter,
            ActiveMQDefaultConfiguration.getDefaultConsumerPriority(),
            z,
            z2,
            storageManager,
            sessionCallback,
            z3,
            z4,
            managementService,
            z5,
            num,
            activeMQServer);
    }

    /**
     * Creates the consumer and attaches it to the queue of {@code queueBinding}: a browse-only
     * consumer gets a {@link BrowserDeliverer}, any other consumer is added to the queue.
     *
     * @param j                 consumer id
     * @param serverSession     owning session; must be non-null with a live remoting connection
     * @param queueBinding      binding whose queue is consumed
     * @param filter            optional message filter, may be {@code null}
     * @param i                 consumer priority
     * @param z                 start flag; the consumer starts as started when this or {@code z2} is set
     * @param z2                browse-only flag
     * @param storageManager    storage manager used when delivery counts are persisted
     * @param sessionCallback   protocol callback used to send messages
     * @param z3                pre-acknowledge flag
     * @param z4                strict delivery-count update flag
     * @param managementService management service
     * @param z5                whether large messages are streamed through the large-message deliverer
     * @param num               initial credits; {@code null} keeps the default of 0, {@code -1}
     *                          turns the credit checks off
     * @param activeMQServer    owning server
     * @throws NullPointerException if the session or its remoting connection is {@code null}
     * @throws Exception            if the connection is already destroyed, or id generation fails
     */
    public ServerConsumerImpl(long j, ServerSession serverSession, QueueBinding queueBinding, Filter filter, int i, boolean z, boolean z2, StorageManager storageManager, SessionCallback sessionCallback, boolean z3, boolean z4, ManagementService managementService, boolean z5, Integer num, ActiveMQServer activeMQServer) throws Exception {
        // ---- state that does not depend on the arguments ----
        this.lock = new Object();
        this.pendingDelivery = new ReusableLatch(0);
        this.availableCredits = new AtomicInteger(0);
        this.largeMessageDeliverer = null;
        this.deliveringRefs = new ArrayDeque<>();
        this.transferring = false;
        this.consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
        this.messageConsumedSnapshot = new AtomicLong(0L);
        this.requiresLegacyPrefix = false;
        this.anycast = false;
        this.isClosed = false;
        this.metrics = new ServerConsumerMetrics();
        // Continues an in-flight large message; once nothing is left, forces a delivery.
        this.resumeLargeMessageRunnable = () -> {
            synchronized (this.lock) {
                try {
                    if (largeMessageDeliverer == null || largeMessageDeliverer.deliver()) {
                        forceDelivery();
                    }
                } catch (Exception failure) {
                    ActiveMQServerLogger.LOGGER.errorRunningLargeMessageDeliverer(failure);
                }
            }
        };

        // ---- argument validation ----
        if (serverSession == null || serverSession.getRemotingConnection() == null) {
            throw new NullPointerException("session = " + serverSession);
        }
        if (serverSession.getRemotingConnection() != null && serverSession.getRemotingConnection().isDestroyed()) {
            throw ActiveMQMessageBundle.BUNDLE.connectionDestroyed(serverSession.getRemotingConnection().getRemoteAddress());
        }

        // ---- state taken from the arguments ----
        this.id = j;
        this.sequentialID = activeMQServer.getStorageManager().generateID();
        this.filter = filter;
        this.priority = i;
        this.session = serverSession;
        this.binding = queueBinding;
        this.messageQueue = queueBinding.getQueue();
        this.started = z2 || z;
        this.browseOnly = z2;
        this.storageManager = storageManager;
        this.callback = sessionCallback;
        this.preAcknowledge = z3;
        this.managementService = managementService;
        this.minLargeMessageSize = serverSession.getMinLargeMessageSize();
        this.strictUpdateDeliveryCount = z4;
        this.creationTime = System.currentTimeMillis();
        this.supportLargeMessage = z5;
        if (num != null) {
            final int initialCredits = num.intValue();
            if (initialCredits == -1) {
                this.availableCredits = null;
            } else {
                this.availableCredits.set(initialCredits);
            }
        }
        this.server = activeMQServer;

        // ---- attach to the queue ----
        if (z2) {
            this.browserDeliverer = new BrowserDeliverer(this.messageQueue.browserIterator());
        } else {
            this.messageQueue.addConsumer(this);
        }

        // ---- legacy prefix handling for core connections carrying JMS session metadata ----
        if (serverSession.getRemotingConnection() instanceof CoreRemotingConnection) {
            final CoreRemotingConnection coreConnection = (CoreRemotingConnection) serverSession.getRemotingConnection();
            if (serverSession.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreConnection.getChannelVersion() < 129) {
                this.requiresLegacyPrefix = true;
                if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
                    this.anycast = true;
                }
            }
        }
    }

    /** Ready-listener callback: simply prompts delivery on this consumer. */
    @Override
    public void readyForWriting() {
        this.promptDelivery();
    }

    /**
     * @return {@code false} for a browse-only consumer; otherwise whatever the queue answers
     *         from {@code allowsReferenceCallback()}
     */
    @Override
    public boolean allowReferenceCallback() {
        return !browseOnly && messageQueue.allowsReferenceCallback();
    }

    /** @return the id generated from the server's storage manager when this consumer was created */
    @Override
    public long sequentialID() {
        return sequentialID;
    }

    /** @return the object last stored through {@code setProtocolData}, or {@code null} */
    @Override
    public Object getProtocolData() {
        return protocolData;
    }

    /**
     * Stores an arbitrary object on this consumer.
     *
     * @param obj value later returned by {@code getProtocolData()}
     */
    @Override
    public void setProtocolData(Object obj) {
        protocolData = obj;
    }

    /**
     * Installs the listener notified by {@code fireSlowConsumer()}.
     *
     * @param slowConsumerDetectionListener listener to install; {@code null} removes it
     */
    @Override
    public void setlowConsumerDetection(SlowConsumerDetectionListener slowConsumerDetectionListener) {
        slowConsumerListener = slowConsumerDetectionListener;
    }

    /** @return the currently installed slow-consumer listener, or {@code null} */
    @Override
    public SlowConsumerDetectionListener getSlowConsumerDetecion() {
        return slowConsumerListener;
    }

    /** Notifies the installed slow-consumer listener, if there is one, passing this consumer. */
    @Override
    public void fireSlowConsumer() {
        if (slowConsumerListener == null) {
            return;
        }
        slowConsumerListener.onSlowConsumer(this);
    }

    /** @return the object last stored through {@code setProtocolContext}, or {@code null} */
    @Override
    public Object getProtocolContext() {
        return protocolContext;
    }

    /**
     * Stores the protocol context; {@code handle} hands it to {@code callback.isWritable}.
     *
     * @param obj value later returned by {@code getProtocolContext()}
     */
    @Override
    public void setProtocolContext(Object obj) {
        protocolContext = obj;
    }

    /** @return the consumer id given at construction */
    @Override
    public long getID() {
        return id;
    }

    /** @return {@code true} when this consumer was created as browse-only */
    @Override
    public boolean isBrowseOnly() {
        return browseOnly;
    }

    /** @return {@code System.currentTimeMillis()} captured in the constructor */
    @Override
    public long getCreationTime() {
        return creationTime;
    }

    /** @return the connection id reported by the owning session */
    @Override
    public Object getConnectionID() {
        final Object connectionId = session.getConnectionID();
        return connectionId;
    }

    /** @return the name of the owning session */
    @Override
    public String getSessionID() {
        final String sessionName = session.getName();
        return sessionName;
    }

    /**
     * Records an acknowledgement in the consumer metrics, using the message's encode size.
     *
     * @param messageReference acknowledged reference
     * @param transaction      transaction of the acknowledgement, passed through to the metrics
     */
    @Override
    public void metricsAcknowledge(MessageReference messageReference, Transaction transaction) {
        final int encodeSize = messageReference.getMessage().getEncodeSize();
        metrics.addAcknowledge(encodeSize, transaction);
    }

    /**
     * Takes a snapshot, under the consumer lock, of the references this consumer is delivering:
     * first the session's in-transaction messages for this consumer (if any), then the
     * contents of {@code deliveringRefs}.
     *
     * @return a new list owned by the caller
     */
    @Override
    public List<MessageReference> getDeliveringMessages() {
        synchronized (lock) {
            final List<MessageReference> inTx = session.getInTXMessagesForConsumer(id);
            final int inTxCount = (inTx == null) ? 0 : inTx.size();
            final List<MessageReference> snapshot = new ArrayList<>(inTxCount + deliveringRefs.size());
            if (inTx != null) {
                snapshot.addAll(inTx);
            }
            snapshot.addAll(deliveringRefs);
            return snapshot;
        }
    }

    /** @return the session callback's answer to {@code supportsDirectDelivery()} */
    @Override
    public boolean supportsDirectDelivery() {
        final boolean direct = callback.supportsDirectDelivery();
        return direct;
    }

    /**
     * Forwards a delivery failure to the queue, identifying this consumer as the one that failed.
     *
     * @param th               the failure
     * @param messageReference reference that was being processed
     */
    @Override
    public void errorProcessing(Throwable th, MessageReference messageReference) {
        messageQueue.errorProcessing(this, th, messageReference);
    }

    /**
     * Decides whether this consumer takes {@code messageReference} and, if so, performs the
     * bookkeeping that precedes the actual send.
     *
     * @param messageReference reference offered to this consumer
     * @return {@link HandleStatus#BUSY} when the consumer cannot take a message right now (no
     *         credits, not writable, not started, transferring, or a large message in flight);
     *         {@link HandleStatus#NO_MATCH} when a broker plugin, the message itself, or the
     *         filter rejects it; {@link HandleStatus#HANDLED} otherwise
     * @throws Exception propagated from the plugin, storage, or queue calls made here
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        // Snapshot of the field: it is volatile and the constructor may have set it to null.
        AtomicInteger atomicInteger = this.availableCredits;
        if ((this.callback != null && !this.callback.hasCredits(this, messageReference)) || (atomicInteger != null && atomicInteger.get() <= 0)) {
            if (logger.isDebugEnabled()) {
                logger.debug("{} is busy for the lack of credits. Current credits = {} Can't receive reference {}", this, this.availableCredits, messageReference);
            }
            return HandleStatus.BUSY;
        }
        // Broker plugins are consulted before the consumer lock is taken.
        if (this.server.hasBrokerMessagePlugins() && !this.server.callBrokerMessagePluginsCanAccept(this, messageReference)) {
            logger.trace("Reference {} is not allowed to be consumed by {} due to message plugin filter.", messageReference, this);
            return HandleStatus.NO_MATCH;
        }
        synchronized (this.lock) {
            if ((this.callback != null && !this.callback.isWritable(this, this.protocolContext)) || !this.started || this.transferring) {
                return HandleStatus.BUSY;
            }
            // Only one large message is streamed at a time.
            if (this.largeMessageDeliverer != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{} is busy delivering large message {}, can't deliver reference {}", this, this.largeMessageDeliverer, messageReference);
                }
                return HandleStatus.BUSY;
            }
            Message message = messageReference.getMessage();
            if (!message.acceptsConsumer(sequentialID())) {
                return HandleStatus.NO_MATCH;
            }
            if (this.filter != null && !this.filter.match(message)) {
                logger.trace("Reference {} is a noMatch on consumer {}", messageReference, this);
                return HandleStatus.NO_MATCH;
            }
            logger.trace("ServerConsumerImpl::{} Handling reference {}", this, messageReference);
            // A browse-only consumer skips all delivery bookkeeping below.
            if (!this.browseOnly) {
                // Pre-acknowledged references are acknowledged further down, so they are not tracked.
                if (!this.preAcknowledge) {
                    this.deliveringRefs.add(messageReference);
                }
                this.metrics.addMessage(messageReference.getMessage().getEncodeSize());
                messageReference.handled();
                messageReference.setConsumerId(this.id);
                messageReference.incrementDeliveryCount();
                // Persist the delivery count only in strict mode, for non-paged durable messages
                // on durable, non-internal queues.
                if (this.strictUpdateDeliveryCount && !messageReference.isPaged() && messageReference.getMessage().isDurable() && messageReference.getQueue().isDurable() && !messageReference.getQueue().isInternalQueue() && !messageReference.isPaged()) {
                    this.storageManager.updateDeliveryCount(messageReference);
                }
                // Creating the deliverer here makes later handle() calls answer BUSY until it is cleared.
                if ((message instanceof CoreLargeServerMessage) && this.supportLargeMessage) {
                    this.largeMessageDeliverer = new CoreLargeMessageDeliverer(messageReference);
                }
                if (this.preAcknowledge) {
                    messageReference.getQueue().acknowledge(messageReference, this);
                    this.metrics.addAcknowledge(messageReference.getMessage().getEncodeSize(), null);
                }
            }
            // Paired with the countDown() in proceedDeliver's finally block.
            this.pendingDelivery.countUp();
            return HandleStatus.HANDLED;
        }
    }

    /**
     * Performs the actual delivery of a reference to the client.
     *
     * <p>Optionally writes an audit-log entry, notifies message plugins before delivery, and then
     * sends the message either through the large-message deliverer or as a standard message.
     * Regardless of the outcome, the pending-delivery counter is decremented, the callback's
     * {@code afterDelivery()} is invoked and message plugins are notified after delivery.
     *
     * @param messageReference the reference to deliver
     * @throws Exception if a plugin or the delivery itself fails
     */
    @Override
    public void proceedDeliver(MessageReference messageReference) throws Exception {
        try {
            if (AuditLogger.isMessageLoggingEnabled()) {
                AuditLogger.coreConsumeMessage(
                    this.session.getRemotingConnection().getSubject(),
                    this.session.getRemotingConnection().getRemoteAddress(),
                    getQueueName().toString(),
                    messageReference.toString());
            }
            if (this.server.hasBrokerMessagePlugins()) {
                this.server.callBrokerMessagePlugins(plugin -> {
                    plugin.beforeDeliver(this, messageReference);
                });
            }

            boolean deliverAsLarge = (messageReference.getMessage() instanceof CoreLargeServerMessage) && this.supportLargeMessage;
            if (!deliverAsLarge) {
                deliverStandardMessage(messageReference);
            } else {
                // A deliverer is created here only if none is already in place for this consumer.
                if (this.largeMessageDeliverer == null) {
                    this.largeMessageDeliverer = new CoreLargeMessageDeliverer(messageReference);
                }
                this.largeMessageDeliverer.deliver();
            }
        } finally {
            this.pendingDelivery.countDown();
            this.callback.afterDelivery();
            if (this.server.hasBrokerMessagePlugins()) {
                this.server.callBrokerMessagePlugins(plugin -> {
                    plugin.afterDeliver(this, messageReference);
                });
            }
        }
    }

    /**
     * @return the binding held by this consumer
     */
    @Override
    public Binding getBinding() {
        return binding;
    }

    /**
     * @return the filter held by this consumer; may be {@code null}
     */
    @Override
    public Filter getFilter() {
        return filter;
    }

    /**
     * @return the priority value stored on this consumer
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * @return the filter's string form, or {@code null} when this consumer has no filter
     */
    @Override
    public SimpleString getFilterString() {
        return this.filter == null ? null : this.filter.getFilterString();
    }

    /**
     * Closes this consumer. Calling it again after the first call is a no-op.
     *
     * <p>In order, the method: marks the consumer closed, notifies consumer plugins (before-close),
     * stops the consumer, finishes any in-flight large-message delivery, cancels all delivering
     * references back to their queues inside a transaction that is then rolled back, removes the
     * consumer from its queue/session, registers it as a linger consumer if needed, sends a
     * {@code CONSUMER_CLOSED} management notification (non-browsers only), rechecks the queue
     * ref count, notifies consumer plugins (after-close) and finally schedules the clearing of
     * the protocol context, callback and session references on the queue executor.
     *
     * @param z whether the consumer is being closed because of a failure; forwarded to
     *          {@code cancelRefs} and to the consumer plugins
     * @throws Exception if any of the steps above fails
     */
    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public synchronized void close(boolean z) throws Exception {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;
        if (logger.isTraceEnabled()) {
            // The Exception instance is passed only so the logger prints the caller's stack trace.
            logger.trace("ServerConsumerImpl::{} being closed with failed={}", this, Boolean.valueOf(z), new Exception("trace"));
        }
        if (this.server.hasBrokerConsumerPlugins()) {
            this.server.callBrokerConsumerPlugins(activeMQServerConsumerPlugin -> {
                activeMQServerConsumerPlugin.beforeCloseConsumer(this, z);
            });
        }
        // Stopping the consumer also waits for pending deliveries (see setStarted/flushDelivery).
        setStarted(false);
        // Read the field once into a local before the null check and the call.
        CoreLargeMessageDeliverer coreLargeMessageDeliverer = this.largeMessageDeliverer;
        if (coreLargeMessageDeliverer != null) {
            coreLargeMessageDeliverer.finish();
        }
        List<MessageReference> cancelRefs = cancelRefs(z, false, null);
        TransactionImpl transactionImpl = new TransactionImpl(this.storageManager);
        cancelRefs.forEach(messageReference -> {
            logger.trace("ServerConsumerImpl::{} cancelling reference {}", this, messageReference);
            messageReference.getQueue().cancel(transactionImpl, messageReference, true);
        });
        // NOTE(review): the cancels are registered on the transaction and it is then rolled back;
        // presumably the rollback is what returns the references to their queues - confirm
        // against Queue.cancel(Transaction, ...) and TransactionImpl.
        transactionImpl.rollback();
        removeItself();
        addLingerRefs();
        if (!this.browseOnly) {
            // Build and send the CONSUMER_CLOSED management notification.
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, this.binding.getAddress());
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, this.binding.getClusterName());
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, this.binding.getRoutingName());
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, this.filter == null ? null : this.filter.getFilterString());
            typedProperties.putIntProperty(ManagementHelper.HDR_DISTANCE, this.binding.getDistance());
            typedProperties.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, this.messageQueue.getConsumerCount());
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(this.session.getUsername()));
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(this.session.getRemotingConnection().getRemoteAddress()));
            typedProperties.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(this.session.getName()));
            if (this.session.getRemotingConnection().getClientID() != null) {
                typedProperties.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(this.session.getRemotingConnection().getClientID()));
            }
            typedProperties.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, getID());
            this.managementService.sendNotification(new Notification(null, CoreNotificationType.CONSUMER_CLOSED, typedProperties));
        }
        this.messageQueue.recheckRefCount(this.session.getSessionContext());
        if (this.server.hasBrokerConsumerPlugins()) {
            this.server.callBrokerConsumerPlugins(activeMQServerConsumerPlugin2 -> {
                activeMQServerConsumerPlugin2.afterCloseConsumer(this, z);
            });
        }
        // The references are cleared as a task on the queue executor rather than inline, so
        // other code may observe them becoming null at any point after close() returns.
        this.messageQueue.getExecutor().execute(() -> {
            this.protocolContext = null;
            this.callback = null;
            this.session = null;
        });
    }

    /**
     * Registers this consumer with the session as a linger consumer when it is not a browser
     * and the session reports at least one in-transaction message for this consumer's id.
     *
     * @throws Exception if the session lookup or registration fails
     */
    private void addLingerRefs() throws Exception {
        if (browseOnly) {
            return;
        }
        List<MessageReference> inTxRefs = session.getInTXMessagesForConsumer(id);
        if (inTxRefs != null && !inTxRefs.isEmpty()) {
            session.addLingerConsumer(this);
        }
    }

    /**
     * Detaches this consumer: a regular consumer is removed from its queue (which is then asked
     * to deliver asynchronously), a browser has its browser deliverer closed. In both cases the
     * consumer is finally removed from the session by id.
     *
     * @throws Exception if any of the removal steps fails
     */
    @Override
    public void removeItself() throws Exception {
        if (!browseOnly) {
            messageQueue.removeConsumer(this);
            messageQueue.deliverAsync();
        } else {
            browserDeliverer.close();
        }
        session.removeConsumer(id);
    }

    /**
     * Forces a delivery round and then sends the client a marker message carrying the given
     * sequence in the {@code FORCED_DELIVERY_MESSAGE} property.
     *
     * @param j the forced-delivery sequence to place on the marker message
     */
    @Override
    public void forceDelivery(long j) {
        Runnable sendForcedDeliveryMarker = () -> {
            CoreMessage marker = new CoreMessage(this.storageManager.generateID(), 50)
                .putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, j)
                .setAddress(this.messageQueue.getName());
            MessageReference markerRef = MessageReference.Factory.createReference(marker, this.messageQueue);
            markerRef.setDeliveryCount(0);
            applyPrefixForLegacyConsumer(marker);
            this.callback.sendMessage(markerRef, this, 0);
        };
        forceDelivery(j, sendForcedDeliveryMarker);
    }

    /**
     * Prompts delivery and then schedules the given action on the queue executor.
     *
     * <p>When the scheduled task runs while the consumer is transferring, it does not run the
     * action; instead it schedules another task that calls this method again with the same
     * arguments. Otherwise the action runs while holding the consumer lock. Any exception raised
     * by the scheduled task is logged and not propagated.
     *
     * @param j        the forced-delivery sequence, passed through on a retry
     * @param runnable the action to run once the consumer is not transferring
     */
    public synchronized void forceDelivery(final long j, final Runnable runnable) {
        promptDelivery();
        this.messageQueue.getExecutor().execute(() -> {
            try {
                synchronized (this.lock) {
                    if (!this.transferring) {
                        runnable.run();
                    } else {
                        // Still transferring: try again later via the executor.
                        this.messageQueue.getExecutor().execute(() -> forceDelivery(j, runnable));
                    }
                }
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e);
            }
        });
    }

    /**
     * Drains the delivering list and returns the references that must be cancelled.
     *
     * <p>Any in-flight large-message delivery is finished first; a failure there is logged and
     * does not abort the method. Each drained reference is accounted as acknowledged in the
     * metrics. When {@code z2} is set, the first drained reference is acknowledged on
     * {@code transaction} instead of being returned. Every returned reference has its delivery
     * count adjusted through {@link #updateDeliveryCountForCanceledRef}. The large-message
     * deliverer field is always cleared on exit.
     *
     * @param z           whether the cancel is caused by a failure
     * @param z2          whether the first drained reference should be acknowledged rather than cancelled
     * @param transaction transaction used for the acknowledgement and the metrics; may be {@code null}
     * @return the references to cancel; an empty list when nothing was being delivered
     * @throws Exception if acknowledging a reference fails
     */
    @Override
    public List<MessageReference> cancelRefs(boolean z, boolean z2, Transaction transaction) throws Exception {
        boolean ackNext = z2;
        try {
            try {
                CoreLargeMessageDeliverer pendingDeliverer = this.largeMessageDeliverer;
                if (pendingDeliverer != null) {
                    pendingDeliverer.finish();
                }
            } catch (Throwable failure) {
                ActiveMQServerLogger.LOGGER.errorResttingLargeMessage(this.largeMessageDeliverer, failure);
                this.largeMessageDeliverer = null;
            }

            synchronized (this.lock) {
                if (this.deliveringRefs.isEmpty()) {
                    return Collections.emptyList();
                }
                List<MessageReference> toCancel = new ArrayList<>(this.deliveringRefs.size());
                MessageReference ref;
                while ((ref = this.deliveringRefs.poll()) != null) {
                    this.metrics.addAcknowledge(ref.getMessage().getEncodeSize(), transaction);
                    if (ackNext) {
                        // Only the first drained reference is acknowledged.
                        ref.acknowledge(transaction, this);
                        ackNext = false;
                    } else {
                        toCancel.add(ref);
                        updateDeliveryCountForCanceledRef(ref, z);
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("ServerConsumerImpl::{} Preparing Cancelling list for messageID = {}, ref = {}", this, Long.valueOf(ref.getMessage().getMessageID()), ref);
                    }
                }
                return toCancel;
            }
        } finally {
            this.largeMessageDeliverer = null;
        }
    }

    /**
     * Adjusts the delivery count of a reference that is being cancelled.
     *
     * <p>The callback is asked first; if it reports that it handled the update, nothing else is
     * done. Otherwise the delivery count is decremented unless the cancel is due to a failure.
     *
     * @param messageReference the reference being cancelled
     * @param z                whether the cancel is caused by a failure
     */
    protected void updateDeliveryCountForCanceledRef(MessageReference messageReference, boolean z) {
        boolean handledByCallback = this.callback.updateDeliveryCountAfterCancel(this, messageReference, z);
        if (!handledByCallback && !z) {
            messageReference.decrementDeliveryCount();
        }
    }

    /**
     * Starts or stops this consumer. A browse-only consumer is always kept in the started state.
     * Starting prompts delivery; stopping waits for pending deliveries to finish.
     *
     * @param z {@code true} to start, {@code false} to stop
     */
    @Override
    public void setStarted(boolean z) {
        synchronized (lock) {
            started = browseOnly || z;
        }
        if (!z) {
            flushDelivery();
            return;
        }
        promptDelivery();
    }

    /**
     * Waits up to 30 seconds for the pending-delivery counter to drain.
     *
     * <p>On timeout a warning is logged and, when a server is available, a thread dump is
     * requested. Any exception is logged and reported as a failure.
     *
     * @return {@code true} if the pending deliveries finished in time, {@code false} otherwise
     */
    private boolean flushDelivery() {
        try {
            boolean drained = this.pendingDelivery.await(30L, TimeUnit.SECONDS);
            if (!drained) {
                ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(toString(), this.session.getRemotingConnection().getTransportConnection().getRemoteAddress());
                if (this.server != null) {
                    this.server.threadDump();
                }
            }
            return drained;
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
            return false;
        }
    }

    /**
     * Sets the transferring flag.
     *
     * <p>When enabling, a latch task is submitted to the queue executor and awaited for up to
     * 10 seconds (an error is logged on timeout), then pending deliveries are flushed. When
     * disabling, delivery is prompted.
     *
     * @param z the new value of the transferring flag
     */
    @Override
    public void setTransferring(boolean z) {
        synchronized (this.lock) {
            this.transferring = z;
        }
        if (!z) {
            promptDelivery();
            return;
        }
        // Wait for tasks already queued on the executor before flushing deliveries.
        FutureLatch executorDrained = new FutureLatch();
        this.messageQueue.getExecutor().execute(executorDrained);
        if (!executorDrained.await(10000L)) {
            ActiveMQServerLogger.LOGGER.errorTransferringConsumer();
        }
        flushDelivery();
    }

    /**
     * Handles a flow-control credit update.
     *
     * <ul>
     *   <li>{@code -1} disables flow control: the credit counter is dropped and delivery is prompted.</li>
     *   <li>{@code 0} resets the credit counter to zero.</li>
     *   <li>Any other value is added to the counter; delivery is prompted only when the counter
     *       moves from non-positive to positive.</li>
     * </ul>
     *
     * <p>NOTE(review): after flow control has been disabled the counter is {@code null}, so a later
     * call with a value other than {@code -1} would dereference it - confirm callers never do this.
     *
     * @param i the credit value received
     */
    @Override
    public void receiveCredits(int i) {
        if (i == -1) {
            logger.debug("{}:: FlowControl::Received disable flow control message", this);
            this.availableCredits = null;
            promptDelivery();
        } else if (i == 0) {
            logger.debug("{}:: FlowControl::Received reset flow control message", this);
            this.availableCredits.set(0);
        } else {
            int previous = this.availableCredits.getAndAdd(i);
            if (logger.isDebugEnabled()) {
                logger.debug("{}::FlowControl::Received {} credits, previous value = {} currentValue = {}", this, Integer.valueOf(i), Integer.valueOf(previous), Integer.valueOf(this.availableCredits.get()));
            }
            boolean becamePositive = previous <= 0 && previous + i > 0;
            if (becamePositive) {
                logger.trace("{}::calling promptDelivery from receiving credits", this);
                promptDelivery();
            }
        }
    }

    /**
     * @return the queue this consumer is attached to
     */
    @Override
    public Queue getQueue() {
        return messageQueue;
    }

    /**
     * Collects a contiguous run of delivering references.
     *
     * <p>The delivering list is walked in order. Collection begins at the first reference for
     * which {@code function} returns {@code true} and ends (inclusive) at the first collected
     * reference for which {@code function2} returns {@code true}, or at the end of the list.
     *
     * @param z         whether collected references are also removed from the delivering list
     * @param function  predicate marking the first reference to collect
     * @param function2 predicate marking the last reference to collect
     * @return the collected references, in delivery order
     */
    @Override
    public synchronized List<MessageReference> scanDeliveringReferences(boolean z, Function<MessageReference, Boolean> function, Function<MessageReference, Boolean> function2) {
        List<MessageReference> collected = new LinkedList<>();
        boolean collecting = false;
        synchronized (this.lock) {
            for (Iterator<MessageReference> cursor = this.deliveringRefs.iterator(); cursor.hasNext();) {
                MessageReference ref = cursor.next();
                if (!collecting) {
                    collecting = function.apply(ref);
                }
                if (!collecting) {
                    continue;
                }
                if (z) {
                    cursor.remove();
                }
                collected.add(ref);
                if (function2.apply(ref)) {
                    break;
                }
            }
        }
        return collected;
    }

    /**
     * Acknowledges delivering references, in delivery order, up to and including the one whose
     * message ID equals {@code j}.
     *
     * <p>When no transaction is supplied, a local one is created and committed on success or
     * rolled back on failure. When a transaction is supplied, it is marked rollback-only on
     * failure and left for the caller to complete.
     *
     * @param transaction the transaction to acknowledge in, or {@code null} to use a local one
     * @param j           the message ID of the last reference to acknowledge
     * @return the message IDs acknowledged, or {@code null} for a browse-only consumer
     * @throws Exception an {@code ActiveMQException} raised while acknowledging (including the
     *                   "no reference" error when the delivering list runs out before {@code j}
     *                   is found), or an {@code ActiveMQIllegalStateException} wrapping the message
     *                   of any other failure
     */
    @Override
    public synchronized List<Long> acknowledge(Transaction transaction, long j) throws Exception {
        if (this.browseOnly) {
            return null;
        }
        boolean ownsTransaction = false;
        if (transaction == null) {
            ownsTransaction = true;
            transaction = new TransactionImpl(this.storageManager);
        }
        try {
            List<Long> acknowledgedIds = new ArrayList<>();
            MessageReference ref;
            do {
                // Only the poll is done under the consumer lock.
                synchronized (this.lock) {
                    ref = this.deliveringRefs.poll();
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("ACKing ref {} on tx={}, consumer={}", ref, transaction, this);
                }
                if (ref == null) {
                    ActiveMQIllegalStateException noReference = ActiveMQMessageBundle.BUNDLE.consumerNoReference(Long.valueOf(this.id), Long.valueOf(j), this.messageQueue.getName());
                    transaction.markAsRollbackOnly(noReference);
                    throw noReference;
                }
                ref.acknowledge(transaction, this);
                acknowledgedIds.add(Long.valueOf(ref.getMessageID()));
                this.metrics.addAcknowledge(ref.getMessage().getEncodeSize(), transaction);
            } while (ref.getMessageID() != j);
            if (ownsTransaction) {
                transaction.commit();
            }
            return acknowledgedIds;
        } catch (ActiveMQException e) {
            if (ownsTransaction) {
                transaction.rollback();
            } else {
                transaction.markAsRollbackOnly(e);
            }
            throw e;
        } catch (Throwable th) {
            // A Throwable is not necessarily an Exception (it may be an Error); a blind cast would
            // raise ClassCastException here and skip the rollback handling below.
            Exception loggable = th instanceof Exception ? (Exception) th : new Exception(th);
            ActiveMQServerLogger.LOGGER.errorAckingMessage(loggable);
            ActiveMQIllegalStateException illegalState = new ActiveMQIllegalStateException(th.getMessage());
            if (ownsTransaction) {
                transaction.rollback();
            } else {
                transaction.markAsRollbackOnly(illegalState);
            }
            throw illegalState;
        }
    }

    /**
     * Acknowledges the single delivering reference whose message ID equals {@code j}.
     *
     * <p>When no transaction is supplied, a local one is created and committed on success or
     * rolled back on failure. When a transaction is supplied, it is marked rollback-only on
     * failure and left for the caller to complete. Browse-only consumers ignore the call.
     *
     * @param transaction the transaction to acknowledge in, or {@code null} to use a local one
     * @param j           the message ID of the reference to acknowledge
     * @throws Exception an {@code ActiveMQException} raised while acknowledging (including the
     *                   "no reference" error when the ID is not in the delivering list), or an
     *                   {@code ActiveMQIllegalStateException} wrapping the message of any other
     *                   failure
     */
    @Override
    public synchronized void individualAcknowledge(Transaction transaction, long j) throws Exception {
        if (this.browseOnly) {
            return;
        }
        boolean ownsTransaction = false;
        if (logger.isTraceEnabled()) {
            logger.trace("individualACK messageID={}", Long.valueOf(j));
        }
        if (transaction == null) {
            logger.trace("individualACK starting new TX");
            ownsTransaction = true;
            transaction = new TransactionImpl(this.storageManager);
        }
        try {
            MessageReference ref = removeReferenceByID(j);
            if (logger.isTraceEnabled()) {
                logger.trace("ACKing ref {} on tx={}, consumer={}", ref, transaction, this);
            }
            if (ref == null) {
                ActiveMQIllegalStateException noReference = ActiveMQMessageBundle.BUNDLE.consumerNoReference(Long.valueOf(this.id), Long.valueOf(j), this.messageQueue.getName());
                transaction.markAsRollbackOnly(noReference);
                throw noReference;
            }
            if (RefCountMessage.isRefTraceEnabled()) {
                RefCountMessage.deferredDebug(ref.getMessage(), "Individually acked on tx={}", Long.valueOf(transaction.getID()));
            }
            this.metrics.addAcknowledge(ref.getMessage().getEncodeSize(), transaction);
            ref.acknowledge(transaction, this);
            if (ownsTransaction) {
                transaction.commit();
            }
        } catch (ActiveMQException e) {
            if (ownsTransaction) {
                transaction.rollback();
            } else if (transaction != null) {
                transaction.markAsRollbackOnly(e);
            }
            throw e;
        } catch (Throwable th) {
            // A Throwable is not necessarily an Exception (it may be an Error); a blind cast would
            // raise ClassCastException here and skip the rollback handling below.
            Exception loggable = th instanceof Exception ? (Exception) th : new Exception(th);
            ActiveMQServerLogger.LOGGER.errorAckingMessage(loggable);
            ActiveMQIllegalStateException illegalState = new ActiveMQIllegalStateException(th.getMessage());
            if (ownsTransaction) {
                transaction.rollback();
            } else if (transaction != null) {
                transaction.markAsRollbackOnly(illegalState);
            }
            throw illegalState;
        }
    }

    /**
     * Cancels the single delivering reference whose message ID equals {@code j}, handing it
     * back to its queue. Browse-only consumers ignore the call.
     *
     * @param j the message ID of the reference to cancel
     * @param z whether the cancel is due to a failure; when {@code false} the delivery count
     *          is decremented before the reference is cancelled
     * @throws IllegalStateException if no delivering reference has the given ID
     * @throws Exception             if removing or cancelling the reference fails
     */
    @Override
    public synchronized void individualCancel(long j, boolean z) throws Exception {
        if (browseOnly) {
            return;
        }

        MessageReference ref = removeReferenceByID(j);
        if (ref == null) {
            throw new IllegalStateException("Cannot find ref to ack " + j);
        }

        if (!z) {
            ref.decrementDeliveryCount();
        }

        metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
        ref.getQueue().cancel(ref, System.currentTimeMillis());
    }

    /**
     * Rejects the delivering reference whose message ID equals {@code j} by sending it to the
     * dead letter address of its queue. Nothing happens for a browse-only consumer or when no
     * such reference is being delivered.
     *
     * @param j the message ID of the reference to reject
     * @throws Exception if removing the reference or sending it to the dead letter address fails
     */
    @Override
    public synchronized void reject(long j) throws Exception {
        if (browseOnly) {
            return;
        }
        MessageReference ref = removeReferenceByID(j);
        if (ref == null) {
            return;
        }
        metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
        ref.getQueue().sendToDeadLetterAddress(null, ref);
    }

    /**
     * Puts a reference back at the head of the delivering list and accounts for it in the
     * metrics as a delivered message.
     *
     * @param messageReference the reference to put back
     */
    @Override
    public synchronized void backToDelivering(MessageReference messageReference) {
        synchronized (lock) {
            if (RefCountMessage.isRefTraceEnabled()) {
                RefCountMessage.deferredDebug(messageReference.getMessage(), "Adding message back to delivering", new Object[0]);
            }
            logger.trace("Message {} back to delivering", messageReference);

            deliveringRefs.addFirst(messageReference);
            metrics.addMessage(messageReference.getMessage().getEncodeSize());
        }
    }

    /**
     * Removes and returns the delivering reference whose message ID equals {@code j}.
     *
     * <p>The head of the delivering list is checked first; only when it does not match is the
     * whole list scanned.
     *
     * @param j the message ID to look for
     * @return the removed reference, or {@code null} for a browse-only consumer, an empty
     *         delivering list, or when no reference matches
     * @throws Exception declared by the interface
     */
    @Override
    public synchronized MessageReference removeReferenceByID(long j) throws Exception {
        if (this.browseOnly) {
            return null;
        }
        synchronized (this.lock) {
            if (this.deliveringRefs.isEmpty()) {
                logger.trace("removeReferenceByID {} return null", Long.valueOf(j));
                return null;
            }

            final MessageReference removed;
            boolean headMatches = this.deliveringRefs.peek().getMessage().getMessageID() == j;
            if (headMatches) {
                removed = this.deliveringRefs.poll();
                if (logger.isTraceEnabled()) {
                    logger.trace("Remove Message By ID {} return ref {} after peek call", Long.valueOf(j), removed);
                }
            } else {
                removed = removeDeliveringRefById(j);
                if (logger.isTraceEnabled()) {
                    logger.trace("Remove Message By ID {} return ref {} after scan call", Long.valueOf(j), removed);
                }
            }
            return removed;
        }
    }

    /**
     * Scans the delivering list for the first reference whose message ID equals {@code j},
     * removes it and returns it.
     *
     * @param j the message ID to look for
     * @return the removed reference, or {@code null} when none matches
     */
    private MessageReference removeDeliveringRefById(long j) {
        logger.trace("RemoveDeiveringRefByID {}", Long.valueOf(j));
        for (Iterator<MessageReference> cursor = this.deliveringRefs.iterator(); cursor.hasNext();) {
            MessageReference candidate = cursor.next();
            if (candidate.getMessage().getMessageID() != j) {
                continue;
            }
            cursor.remove();
            logger.trace("Returning {}", candidate);
            return candidate;
        }
        return null;
    }

    /**
     * @return the credit counter of this consumer; {@code null} once flow control has been
     *         disabled through {@code receiveCredits(-1)}
     */
    public AtomicInteger getAvailableCredits() {
        return availableCredits;
    }

    /**
     * @return a diagnostic description of this consumer listing its id, filter, binding and
     *         closed flag, each under its own label
     */
    @Override
    public String toString() {
        return "ServerConsumerImpl [id=" + this.id + ", filter=" + this.filter + ", binding=" + this.binding + ", closed=" + this.isClosed + "]";
    }

    /**
     * @return a management-oriented description of this consumer: its id composed of the
     *         connection id, session id and consumer id, followed by the filter and the
     *         binding's own management string
     */
    @Override
    public String toManagementString() {
        return "ServerConsumer [id=" + getConnectionID() + ":" + getSessionID() + ":" + this.id + ", filter=" + this.filter + ", binding=" + this.binding.toManagementString() + "]";
    }

    /**
     * Asks the callback to disconnect this consumer, giving the deletion of its queue as the reason.
     */
    @Override
    public void disconnect() {
        String reason = "Queue deleted: " + getQueue().getName();
        this.callback.disconnect(this, reason);
    }

    /**
     * Handles a failure of this consumer: closes it (flagged as failed) and then asks the
     * callback, if one is still set, to disconnect using the failure's message as the reason.
     * A failure while closing is logged and does not prevent the disconnect.
     *
     * @param th the failure that occurred
     */
    @Override
    public void failed(Throwable th) {
        try {
            close(true);
        } catch (Throwable closeFailure) {
            logger.warn(closeFailure.getMessage(), closeFailure);
        }
        // close() schedules a task on the queue executor that sets the callback field to null,
        // so the field is read once here instead of being checked and then read again.
        SessionCallback currentCallback = this.callback;
        if (currentCallback != null) {
            currentCallback.disconnect(this, th.getMessage());
        }
    }

    /**
     * Computes the acknowledgement rate, in messages per second, since the previous call.
     *
     * <p>Each call replaces the stored check time with the current time and the stored snapshot
     * with the current acknowledged-message count, so consecutive calls measure consecutive
     * intervals. The result is rounded away from zero to two decimal places.
     *
     * @return the rate over the elapsed interval, or {@code 0.0f} when no time has elapsed
     */
    public float getRate() {
        // A single clock sample is used both to measure the interval and as the new check time.
        long now = System.currentTimeMillis();
        float elapsedSeconds = ((float) (now - this.consumerRateCheckTime.getAndSet(now))) / 1000.0f;
        long acknowledged = this.metrics.getMessagesAcknowledged();
        long acknowledgedSinceLastCheck = acknowledged - this.messageConsumedSnapshot.getAndSet(acknowledged);
        if (elapsedSeconds == 0.0f) {
            // Avoid dividing by zero; the snapshot has already been refreshed above.
            return 0.0f;
        }
        return BigDecimal.valueOf(((float) acknowledgedSinceLastCheck) / elapsedSeconds)
            .setScale(2, java.math.RoundingMode.UP)
            .floatValue();
    }

    /**
     * Prompts delivery for this consumer: resumes the large message in progress when there is
     * one, otherwise forces a regular delivery round.
     */
    @Override
    public void promptDelivery() {
        if (this.largeMessageDeliverer == null) {
            forceDelivery();
            return;
        }
        resumeLargeMessage();
    }

    /**
     * Triggers a delivery round: a regular consumer asks its queue to deliver asynchronously,
     * a browser schedules its browser deliverer on the queue executor.
     */
    private void forceDelivery() {
        if (!browseOnly) {
            messageQueue.deliverAsync();
            return;
        }
        messageQueue.getExecutor().execute(browserDeliverer);
    }

    /**
     * Schedules the large-message resume task on the queue executor.
     */
    private void resumeLargeMessage() {
        messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
    }

    /**
     * Sends a standard (non-large) message through the callback and, when flow control is
     * active, subtracts the value returned by the send from the available credits.
     *
     * @param messageReference the reference to send
     */
    private void deliverStandardMessage(MessageReference messageReference) {
        applyPrefixForLegacyConsumer(messageReference.getMessage());
        int creditsUsed = this.callback.sendMessage(messageReference, this, messageReference.getDeliveryCount());
        if (this.availableCredits == null) {
            // Flow control is disabled; nothing to account for.
            return;
        }
        this.availableCredits.addAndGet(-creditsUsed);
        if (logger.isTraceEnabled()) {
            logger.trace("{}::FlowControl::delivery standard taking {} from credits, available now is {}", this, Integer.valueOf(creditsUsed), this.availableCredits);
        }
    }

    /**
     * Prepends the legacy queue or topic prefix to the message address when this consumer
     * requires legacy prefixes and the address does not already start with it. The queue
     * prefix is used for anycast consumers, the topic prefix otherwise.
     *
     * @param message the message whose address may be rewritten
     */
    private void applyPrefixForLegacyConsumer(Message message) {
        if (!this.requiresLegacyPrefix) {
            return;
        }
        String legacyPrefix = (this.anycast ? PacketImpl.OLD_QUEUE_PREFIX : PacketImpl.OLD_TOPIC_PREFIX).toString();
        if (!message.getAddress().startsWith(legacyPrefix)) {
            message.setAddress(legacyPrefix + message.getAddress());
        }
    }

    /**
     * @param z the new value of the pre-acknowledge flag
     */
    public void setPreAcknowledge(boolean z) {
        preAcknowledge = z;
    }

    /**
     * @return the sequential id stored on this consumer
     */
    @Override
    public long getSequentialID() {
        return sequentialID;
    }

    /**
     * @return the name of the queue returned by {@link #getQueue()}
     */
    @Override
    public SimpleString getQueueName() {
        Queue queue = getQueue();
        return queue.getName();
    }

    /**
     * @return the routing type of the queue returned by {@link #getQueue()}
     */
    @Override
    public RoutingType getQueueType() {
        Queue queue = getQueue();
        return queue.getRoutingType();
    }

    /**
     * @return the address of the queue returned by {@link #getQueue()}
     */
    @Override
    public SimpleString getQueueAddress() {
        Queue queue = getQueue();
        return queue.getAddress();
    }

    /**
     * @return the name of the session this consumer belongs to
     */
    @Override
    public String getSessionName() {
        return session.getName();
    }

    /**
     * @return the client id reported by the session's remoting connection
     */
    @Override
    public String getConnectionClientID() {
        return session.getRemotingConnection().getClientID();
    }

    /**
     * @return the protocol name reported by the session's remoting connection
     */
    @Override
    public String getConnectionProtocolName() {
        return session.getRemotingConnection().getProtocolName();
    }

    /**
     * @return the local address of the transport connection behind the session's remoting connection
     */
    @Override
    public String getConnectionLocalAddress() {
        return session.getRemotingConnection().getTransportConnection().getLocalAddress();
    }

    /**
     * @return the remote address of the transport connection behind the session's remoting
     *         connection, or {@code null} when the session, the remoting connection or the
     *         transport connection is not available
     */
    @Override
    public String getConnectionRemoteAddress() {
        if (this.session != null && this.session.getRemotingConnection() != null && this.session.getRemotingConnection().getTransportConnection() != null) {
            return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
        }
        return null;
    }

    /**
     * @return the in-transit message size reported by this consumer's metrics
     */
    @Override
    public long getMessagesInTransitSize() {
        return metrics.getMessagesInTransitSize();
    }

    /**
     * @return the number of references currently in the delivering list
     */
    @Override
    public int getMessagesInTransit() {
        return deliveringRefs.size();
    }

    /**
     * @return the last-delivered time reported by this consumer's metrics
     */
    @Override
    public long getLastDeliveredTime() {
        return metrics.getLastDeliveredTime();
    }

    /**
     * @return the last-acknowledged time reported by this consumer's metrics
     */
    @Override
    public long getLastAcknowledgedTime() {
        return metrics.getLastAcknowledgedTime();
    }

    /**
     * @return the acknowledged-message count reported by this consumer's metrics
     */
    @Override
    public long getMessagesAcknowledged() {
        return metrics.getMessagesAcknowledged();
    }

    /**
     * @return the delivered-message size reported by this consumer's metrics
     */
    @Override
    public long getMessagesDeliveredSize() {
        return metrics.getMessagesDeliveredSize();
    }

    /**
     * @return the delivered-message count reported by this consumer's metrics
     */
    @Override
    public long getMessagesDelivered() {
        return metrics.getMessagesDelivered();
    }

    /**
     * @return the count of acknowledged messages awaiting commit, as reported by this
     *         consumer's metrics
     */
    @Override
    public int getMessagesAcknowledgedAwaitingCommit() {
        return metrics.getMessagesAcknowledgedAwaitingCommit();
    }

    /**
     * @return the session callback of this consumer; {@code null} once the cleanup task
     *         scheduled by {@code close} has run
     */
    public SessionCallback getCallback() {
        return callback;
    }
}
