package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.class */
/**
 * {@link MessageWriter} that sends an {@link AMQPLargeMessage} through a proton {@link Sender}
 * in chunks sized from the transport's outbound frame size limit.
 *
 * <p>Lifecycle as implemented here: a writer starts out closed; {@link #open()} makes it usable,
 * {@link #writeBytes(MessageReference)} starts a delivery, and {@link #close()} returns it to the
 * closed state. The current read offset into the large body is kept in {@code position} so that a
 * delivery interrupted by flow control can be continued by a later call to the private
 * {@code tryDelivering()}.
 */
public class AMQPLargeMessageWriter implements MessageWriter {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Bytes subtracted from the outbound frame size limit (in addition to the delivery tag length)
     * when sizing the chunk buffer.
     * NOTE(review): presumably head-room for the AMQP frame/transfer framing - confirm.
     */
    private static final int FRAME_SIZE_RESERVE = 50;

    private final ProtonServerSenderContext serverSender;
    private final AMQPConnectionContext connection;
    private final AMQPSessionCallback sessionSPI;
    private final Sender protonSender;

    /** Reference currently being delivered, or {@code null} when idle. */
    private MessageReference reference;
    /** Large message behind {@link #reference}, or {@code null} when idle. */
    private AMQPLargeMessage message;
    /** Proton delivery created for the current message, or {@code null} when idle. */
    private Delivery delivery;
    /** Offset into the large body from which the next chunk is read. */
    private long position;
    /** Whether the first packet (re-written header sections plus leading body bytes) was sent. */
    private boolean initialPacketHandled;
    /** {@code true} until {@link #open()} is called and again after {@link #close()}. */
    private volatile boolean closed = true;

    /**
     * Creates a writer bound to the given sender context; the connection context, session callback
     * and proton sender are all obtained from it.
     *
     * @param protonServerSenderContext the sender context this writer delivers through
     */
    public AMQPLargeMessageWriter(ProtonServerSenderContext protonServerSenderContext) {
        this.serverSender = protonServerSenderContext;
        this.connection = protonServerSenderContext.getSessionContext().getAMQPConnectionContext();
        this.sessionSPI = protonServerSenderContext.getSessionContext().getSessionSPI();
        this.protonSender = protonServerSenderContext.getSender();
    }

    /**
     * @return {@code true} while this writer is open, i.e. between {@link #open()} and {@link #close()}
     */
    @Override
    public boolean isWriting() {
        return !this.closed;
    }

