package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.class */
public class AMQPTunneledCoreLargeMessageWriter implements MessageWriter {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final byte DATA_DESCRIPTOR = 117;
    private static final int DATA_SECTION_ENCODING_BYTES = 8;
    private final ProtonServerSenderContext serverSender;
    private final AMQPConnectionContext connection;
    private final Sender protonSender;
    private DeliveryAnnotations annotations;
    private MessageReference reference;
    private LargeServerMessageImpl message;
    private Delivery delivery;
    private int frameSize;
    private ByteBuf encodingBuffer;
    private long position;
    private int dataSectionRemaining;
    private volatile State state = State.CLOSED;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter$State.class */
    public enum State {
        STREAMING_DELIVERY_ANNOTATIONS,
        STREAMING_CORE_HEADERS,
        STREAMING_BODY,
        DONE,
        CLOSED
    }

    public AMQPTunneledCoreLargeMessageWriter(ProtonServerSenderContext protonServerSenderContext) {
        this.serverSender = protonServerSenderContext;
        this.connection = protonServerSenderContext.getSessionContext().getAMQPConnectionContext();
        this.protonSender = protonServerSenderContext.getSender();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
    public boolean isWriting() {
        return this.state != State.CLOSED;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
    public void close() {
        if (this.state != State.CLOSED) {
            try {
                if (this.message != null) {
                    this.message.usageDown();
                }
            } finally {
                reset(State.CLOSED);
            }
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
    public AMQPTunneledCoreLargeMessageWriter open() {
        if (this.state != State.CLOSED) {
            throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
        }
        reset(State.STREAMING_DELIVERY_ANNOTATIONS);
        return this;
    }

    private void reset(State state) {
        this.message = null;
        this.reference = null;
        this.delivery = null;
        this.position = 0L;
        this.dataSectionRemaining = 0;
        this.state = state;
        this.encodingBuffer = null;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
    public void writeBytes(MessageReference messageReference) {
        if (this.protonSender.getLocalState() == EndpointState.CLOSED) {
            logger.debug("Not delivering message {} as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times", messageReference);
            return;
        }
        if (this.state == State.CLOSED) {
            throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
        }
        if (this.state == State.DONE) {
            throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that was already used to write a message and was not reset");
        }
        this.reference = messageReference;
        this.message = (LargeServerMessageImpl) messageReference.getMessage();
        this.annotations = (DeliveryAnnotations) this.reference.getProtocolData(DeliveryAnnotations.class);
        this.delivery = this.serverSender.createDelivery(messageReference, AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT);
        this.frameSize = (this.protonSender.getSession().getConnection().getTransport().getOutboundFrameSizeLimit() - 50) - (this.delivery.getTag() != null ? this.delivery.getTag().length : 0);
        this.message.usageUp();
        tryDelivering();
    }

    private void resume() {
        this.connection.runNow(this::tryDelivering);
    }

    private ByteBuf getOrCreateDeliveryAnnotationsBuffer() {
        if (this.encodingBuffer == null) {
            this.encodingBuffer = Unpooled.buffer();
            EncoderImpl encoder = TLSEncode.getEncoder();
            try {
                encoder.setByteBuffer(new NettyWritable(this.encodingBuffer));
                encoder.writeObject(this.annotations);
            } finally {
                encoder.setByteBuffer((WritableBuffer) null);
            }
        }
        return this.encodingBuffer;
    }

    private ByteBuf getOrCreateMessageHeaderBuffer() {
        if (this.encodingBuffer == null) {
            int headersAndPropertiesEncodeSize = this.message.getHeadersAndPropertiesEncodeSize();
            int i = headersAndPropertiesEncodeSize + 8;
            this.encodingBuffer = Unpooled.buffer(i, i);
            writeDataSectionTypeInfo(this.encodingBuffer, headersAndPropertiesEncodeSize);
            this.message.encodeHeadersAndProperties(this.encodingBuffer);
        }
        return this.encodingBuffer;
    }

    private boolean trySendDeliveryAnnotations(ByteBuf byteBuf, NettyReadable nettyReadable) {
        while (this.protonSender.getLocalState() != EndpointState.CLOSED && this.state == State.STREAMING_DELIVERY_ANNOTATIONS) {
            if (this.annotations == null || this.annotations.getValue() == null || this.annotations.getValue().isEmpty()) {
                this.state = State.STREAMING_CORE_HEADERS;
            } else {
                if (!this.connection.flowControl(this::resume)) {
                    break;
                }
                ByteBuf orCreateDeliveryAnnotationsBuffer = getOrCreateDeliveryAnnotationsBuffer();
                int min = (int) Math.min(byteBuf.writableBytes(), orCreateDeliveryAnnotationsBuffer.readableBytes() - this.position);
                this.position += min;
                orCreateDeliveryAnnotationsBuffer.readBytes(byteBuf, min);
                if (!byteBuf.isWritable()) {
                    this.protonSender.send(nettyReadable);
                    byteBuf.clear();
                    this.connection.instantFlush();
                }
                if (!orCreateDeliveryAnnotationsBuffer.isReadable()) {
                    this.encodingBuffer = null;
                    this.position = 0L;
                    this.state = State.STREAMING_CORE_HEADERS;
                }
            }
        }
        return this.state == State.STREAMING_CORE_HEADERS;
    }

    private boolean trySendHeadersAndProperties(ByteBuf byteBuf, NettyReadable nettyReadable) {
        while (this.protonSender.getLocalState() != EndpointState.CLOSED && this.state == State.STREAMING_CORE_HEADERS && this.connection.flowControl(this::resume)) {
            ByteBuf orCreateMessageHeaderBuffer = getOrCreateMessageHeaderBuffer();
            int min = (int) Math.min(byteBuf.writableBytes(), orCreateMessageHeaderBuffer.readableBytes() - this.position);
            this.position += min;
            orCreateMessageHeaderBuffer.readBytes(byteBuf, min);
            if (!byteBuf.isWritable()) {
                this.protonSender.send(nettyReadable);
                byteBuf.clear();
                this.connection.instantFlush();
            }
            if (!orCreateMessageHeaderBuffer.isReadable()) {
                this.encodingBuffer = null;
                this.position = 0L;
                this.state = State.STREAMING_BODY;
            }
        }
        return this.state == State.STREAMING_BODY;
    }

    private boolean tryDeliveryMessageBody(ByteBuf byteBuf, NettyReadable nettyReadable) throws ActiveMQException {
        LargeBodyReader largeBodyReader = this.message.getLargeBodyReader();
        try {
            largeBodyReader.open();
            largeBodyReader.position(this.position);
            long size = largeBodyReader.getSize();
            while (this.protonSender.getLocalState() != EndpointState.CLOSED && this.state == State.STREAMING_BODY && this.connection.flowControl(this::resume)) {
                if (this.dataSectionRemaining == 0) {
                    this.dataSectionRemaining = (int) Math.min(2147483647L, size - this.position);
                    if (byteBuf.writableBytes() < 8) {
                        this.protonSender.send(nettyReadable);
                        byteBuf.clear();
                    }
                    writeDataSectionTypeInfo(byteBuf, this.dataSectionRemaining);
                }
                int readInto = largeBodyReader.readInto(byteBuf.internalNioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()));
                byteBuf.writerIndex(byteBuf.writerIndex() + readInto);
                this.position += readInto;
                this.dataSectionRemaining -= readInto;
                if (!byteBuf.isWritable() || this.position == size) {
                    this.protonSender.send(nettyReadable);
                    byteBuf.clear();
                    if (this.position < size) {
                        this.connection.instantFlush();
                    } else {
                        this.state = State.DONE;
                    }
                }
            }
            boolean z = this.state == State.DONE;
            if (largeBodyReader != null) {
                largeBodyReader.close();
            }
            return z;
        } catch (Throwable th) {
            if (largeBodyReader != null) {
                try {
                    largeBodyReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:4:0x002c. Please report as an issue. */
    private void tryDelivering() {
        ByteBuf directBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.frameSize, this.frameSize);
        try {
            NettyReadable nettyReadable = new NettyReadable(directBuffer);
            directBuffer.ensureWritable(this.frameSize);
            switch (this.state) {
                case STREAMING_DELIVERY_ANNOTATIONS:
                    if (!trySendDeliveryAnnotations(directBuffer, nettyReadable)) {
                        return;
                    }
                case STREAMING_CORE_HEADERS:
                    if (!trySendHeadersAndProperties(directBuffer, nettyReadable)) {
                        return;
                    }
                case STREAMING_BODY:
                    if (tryDeliveryMessageBody(directBuffer, nettyReadable)) {
                        this.serverSender.reportDeliveryComplete(this, this.reference, this.delivery, true);
                        return;
                    }
                    return;
                default:
                    throw new IllegalStateException("The writer already wrote a message and was not reset");
            }
        } catch (Exception e) {
            this.serverSender.reportDeliveryError(this, this.reference, e);
        } finally {
            directBuffer.release();
        }
    }

    private void writeDataSectionTypeInfo(ByteBuf byteBuf, int i) {
        byteBuf.writeByte(0);
        byteBuf.writeByte(83);
        byteBuf.writeByte(117);
        byteBuf.writeByte(-80);
        byteBuf.writeInt(i);
    }
}
