package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

/* loaded from: input_file:artemis-amqp-protocol-1.5.3.jar:org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.class */
public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger((Class<?>) ProtonTransactionHandler.class);
    final AMQPSessionCallback sessionSPI;

    public ProtonTransactionHandler(AMQPSessionCallback aMQPSessionCallback) {
        this.sessionSPI = aMQPSessionCallback;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
        try {
            try {
                Receiver receiver = (Receiver) delivery.getLink();
                if (!delivery.isReadable()) {
                    delivery.settle();
                    heapBuffer.release();
                    return;
                }
                DeliveryUtil.readDelivery(receiver, heapBuffer);
                receiver.advance();
                Object value = ((AmqpValue) DeliveryUtil.decodeMessageImpl(heapBuffer).getBody()).getValue();
                if (value instanceof Declare) {
                    Binary newTransaction = this.sessionSPI.newTransaction();
                    Declared declared = new Declared();
                    declared.setTxnId(newTransaction);
                    delivery.disposition(declared);
                    delivery.settle();
                } else if (value instanceof Discharge) {
                    Discharge discharge = (Discharge) value;
                    Binary txnId = discharge.getTxnId();
                    if (discharge.getFail().booleanValue()) {
                        try {
                            this.sessionSPI.rollbackTX(txnId, true);
                            delivery.disposition(new Accepted());
                        } catch (Exception e) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                        }
                    } else {
                        try {
                            this.sessionSPI.commitTX(txnId);
                            delivery.disposition(new Accepted());
                        } catch (ActiveMQAMQPException e2) {
                            throw e2;
                        } catch (Exception e3) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e3.getMessage());
                        }
                    }
                }
                delivery.settle();
                heapBuffer.release();
            } catch (Throwable th) {
                delivery.settle();
                heapBuffer.release();
                throw th;
            }
        } catch (ActiveMQAMQPException e4) {
            delivery.disposition(createRejected(e4.getAmqpError(), e4.getMessage()));
            delivery.settle();
            heapBuffer.release();
        } catch (Exception e5) {
            log.warn(e5.getMessage(), e5);
            delivery.disposition(createRejected(Symbol.getSymbol("failed"), e5.getMessage()));
            delivery.settle();
            heapBuffer.release();
        }
    }

    private Rejected createRejected(Symbol symbol, String str) {
        Rejected rejected = new Rejected();
        ErrorCondition errorCondition = new ErrorCondition();
        errorCondition.setCondition(symbol);
        errorCondition.setDescription(str);
        rejected.setError(errorCondition);
        return rejected;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
    }
}