    /**
     * Closes this writer. Does nothing if it is already closed. Otherwise {@code usageDown()} is
     * invoked on the current message (when one is set) and the writer state is cleared and marked
     * closed, even if {@code usageDown()} throws.
     */
    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        try {
            if (this.message != null) {
                this.message.usageDown();
            }
        } finally {
            reset(true);
        }
    }

    /**
     * Opens this writer for a new delivery, clearing any previous state.
     *
     * @return this writer
     * @throws IllegalStateException if the writer is currently open
     */
    @Override
    public AMQPLargeMessageWriter open() {
        if (!this.closed) {
            throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
        }
        reset(false);
        return this;
    }

    /**
     * Clears all per-delivery state and sets the closed flag.
     *
     * @param closedState value to store in {@link #closed}
     */
    private void reset(boolean closedState) {
        this.message = null;
        this.reference = null;
        this.delivery = null;
        this.position = 0L;
        this.initialPacketHandled = false;
        this.closed = closedState;
    }

    /**
     * Starts delivering the large message held by the given reference.
     *
     * <p>If the proton sender is locally closed the message is skipped (a debug line is logged) and
     * nothing else happens. If {@code invokeOutgoing} on the session callback returns a non-null
     * value, the delivery is not started.
     *
     * @param messageReference reference whose message is cast to {@link AMQPLargeMessage}
     * @throws IllegalStateException if this writer is closed
     */
    @Override
    public void writeBytes(MessageReference messageReference) {
        if (this.protonSender.getLocalState() == EndpointState.CLOSED) {
            logger.debug("Not delivering message {} as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times", messageReference);
            return;
        }
        if (this.closed) {
            throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
        }

        this.reference = messageReference;
        this.message = (AMQPLargeMessage) messageReference.getMessage();

        final ActiveMQProtonRemotingConnection remotingConnection =
                (ActiveMQProtonRemotingConnection) this.sessionSPI.getTransportConnection().getProtocolConnection();
        if (this.sessionSPI.invokeOutgoing(this.message, remotingConnection) != null) {
            // NOTE(review): message/reference stay set on this path although usageUp() was not
            // called; a subsequent close() would still call usageDown() - confirm this is intended.
            return;
        }

        this.delivery = this.serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat());
        this.message.usageUp();
        tryDelivering();
    }

    /** Flow-control callback: schedules {@link #tryDelivering()} through {@code connection.runNow}. */
    private void resume() {
        this.connection.runNow(this::tryDelivering);
    }

    /**
     * Sends as much of the large body as flow control allows, starting at {@link #position}.
     *
     * <p>The first call also sends the initial packet via
     * {@link #deliverInitialPacket(LargeBodyReader, ByteBuf)}. Whenever
     * {@code connection.flowControl(this::resume)} returns {@code false} the method returns without
     * reporting completion; {@link #resume()} is the callback handed over for continuing later.
     * Once the loop ends (body fully read or sender locally closed) completion is reported to the
     * sender context. Any {@link Exception} is reported through {@code reportDeliveryError}.
     *
     * <p>The chunk buffer is released exactly once, in the {@code finally} block, after the body
     * reader has been closed by try-with-resources. Releasing it on the individual exit paths as
     * well would drop the reference count twice and make the second release throw.
     */
    private void tryDelivering() {
        final byte[] tag = this.delivery.getTag();
        final int frameSize = this.protonSender.getSession().getConnection().getTransport().getOutboundFrameSizeLimit()
                - FRAME_SIZE_RESERVE
                - (tag != null ? tag.length : 0);

        try {
            // Fixed-capacity buffer: initial capacity == max capacity == frameSize.
            final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
            try (LargeBodyReader bodyReader = this.message.getLargeBodyReader()) {
                final NettyReadable frameView = new NettyReadable(frameBuffer);

                bodyReader.open();
                bodyReader.position(this.position);
                final long bodySize = bodyReader.getSize();
                frameBuffer.ensureWritable(frameSize);

                if (!this.initialPacketHandled && this.protonSender.getLocalState() != EndpointState.CLOSED) {
                    if (!deliverInitialPacket(bodyReader, frameBuffer)) {
                        // Blocked by flow control; resume() will re-enter this method.
                        return;
                    }
                    this.initialPacketHandled = true;
                }

                while (this.protonSender.getLocalState() != EndpointState.CLOSED && this.position < bodySize) {
                    if (!this.connection.flowControl(this::resume)) {
                        return;
                    }
                    frameBuffer.clear();
                    final int readSize = bodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize));
                    // The NIO view does not move the netty writer index, so set it explicitly.
                    frameBuffer.writerIndex(readSize);
                    this.protonSender.send(frameView);
                    this.position += readSize;
                    if (readSize > 0 && this.position < bodySize) {
                        this.connection.instantFlush();
                    }
                }
            } finally {
                frameBuffer.release();
            }

            this.serverSender.reportDeliveryComplete(this, this.reference, this.delivery, true);
        } catch (Exception e) {
            this.serverSender.reportDeliveryError(this, this.reference, e);
        }
    }

    /**
     * Sends the first packet of the message: the re-written leading sections followed by as many
     * body bytes as still fit into {@code byteBuf}.
     *
     * <p>If writing the sections overflows {@code byteBuf} (signalled by an
     * {@link IndexOutOfBoundsException}), the sections are sent on their own through
     * {@link #sendAndFlushInitialPacket(DeliveryAnnotations, LargeBodyReader)}.
     *
     * @param largeBodyReader opened reader positioned at the start of the body
     * @param byteBuf         frame buffer to encode into; cleared before use
     * @return {@code false} if flow control blocked the send (nothing was written), {@code true} otherwise
     * @throws Exception propagated from the reader, the encoder or the sender
     */
    private boolean deliverInitialPacket(LargeBodyReader largeBodyReader, ByteBuf byteBuf) throws Exception {
        assert this.position == 0 && largeBodyReader.position() == 0 && !this.initialPacketHandled;

        if (!this.connection.flowControl(this::resume)) {
            return false;
        }

        byteBuf.clear();
        this.message.checkReference(this.reference);
        final DeliveryAnnotations deliveryAnnotations =
                (DeliveryAnnotations) this.reference.getProtocolData(DeliveryAnnotations.class);

        try {
            replaceInitialHeader(deliveryAnnotations, largeBodyReader, new NettyWritable(byteBuf));

            // Top the frame up with body bytes if the encoded sections left any room.
            int bodyBytesRead = 0;
            final int spaceLeft = byteBuf.writableBytes();
            if (spaceLeft != 0) {
                final int writeStart = byteBuf.writerIndex();
                bodyBytesRead = largeBodyReader.readInto(byteBuf.internalNioBuffer(writeStart, spaceLeft));
                if (bodyBytesRead > 0) {
                    byteBuf.writerIndex(writeStart + bodyBytesRead);
                }
            }

            this.protonSender.send(new NettyReadable(byteBuf));
            if (bodyBytesRead > 0) {
                this.position += bodyBytesRead;
            }
            this.connection.instantFlush();
            return true;
        } catch (IndexOutOfBoundsException e) {
            assert this.position == 0 : "this shouldn't happen unless replaceInitialHeader is updating position before modifying frameBuffer";
            logger.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
            sendAndFlushInitialPacket(deliveryAnnotations, largeBodyReader);
            return true;
        }
    }

    /**
     * Fallback for the initial packet: encodes the leading sections into a separately allocated
     * buffer (initial capacity twice the message's remaining-body position, no explicit maximum)
     * and sends them. The buffer is released and the connection flushed whether or not the send
     * succeeds.
     *
     * @param deliveryAnnotations delivery annotations to encode, may be {@code null}
     * @param largeBodyReader     reader that gets repositioned past the replaced sections
     * @throws Exception propagated from the encoder or the sender
     */
    private void sendAndFlushInitialPacket(DeliveryAnnotations deliveryAnnotations, LargeBodyReader largeBodyReader) throws Exception {
        final ByteBuf headerBuffer =
                PooledByteBufAllocator.DEFAULT.directBuffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(this.message) * 2);
        try {
            replaceInitialHeader(deliveryAnnotations, largeBodyReader, new NettyWritable(headerBuffer));
            this.protonSender.send(new NettyReadable(headerBuffer));
        } finally {
            // Single release point: keeping these calls inside the try as well would repeat them
            // whenever the flush itself failed.
            headerBuffer.release();
            this.connection.instantFlush();
        }
    }

    /**
     * Encodes the current header and delivery annotations - and, for a re-encoded message, the
     * message annotations, properties and application properties - into {@code writableBuffer},
     * then moves both the reader and {@link #position} to where the remaining body starts.
     * The thread's encoder buffer is cleared again on exit, successful or not.
     *
     * @param deliveryAnnotations delivery annotations to encode, may be {@code null}
     * @param largeBodyReader     reader to reposition
     * @param writableBuffer      destination for the encoded sections
     * @return the new body position
     * @throws Exception propagated from the encoder or the reader
     */
    private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotations, LargeBodyReader largeBodyReader, WritableBuffer writableBuffer) throws Exception {
        TLSEncode.getEncoder().setByteBuffer(writableBuffer);
        try {
            int bodyStart = writeHeaderAndAnnotations(deliveryAnnotations);
            if (this.message.isReencoded()) {
                bodyStart = writeMessageAnnotationsPropertiesAndApplicationProperties(largeBodyReader, this.message);
            }
            // Position is updated only after all sections were written, so an overflow while
            // encoding leaves it untouched.
            largeBodyReader.position(bodyStart);
            this.position = bodyStart;
            return bodyStart;
        } finally {
            TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
        }
    }

    /**
     * Writes the decoded message annotations, properties and application properties of the given
     * message to the current encoder.
     *
     * @param largeBodyReader  unused by this method
     * @param aMQPLargeMessage message whose sections are written
     * @return the message's remaining-body position
     * @throws Exception declared for callers; nothing in this method's own code throws a checked exception
     */
    private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader largeBodyReader, AMQPLargeMessage aMQPLargeMessage) throws Exception {
        final int remainingBodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(aMQPLargeMessage);
        assert remainingBodyPosition > 0;
        writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(aMQPLargeMessage);
        return remainingBodyPosition;
    }

    /**
     * Writes, in this order and skipping any that are {@code null}: decoded message annotations,
     * current properties, decoded application properties.
     *
     * @param aMQPLargeMessage message whose sections are written
     */
    private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage aMQPLargeMessage) {
        final MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(aMQPLargeMessage);
        if (messageAnnotations != null) {
            TLSEncode.getEncoder().writeObject(messageAnnotations);
        }

        final Properties properties = AMQPMessageBrokerAccessor.getCurrentProperties(aMQPLargeMessage);
        if (properties != null) {
            TLSEncode.getEncoder().writeObject(properties);
        }

        final ApplicationProperties applicationProperties = AMQPMessageBrokerAccessor.getDecodedApplicationProperties(aMQPLargeMessage);
        if (applicationProperties != null) {
            TLSEncode.getEncoder().writeObject(applicationProperties);
        }
    }

    /**
     * Writes the message's current header and the given delivery annotations to the current
     * encoder, skipping whichever is {@code null}.
     *
     * @param deliveryAnnotations delivery annotations to encode, may be {@code null}
     * @return the message's position after its delivery annotations
     */
    private int writeHeaderAndAnnotations(DeliveryAnnotations deliveryAnnotations) {
        final Header header = AMQPMessageBrokerAccessor.getCurrentHeader(this.message);
        if (header != null) {
            TLSEncode.getEncoder().writeObject(header);
        }
        if (deliveryAnnotations != null) {
            TLSEncode.getEncoder().writeObject(deliveryAnnotations);
        }
        return this.message.getPositionAfterDeliveryAnnotations();
    }
}
