package org.apache.activemq.artemis.protocol.amqp.proton;

import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;

/* loaded from: input_file:BOOT-INF/lib/artemis-amqp-protocol-2.19.1.jar:org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.class */
public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler {
    protected final AMQPConnectionContext connection;
    protected final AMQPSessionContext protonSession;
    protected final Receiver receiver;
    protected final int amqpCredits;
    protected final int minCreditRefresh;
    protected final int minLargeMessageSize;
    final RoutingContext routingContext;
    protected final AMQPSessionCallback sessionSPI;
    protected volatile AMQPLargeMessage currentLargeMessage;
    protected final Runnable creditRunnable;
    protected final boolean useModified;
    protected int pendingSettles = 0;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/artemis-amqp-protocol-2.19.1.jar:org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver$FlowControlRunner.class */
    public static class FlowControlRunner implements Runnable {
        final int refill;
        final int threshold;
        final Receiver receiver;
        final AMQPConnectionContext connection;
        final ProtonAbstractReceiver context;

        FlowControlRunner(int i, int i2, Receiver receiver, AMQPConnectionContext aMQPConnectionContext, ProtonAbstractReceiver protonAbstractReceiver) {
            this.refill = i;
            this.threshold = i2;
            this.receiver = receiver;
            this.connection = aMQPConnectionContext;
            this.context = protonAbstractReceiver;
        }

        @Override // java.lang.Runnable
        public void run() {
            int calculatedUpdateRefill;
            if (!this.connection.isHandler()) {
                this.connection.runLater(this);
                return;
            }
            this.connection.requireInHandler();
            int i = this.context != null ? this.context.pendingSettles : 0;
            if (!ProtonAbstractReceiver.isBellowThreshold(this.receiver.getCredit(), i, this.threshold) || (calculatedUpdateRefill = ProtonAbstractReceiver.calculatedUpdateRefill(this.refill, this.receiver.getCredit(), i)) <= 0) {
                return;
            }
            this.receiver.flow(calculatedUpdateRefill);
            this.connection.instantFlush();
        }
    }

    public static boolean isBellowThreshold(int i, int i2, int i3) {
        return i <= i3 - i2;
    }

    public static int calculatedUpdateRefill(int i, int i2, int i3) {
        return (i - i2) - i3;
    }

    public ProtonAbstractReceiver(AMQPSessionCallback aMQPSessionCallback, AMQPConnectionContext aMQPConnectionContext, AMQPSessionContext aMQPSessionContext, Receiver receiver) {
        this.sessionSPI = aMQPSessionCallback;
        this.connection = aMQPConnectionContext;
        this.protonSession = aMQPSessionContext;
        this.receiver = receiver;
        this.amqpCredits = aMQPConnectionContext.getAmqpCredits();
        this.minCreditRefresh = aMQPConnectionContext.getAmqpLowCredits();
        this.minLargeMessageSize = aMQPConnectionContext.getProtocolManager().getAmqpMinLargeMessageSize();
        this.creditRunnable = createCreditRunnable(this.amqpCredits, this.minCreditRefresh, receiver, aMQPConnectionContext, this);
        this.useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
        this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(aMQPConnectionContext.getProtocolManager().isAmqpDuplicateDetection());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void recoverContext() {
        this.sessionSPI.recoverContext();
    }

    protected void clearLargeMessage() {
        this.connection.runNow(() -> {
            try {
            } catch (Throwable th) {
                ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(th);
            } finally {
                this.currentLargeMessage = null;
            }
            if (this.currentLargeMessage != null) {
                this.currentLargeMessage.deleteFile();
            }
        });
    }

    public static Runnable createCreditRunnable(int i, int i2, Receiver receiver, AMQPConnectionContext aMQPConnectionContext, ProtonAbstractReceiver protonAbstractReceiver) {
        return new FlowControlRunner(i, i2, receiver, aMQPConnectionContext, protonAbstractReceiver);
    }

    public static Runnable createCreditRunnable(int i, int i2, Receiver receiver, AMQPConnectionContext aMQPConnectionContext) {
        return new FlowControlRunner(i, i2, receiver, aMQPConnectionContext, null);
    }

    public void incrementSettle() {
        if (!$assertionsDisabled && this.pendingSettles < 0) {
            throw new AssertionError();
        }
        this.connection.requireInHandler();
        this.pendingSettles++;
    }

    public void settle(Delivery delivery) {
        this.connection.requireInHandler();
        this.pendingSettles--;
        if (!$assertionsDisabled && this.pendingSettles < 0) {
            throw new AssertionError();
        }
        delivery.settle();
        flow();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
        flow();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        AMQPMessage createStandardMessage;
        this.connection.requireInHandler();
        Receiver receiver = (Receiver) delivery.getLink();
        if (receiver.current() != delivery) {
            return;
        }
        try {
            if (delivery.isAborted()) {
                clearLargeMessage();
                receiver.advance();
                delivery.settle();
                if (receiver.getDrain()) {
                    return;
                }
                receiver.flow(1);
                return;
            }
            if (delivery.isPartial()) {
                if (this.sessionSPI.getStorageManager() instanceof NullStorageManager) {
                    return;
                }
                if (this.currentLargeMessage != null) {
                    this.currentLargeMessage.addBytes(receiver.recv());
                    return;
                } else {
                    if (this.minLargeMessageSize <= 0 || delivery.available() < this.minLargeMessageSize) {
                        return;
                    }
                    initializeCurrentLargeMessage(delivery, receiver);
                    return;
                }
            }
            if (!(this.sessionSPI.getStorageManager() instanceof NullStorageManager) && this.currentLargeMessage == null && this.minLargeMessageSize > 0 && delivery.available() >= this.minLargeMessageSize) {
                initializeCurrentLargeMessage(delivery, receiver);
            }
            if (this.currentLargeMessage != null) {
                this.currentLargeMessage.addBytes(receiver.recv());
                receiver.advance();
                createStandardMessage = this.currentLargeMessage;
                this.currentLargeMessage.releaseResources(true, true);
                this.currentLargeMessage = null;
            } else {
                ReadableBuffer recv = receiver.recv();
                receiver.advance();
                createStandardMessage = this.sessionSPI.createStandardMessage(delivery, recv);
            }
            Transaction transaction = null;
            if (delivery.getRemoteState() instanceof TransactionalState) {
                transaction = this.sessionSPI.getTransaction(((TransactionalState) delivery.getRemoteState()).getTxnId(), false);
            }
            actualDelivery(createStandardMessage, delivery, receiver, transaction);
        } catch (Exception e) {
            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
        }
    }

    protected void initializeCurrentLargeMessage(Delivery delivery, Receiver receiver) throws Exception {
        long generateID = this.sessionSPI.getStorageManager().generateID();
        this.currentLargeMessage = new AMQPLargeMessage(generateID, delivery.getMessageFormat(), null, this.sessionSPI.getCoreMessageObjectPools(), this.sessionSPI.getStorageManager());
        ReadableBuffer recv = receiver.recv();
        this.currentLargeMessage.parseHeader(recv);
        this.sessionSPI.getStorageManager().largeMessageCreated(generateID, this.currentLargeMessage);
        this.currentLargeMessage.addBytes(recv);
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
        this.protonSession.removeReceiver(this.receiver);
        clearLargeMessage();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
        this.receiver.setCondition(errorCondition);
        close(false);
    }

    protected abstract void actualDelivery(AMQPMessage aMQPMessage, Delivery delivery, Receiver receiver, Transaction transaction);

    public abstract void flow();

    static {
        $assertionsDisabled = !ProtonAbstractReceiver.class.desiredAssertionStatus();
    }
}
