package org.apache.activemq.artemis.protocol.amqp.proton;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.class */
/**
 * {@link ProtonDeliveryHandler} for a proton {@link Sender} link.
 *
 * <p>The context obtains a {@link ServerConsumer} from its {@link SenderController} during
 * {@link #initialize()}, tracks the credit available on the link, hands outgoing
 * {@link MessageReference}s to a {@link MessageWriter}, and maps remote delivery state updates
 * onto ack / cancel / reject calls on the session callback.
 */
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Delivery tag used by createDelivery when the link is pre-settled.
    protected static final byte[] EMPTY_DELIVERY_TAG = new byte[0];
    // Callback handed to sessionSPI.afterIO in onMessage; it flushes the connection.
    private final ConnectionFlushIOCallback connectionFlusher;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
    protected final AMQPSessionCallback sessionSPI;
    // Monitor guarding reads and writes of `credits`.
    private final Object creditsLock;
    // When true, a Rejected outcome is cancelled with the "failed" flag instead of being rejected.
    private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
    // Set by onFlow when a drain is started; cleared again in flushDrain's finally block.
    private final AtomicBoolean draining;
    // May be null until initialize(), which falls back to a DefaultSenderController.
    private SenderController controller;
    // Assigned in initialize() from controller.init(this).
    private ServerConsumer brokerConsumer;
    // Passed to connection.flowControl(...) in hasCredits; bound to brokerConsumer.promptDelivery.
    private ReadyListener onflowControlReady;
    private boolean closed;
    // True when the sender settle mode is SETTLED.
    private boolean preSettle;
    // Link credit minus pending deliveries, clamped at zero; guarded by creditsLock.
    private int credits;
    // Incremented in deliverMessage, decremented in reportDeliveryComplete.
    private AtomicInteger pending;
    // Optional hook invoked with each reference at the start of deliverMessage.
    private Consumer<? super MessageReference> beforeDelivery;
    // One-shot action consumed by reportDeliveryComplete (used to retry a drain flush).
    protected volatile Runnable afterDelivery;
    // Writer selected for the most recent delivery; starts as the rejecting writer.
    protected volatile MessageWriter messageWriter;

    /**
     * {@link IOCallback} that flushes the enclosing context's connection when the I/O
     * operation finishes, whether it succeeded or failed.
     */
    private final class ConnectionFlushIOCallback implements IOCallback {
        private ConnectionFlushIOCallback() {
        }

        /** Flushes the connection after a successful operation. */
        @Override
        public void done() {
            flushOwner();
        }

        /**
         * Flushes the connection after a failed operation; the error details are not used.
         *
         * @param errorCode    ignored
         * @param errorMessage ignored
         */
        @Override
        public void onError(int errorCode, String errorMessage) {
            flushOwner();
        }

        /** Flushes the connection of the enclosing sender context. */
        private void flushOwner() {
            ProtonServerSenderContext.this.connection.flush();
        }
    }

    /**
     * Creates a sender context without an explicit controller; {@link #initialize()} will
     * install a {@code DefaultSenderController} in that case.
     *
     * @param aMQPConnectionContext connection the link belongs to
     * @param sender                proton sender link handled by this context
     * @param aMQPSessionContext    owning AMQP session context
     * @param aMQPSessionCallback   callback used to reach the broker session
     */
    public ProtonServerSenderContext(AMQPConnectionContext aMQPConnectionContext, Sender sender, AMQPSessionContext aMQPSessionContext, AMQPSessionCallback aMQPSessionCallback) {
        this(
            aMQPConnectionContext,
            sender,
            aMQPSessionContext,
            aMQPSessionCallback,
            null);
    }

    /**
     * Creates a sender context.
     *
     * @param aMQPConnectionContext connection the link belongs to
     * @param sender                proton sender link handled by this context
     * @param aMQPSessionContext    owning AMQP session context
     * @param aMQPSessionCallback   callback used to reach the broker session
     * @param senderController      controller to use, or {@code null} to have
     *                              {@link #initialize()} create a default one
     */
    public ProtonServerSenderContext(AMQPConnectionContext aMQPConnectionContext, Sender sender, AMQPSessionContext aMQPSessionContext, AMQPSessionCallback aMQPSessionCallback, SenderController senderController) {
        // Collaborators supplied by the caller.
        this.connection = aMQPConnectionContext;
        this.sender = sender;
        this.protonSession = aMQPSessionContext;
        this.sessionSPI = aMQPSessionCallback;
        this.controller = senderController;

        // Internal state.
        this.connectionFlusher = new ConnectionFlushIOCallback();
        this.creditsLock = new Object();
        this.draining = new AtomicBoolean(false);
        this.pending = new AtomicInteger(0);
        this.credits = 0;
        this.closed = false;
        this.messageWriter = SenderController.REJECTING_MESSAGE_WRITER;

        this.amqpTreatRejectAsUnmodifiedDeliveryFailed =
            aMQPConnectionContext.getProtocolManager().isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
    }

    /**
     * Installs a hook that {@link #deliverMessage} invokes with each message reference
     * before any delivery work is done.
     *
     * @param consumer the hook, or {@code null} to disable it
     * @return this context, for chaining
     */
    public ProtonServerSenderContext setBeforeDelivery(Consumer<? super MessageReference> consumer) {
        beforeDelivery = consumer;
        return this;
    }

    /**
     * @return the broker consumer assigned by {@link #initialize()}, or {@code null} if none
     *         has been assigned yet
     */
    public ServerConsumer getBrokerConsumer() {
        return brokerConsumer;
    }

    /**
     * @return the remote container value reported by the connection, used as the client id
     */
    protected String getClientId() {
        return connection.getRemoteContainer();
    }

    /**
     * @return the AMQP session context this sender was created with
     */
    public AMQPSessionContext getSessionContext() {
        return protonSession;
    }

    /**
     * @return the proton sender link handled by this context
     */
    public Sender getSender() {
        return sender;
    }

    /**
     * Handles a flow update for the link: recomputes the available credit and then either
     * starts a drain or hands credits to the broker consumer.
     *
     * @param i current credit value reported with the flow (only logged here)
     * @param z {@code true} when the flow requests a drain
     */
    @Override
    public void onFlow(int i, boolean z) {
        if (logger.isDebugEnabled()) {
            logger.debug("flow {}, draing={}", i, z);
        }
        connection.requireInHandler();
        setupCredit();

        final ServerConsumerImpl consumer = (ServerConsumerImpl) brokerConsumer;
        if (z) {
            // Start a drain only if one is not already in progress.
            if (draining.compareAndSet(false, true)) {
                flushDrain(consumer);
            }
            return;
        }
        consumer.receiveCredits(-1);
    }

    /**
     * Forces a delivery on the consumer and, from its callback, completes the drain on the
     * connection: if the current writer is still writing, the flush is re-armed to run
     * after that delivery; otherwise the link is marked drained. The draining flag is
     * cleared once the callback has run.
     *
     * @param serverConsumerImpl consumer whose delivery is forced
     */
    private void flushDrain(ServerConsumerImpl serverConsumerImpl) {
        serverConsumerImpl.forceDelivery(1L, () -> {
            try {
                connection.runNow(() -> {
                    if (!messageWriter.isWriting()) {
                        drained();
                        return;
                    }
                    // A write is still in flight: retry once reportDeliveryComplete runs the hook.
                    afterDelivery = () -> flushDrain(serverConsumerImpl);
                });
            } finally {
                draining.set(false);
            }
        });
    }

    /**
     * Marks the sender as drained, flushes the connection immediately and recomputes the
     * locally tracked credit. Must run in the connection handler.
     */
    private void drained() {
        connection.requireInHandler();
        sender.drained();
        connection.instantFlush();
        setupCredit();
    }

    /**
     * Reports whether another message may be delivered right now.
     *
     * @return {@code false} while the current writer is still writing or the connection's
     *         flow control check fails; otherwise {@code true} only if tracked credit is
     *         positive and the sender is not locally closed
     */
    public boolean hasCredits() {
        if (messageWriter.isWriting()) {
            return false;
        }
        if (!connection.flowControl(onflowControlReady)) {
            return false;
        }
        synchronized (creditsLock) {
            if (credits <= 0) {
                return false;
            }
            return sender.getLocalState() != EndpointState.CLOSED;
        }
    }

    /**
     * Recomputes the tracked credit as the link credit minus the deliveries still pending,
     * never going below zero.
     */
    private void setupCredit() {
        synchronized (creditsLock) {
            credits = Math.max(0, sender.getCredit() - pending.get());
        }
    }

    /**
     * Starts the session and, when a broker consumer exists, starts it as a sender.
     *
     * @throws ActiveMQAMQPException if starting the sender fails
     */
    public void start() throws ActiveMQAMQPException {
        sessionSPI.start();
        if (brokerConsumer == null) {
            return;
        }
        try {
            sessionSPI.startSender(brokerConsumer);
        } catch (Exception startFailure) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(startFailure.getMessage());
        }
    }

    /**
     * Marks the context initialized, installs a default controller when none was supplied,
     * and asks the controller for the broker consumer backing this link.
     *
     * @throws Exception consumer-limit, security and unexpected failures are translated into
     *                   AMQP exceptions from the message bundle; other
     *                   {@link ActiveMQException}s are rethrown unchanged
     */
    @Override
    public void initialize() throws Exception {
        this.initialized = true;
        if (controller == null) {
            controller = new DefaultSenderController(protonSession, sender, getClientId());
        }
        try {
            brokerConsumer = (ServerConsumer) controller.init(this);
            preSettle = SenderSettleMode.SETTLED == sender.getSenderSettleMode();
            // Bound method reference; fails here if the controller returned no consumer.
            onflowControlReady = brokerConsumer::promptDelivery;
        } catch (ActiveMQQueueMaxConsumerLimitReached limitReached) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(limitReached.getMessage());
        } catch (ActiveMQSecurityException securityFailure) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(securityFailure.getMessage());
        } catch (ActiveMQAMQPResourceLimitExceededException resourceLimit) {
            throw resourceLimit;
        } catch (ActiveMQException brokerFailure) {
            throw brokerFailure;
        } catch (Exception unexpected) {
            ActiveMQAMQPInternalErrorException wrapped =
                ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(unexpected.getMessage());
            wrapped.initCause(unexpected);
            throw wrapped;
        }
    }

    /**
     * Closes this sender, optionally reporting an error condition on the link. Calls after
     * the first are ignored.
     *
     * @param errorCondition condition to set on the sender before closing, or {@code null}
     * @throws ActiveMQAMQPException declared by the handler contract
     */
    @Override
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
        if (!closed) {
            closed = true;
            if (errorCondition != null) {
                sender.setCondition(errorCondition);
            }
            protonSession.removeSender(sender);
            connection.runNow(() -> {
                sender.close();
                try {
                    sessionSPI.closeSender(brokerConsumer);
                } catch (Exception closeFailure) {
                    logger.warn(closeFailure.getMessage(), (Throwable) closeFailure);
                } finally {
                    // The writer is always released, even when closing the consumer failed.
                    messageWriter.close();
                }
                sender.close();
                connection.flush();
            });
        }
    }

    /**
     * Schedules removal of this sender from its session and closure of the broker consumer.
     * Calls after the first are ignored.
     *
     * @param z when {@code true}, the controller is closed as well
     * @throws ActiveMQAMQPException declared by the handler contract
     */
    @Override
    public void close(boolean z) throws ActiveMQAMQPException {
        if (!closed) {
            closed = true;
            connection.runLater(() -> {
                try {
                    protonSession.removeSender(sender);
                    sessionSPI.closeSender(brokerConsumer);
                    if (z) {
                        controller.close();
                    }
                } catch (Exception closeFailure) {
                    logger.warn(closeFailure.getMessage(), (Throwable) closeFailure);
                } finally {
                    // The writer is always released, even when the steps above failed.
                    messageWriter.close();
                }
            });
        }
    }

    /**
     * Processes a remote delivery state update. An Accepted state acks and settles the
     * delivery unless it is already settled; every other state is passed to
     * {@code handleExtendedDeliveryOutcomes}. The delivery tag is recycled unless the link
     * is pre-settled. Does nothing once the context is closed.
     *
     * @param delivery delivery whose remote state changed
     * @throws ActiveMQAMQPException if applying the outcome fails
     */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        if (closed) {
            return;
        }
        final OperationContext previousContext = sessionSPI.recoverContext();
        try {
            final MessageReference reference = (MessageReference) delivery.getContext();
            final Message message;
            if (reference == null) {
                message = null;
            } else {
                message = reference.getMessage();
            }
            final DeliveryState remoteState = delivery.getRemoteState();
            final boolean accepted =
                remoteState != null && remoteState.getType() == DeliveryState.DeliveryStateType.Accepted;
            if (!accepted) {
                handleExtendedDeliveryOutcomes(message, delivery, remoteState);
            } else if (!delivery.isSettled()) {
                doAck(message);
                delivery.settle();
            }
            if (!preSettle) {
                protonSession.replaceTag(delivery.getTag());
            }
        } finally {
            // Flush after pending I/O, then restore the context recovered on entry.
            sessionSPI.afterIO(connectionFlusher);
            sessionSPI.resetContext(previousContext);
        }
    }

    /**
     * Acknowledges a message on the broker consumer outside of any transaction.
     *
     * @param message message to acknowledge; may be {@code null} when the delivery carried
     *                no message reference
     * @throws ActiveMQAMQPIllegalStateException if the acknowledgement fails
     */
    protected void doAck(Message message) throws ActiveMQAMQPIllegalStateException {
        try {
            this.sessionSPI.ack(null, this.brokerConsumer, message);
        } catch (Exception e) {
            logger.warn(e.toString(), (Throwable) e);
            // String.valueOf tolerates a null message; message.toString() would throw a
            // NullPointerException here and hide the real failure.
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(String.valueOf(message), e.getMessage());
        }
    }

    /**
     * Applies a remote delivery state other than a plain Accepted.
     *
     * <ul>
     *   <li>Transactional with an Accepted outcome: mirrors the state locally if the peer has
     *       not settled, acks inside the transaction and enlists the delivery in it. The
     *       delivery is not settled here whenever the state carries an outcome.</li>
     *   <li>Released: cancels the message without the "failed" flag.</li>
     *   <li>Rejected: cancels with the "failed" flag or rejects, depending on
     *       {@code amqpTreatRejectAsUnmodifiedDeliveryFailed}.</li>
     *   <li>Modified: optionally excludes this consumer from the message, then cancels with
     *       the "failed" flag taken from the state.</li>
     * </ul>
     *
     * @param message       message attached to the delivery; may be {@code null}
     * @param delivery      delivery being updated
     * @param deliveryState remote state; {@code null} is logged and treated as handled
     *                      without settling
     * @return {@code false} only when the state type is not one of the cases above
     * @throws ActiveMQAMQPException if the broker operation for the outcome fails
     */
    private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState deliveryState) throws ActiveMQAMQPException {
        if (deliveryState == null) {
            logger.debug("Received null disposition for delivery update: {}", deliveryState);
            return true;
        }
        boolean settleImmediate = true;
        boolean handled = true;
        switch (deliveryState.getType()) {
            case Transactional: {
                TransactionalState txState = (TransactionalState) deliveryState;
                ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
                if (txState.getOutcome() != null) {
                    // Settlement is left to the transaction once an outcome is present.
                    settleImmediate = false;
                    if (txState.getOutcome() instanceof Accepted) {
                        if (!delivery.remotelySettled()) {
                            TransactionalState localState = new TransactionalState();
                            localState.setOutcome(Accepted.getInstance());
                            localState.setTxnId(txState.getTxnId());
                            delivery.disposition(localState);
                        }
                        try {
                            if (RefCountMessage.isRefTraceEnabled()) {
                                Object[] traceArgs = new Object[] {tx == null ? "no-tx" : Long.valueOf(tx.getID())};
                                RefCountMessage.deferredDebug(message, "Adding ACK message to TX {}", traceArgs);
                            }
                            this.sessionSPI.ack(tx, this.brokerConsumer, message);
                            tx.addDelivery(delivery, this);
                        } catch (Exception e) {
                            // String.valueOf: message may be null; do not mask the real failure with an NPE.
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(String.valueOf(message), e.getMessage());
                        }
                    }
                }
                break;
            }
            case Released:
                try {
                    this.sessionSPI.cancel(this.brokerConsumer, message, false);
                } catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(String.valueOf(message), e.getMessage());
                }
                break;
            case Rejected:
                try {
                    if (this.amqpTreatRejectAsUnmodifiedDeliveryFailed) {
                        this.sessionSPI.cancel(this.brokerConsumer, message, true);
                    } else {
                        this.sessionSPI.reject(this.brokerConsumer, message);
                    }
                } catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(String.valueOf(message), e.getMessage());
                }
                break;
            case Modified:
                try {
                    Modified modified = (Modified) deliveryState;
                    if (Boolean.TRUE.equals(modified.getUndeliverableHere())) {
                        message.rejectConsumer(this.brokerConsumer.sequentialID());
                    }
                    boolean deliveryFailed = Boolean.TRUE.equals(modified.getDeliveryFailed());
                    this.sessionSPI.cancel(this.brokerConsumer, message, deliveryFailed);
                } catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(String.valueOf(message), e.getMessage());
                }
                break;
            default:
                logger.debug("Received null or unknown disposition for delivery update: {}", deliveryState);
                handled = false;
                break;
        }
        if (settleImmediate) {
            delivery.settle();
        }
        return handled;
    }

    /**
     * Settles the given delivery. Must be called from the connection handler.
     *
     * @param delivery delivery to settle
     */
    public void settle(Delivery delivery) {
        connection.requireInHandler();
        delivery.settle();
    }

    /**
     * Asks the session callback to resume delivery on the broker consumer.
     */
    public synchronized void checkState() {
        sessionSPI.resumeDelivery(brokerConsumer);
    }

    /**
     * Hands one message reference to a message writer for delivery on this link.
     *
     * <p>The {@code beforeDelivery} hook, if set, is invoked first. One credit is then
     * consumed and the pending count incremented before a writer is selected and opened.
     * When the reference is itself a {@link Runnable} and the consumer allows reference
     * callbacks, the reference is given the writer and run on the connection; otherwise the
     * writer is invoked with the reference on the connection.
     *
     * @param messageReference reference to deliver
     * @param serverConsumer   consumer asked whether reference callbacks are allowed
     * @return {@code 1} when the message was handed off, {@code 0} when the context is
     *         closed or the sender is locally closed
     * @throws Exception if selecting or opening the writer, or scheduling the delivery, fails
     */
    public int deliverMessage(MessageReference messageReference, ServerConsumer serverConsumer) throws Exception {
        if (this.closed) {
            return 0;
        }
        if (this.beforeDelivery != null) {
            this.beforeDelivery.accept(messageReference);
        }
        // Only the credit accounting needs creditsLock. Writer selection and the hand-off to
        // the connection are done outside it, so foreign code never runs while it is held.
        synchronized (this.creditsLock) {
            if (this.sender.getLocalState() == EndpointState.CLOSED) {
                return 0;
            }
            this.pending.incrementAndGet();
            this.credits--;
        }
        final MessageWriter writer = this.controller.selectOutgoingMessageWriter(this, messageReference).open();
        this.messageWriter = writer;
        if (messageReference instanceof Runnable && serverConsumer.allowReferenceCallback()) {
            messageReference.onDelivery(writer);
            this.connection.runNow((Runnable) messageReference);
        } else {
            this.connection.runNow(() -> writer.accept(messageReference));
        }
        return 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a delivery on the sender for the given reference.
     *
     * @param messageReference reference stored as the delivery's context
     * @param i                message format to set on the delivery
     * @return the new delivery, tagged with the empty tag when the link is pre-settled and
     *         with a tag obtained from the session otherwise
     */
    public Delivery createDelivery(MessageReference messageReference, int i) {
        final byte[] tag;
        if (preSettle) {
            tag = EMPTY_DELIVERY_TAG;
        } else {
            tag = protonSession.getTag();
        }
        final Delivery created = sender.delivery(tag);
        created.setContext(messageReference);
        created.setMessageFormat(i);
        return created;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports a failure that occurred while delivering a message. A conversion failure on a
     * consumer bound to a local queue sends the reference to the queue's dead letter
     * address; any other failure is logged and passed to the consumer's error processing.
     *
     * @param messageWriter    writer that reported the failure (not used here)
     * @param messageReference reference whose delivery failed
     * @param exc              the failure
     */
    public void reportDeliveryError(MessageWriter messageWriter, MessageReference messageReference, Exception exc) {
        final boolean conversionFailureOnLocalQueue =
            exc instanceof ConversionException && brokerConsumer.getBinding() instanceof LocalQueueBinding;
        if (conversionFailureOnLocalQueue) {
            ActiveMQAMQPProtocolLogger.LOGGER.messageConversionFailed(exc);
            try {
                ((LocalQueueBinding) brokerConsumer.getBinding()).getQueue().sendToDeadLetterAddress(null, messageReference);
            } catch (Exception dlaFailure) {
                ActiveMQAMQPProtocolLogger.LOGGER.unableToSendMessageToDLA(messageReference, dlaFailure);
            }
        } else {
            logger.warn(exc.getMessage(), (Throwable) exc);
            brokerConsumer.errorProcessing(exc, messageReference);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Finishes a delivery: takes the one-shot {@code afterDelivery} hook, decrements the
     * pending count, acks and settles (pre-settled link) or advances the sender, and closes
     * the reporting writer. Unless the context is closed, the hook is then run and the
     * connection flushed.
     *
     * @param messageWriter    writer that completed the delivery; closed here
     * @param messageReference reference that was delivered
     * @param delivery         delivery that was written
     * @param z                when {@code true}, prompt the consumer for more messages and
     *                         flush immediately; otherwise request a normal flush
     */
    public void reportDeliveryComplete(MessageWriter messageWriter, MessageReference messageReference, Delivery delivery, boolean z) {
        // Take the hook first so that it runs at most once.
        final Runnable pendingHook = afterDelivery;
        afterDelivery = null;

        synchronized (creditsLock) {
            pending.decrementAndGet();
        }

        if (!preSettle) {
            sender.advance();
        } else {
            try {
                sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
            } catch (Exception ackFailure) {
                logger.debug(ackFailure.getMessage(), (Throwable) ackFailure);
            }
            delivery.settle();
        }

        messageWriter.close();
        if (closed) {
            return;
        }

        if (pendingHook != null) {
            pendingHook.run();
        }

        if (z) {
            brokerConsumer.promptDelivery();
            connection.instantFlush();
        } else {
            connection.flush();
        }
    }
}
