package org.apache.activemq.artemis.protocol.amqp.proton;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.class */
public class DefaultSenderController implements SenderController {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AMQPConnectionContext connection;
    private final AMQPSessionCallback sessionSPI;
    private final Sender protonSender;
    private final String clientId;
    private AMQPMessageWriter standardMessageWriter;
    private AMQPLargeMessageWriter largeMessageWriter;
    private boolean shared;
    private boolean global;
    private boolean multicast;
    private SimpleString queue;
    private SimpleString tempQueueName;
    private String selector;
    private RoutingType routingTypeToUse = RoutingType.ANYCAST;
    private boolean isVolatile;

    /**
     * Creates a sender controller bound to a proton sender link.
     *
     * @param aMQPSessionContext session context that supplies the AMQP connection context and the
     *                           session callback used for all broker operations
     * @param sender             the proton sender link managed by this controller
     * @param str                the client id, later used when deriving subscription queue names
     */
    public DefaultSenderController(AMQPSessionContext aMQPSessionContext, Sender sender, String str) {
        // Plain values handed in by the caller.
        this.protonSender = sender;
        this.clientId = str;
        // Collaborators resolved from the owning session context.
        this.connection = aMQPSessionContext.getAMQPConnectionContext();
        this.sessionSPI = aMQPSessionContext.getSessionSPI();
    }

    /**
     * Initializes this controller from the sender link's remote source and creates the server
     * side consumer for it.
     *
     * <p>The method validates the connection state, creates the standard and large message
     * writers, mirrors the remote sender settle mode, and then resolves the queue to consume
     * from based on the remote {@link Source}:
     * <ul>
     *   <li>no remote source: an existing subscription queue is looked up from the client id and
     *       link name and a new source describing it is set on the link;</li>
     *   <li>dynamic source: a temporary ANYCAST queue with a random UUID name is created;</li>
     *   <li>otherwise: the source address (optionally a fully qualified address::queue) is
     *       resolved, creating durable, shared volatile or temporary subscription queues for
     *       multicast addresses as needed.</li>
     * </ul>
     * Finally the filters that were recognised are written back to the source and the consumer is
     * created through the session callback.
     *
     * @param protonServerSenderContext the sender context handed to the message writers and to
     *                                  {@code sessionSPI.createSender}
     * @return the consumer returned by {@code sessionSPI.createSender}
     * @throws Exception if the connection is not in a valid state, the selector is invalid, the
     *                   address or queue does not exist, or a broker operation fails
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
        boolean z;
        SimpleString simpleString;
        Map.Entry<Symbol, DescribedType> findFilter;
        validateConnectionState();
        this.standardMessageWriter = new AMQPMessageWriter(protonServerSenderContext);
        this.largeMessageWriter = new AMQPLargeMessageWriter(protonServerSenderContext);
        Source source = (Source) this.protonSender.getRemoteSource();
        // Collects the filters that are accepted; written back to the source at the end.
        HashMap hashMap = new HashMap();
        // Echo the remote sender settle mode; the receiver settle mode is always FIRST.
        this.protonSender.setSenderSettleMode(this.protonSender.getRemoteSenderSettleMode());
        this.protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        // If the remote source carries a JMS selector filter, remember it and make sure it parses.
        if (source != null && (findFilter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS)) != null) {
            this.selector = findFilter.getValue().getDescribed().toString();
            try {
                SelectorParser.parse(this.selector);
                hashMap.put(findFilter.getKey(), findFilter.getValue());
            } catch (FilterException e) {
                throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
            }
        }
        if (source == null) {
            // Case 1: no remote source. Look up an existing subscription queue derived from the
            // client id and link name, and rebuild a source describing it.
            String name = this.protonSender.getName();
            this.global = AmqpSupport.verifyDesiredCapability(this.protonSender, AmqpSupport.GLOBAL);
            this.shared = AmqpSupport.verifyDesiredCapability(this.protonSender, AmqpSupport.SHARED);
            this.queue = AmqpSupport.createQueueName(this.connection.isUseCoreSubscriptionNaming(), this.clientId, name, true, this.global, false);
            QueueQueryResult queueQuery = this.sessionSPI.queueQuery(this.queue, RoutingType.MULTICAST, false);
            this.multicast = true;
            this.routingTypeToUse = RoutingType.MULTICAST;
            if (!queueQuery.isExists()) {
                throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + this.protonSender.getName());
            }
            source = new Source();
            source.setAddress(this.queue.toString());
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDistributionMode(AmqpSupport.COPY);
            source.setCapabilities(AmqpSupport.TOPIC_CAPABILITY);
            SimpleString filterString = queueQuery.getFilterString();
            if (filterString != null) {
                this.selector = filterString.toString();
                // z2 records whether the stored filter ends with the connection-id exclusion
                // clause built below (the same clause the no-local handling appends further down).
                boolean z2 = false;
                String str = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.protonSender.getSession().getConnection().getRemoteContainer() + "'";
                if (this.selector.endsWith(str)) {
                    if (this.selector.length() > str.length()) {
                        // Strip the trailing " AND <clause>" so only the user selector remains.
                        this.selector = this.selector.substring(0, this.selector.length() - (" AND " + str).length());
                    } else {
                        // The filter consisted of the exclusion clause only.
                        this.selector = null;
                    }
                    z2 = true;
                }
                if (z2) {
                    hashMap.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }
                if (this.selector != null && !this.selector.trim().isEmpty()) {
                    hashMap.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(this.selector));
                }
            }
            this.protonSender.setSource(source);
        } else if (source.getDynamic()) {
            // Case 2: dynamic source. Create a temporary ANYCAST queue with a random name and
            // report that name back as the source address.
            this.queue = SimpleString.toSimpleString(UUID.randomUUID().toString());
            this.tempQueueName = this.queue;
            try {
                this.sessionSPI.createTemporaryQueue(this.queue, RoutingType.ANYCAST);
                source.setAddress(this.queue.toString());
            } catch (Exception e2) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e2.getMessage());
            }
        } else {
            // Case 3: the remote source names an address.
            // simpleString  = address name, simpleString2 = queue name (only for a fully
            // qualified address), z = whether the address was fully qualified.
            SimpleString simpleString2 = null;
            this.shared = AmqpSupport.verifySourceCapability(source, AmqpSupport.SHARED);
            this.global = AmqpSupport.verifySourceCapability(source, AmqpSupport.GLOBAL);
            if (CompositeAddress.isFullyQualified(source.getAddress())) {
                z = true;
                simpleString = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
                simpleString2 = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
            } else {
                z = false;
                simpleString = SimpleString.toSimpleString(source.getAddress());
            }
            if (AmqpSupport.verifySourceCapability(source, AmqpSupport.TOPIC_CAPABILITY) || AmqpSupport.verifySourceCapability(source, AmqpSupport.QUEUE_CAPABILITY)) {
                // The client stated topic or queue semantics explicitly: the address must exist
                // and support the requested routing type.
                this.multicast = AmqpSupport.verifySourceCapability(source, AmqpSupport.TOPIC_CAPABILITY);
                try {
                    AddressQueryResult addressQuery = this.sessionSPI.addressQuery(simpleString, this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
                    if (!addressQuery.isExists()) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                    }
                    Set<RoutingType> routingTypes = addressQuery.getRoutingTypes();
                    if (this.multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                        throw new ActiveMQAMQPIllegalStateException("Address " + simpleString + " is not configured for topic support");
                    }
                    // A missing ANYCAST routing type is tolerated when an explicit queue was named.
                    if (!this.multicast && !routingTypes.contains(RoutingType.ANYCAST) && simpleString2 == null) {
                        throw new ActiveMQAMQPIllegalStateException("Address " + simpleString + " is not configured for queue support");
                    }
                } catch (ActiveMQSecurityException e3) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e3.getMessage());
                } catch (ActiveMQAMQPException e4) {
                    throw e4;
                } catch (Exception e5) {
                    throw new ActiveMQAMQPInternalErrorException(e5.getMessage(), e5);
                }
            } else {
                // No topic/queue capability given: query with the address' default routing type
                // (or the broker default) and treat the address as multicast only when MULTICAST
                // is its sole routing type.
                RoutingType defaultRoutingType = this.sessionSPI.getDefaultRoutingType(simpleString);
                this.routingTypeToUse = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
                try {
                    AddressQueryResult addressQuery2 = this.sessionSPI.addressQuery(simpleString, this.routingTypeToUse, true);
                    if (!addressQuery2.isExists()) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                    }
                    Set<RoutingType> routingTypes2 = addressQuery2.getRoutingTypes();
                    if (routingTypes2.contains(RoutingType.MULTICAST) && routingTypes2.size() == 1) {
                        this.multicast = true;
                    } else {
                        this.multicast = false;
                    }
                } catch (ActiveMQSecurityException e6) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e6.getMessage());
                } catch (ActiveMQAMQPException e7) {
                    throw e7;
                } catch (Exception e8) {
                    throw new ActiveMQAMQPInternalErrorException(e8.getMessage(), e8);
                }
            }
            this.routingTypeToUse = this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
            if (this.multicast) {
                // A no-local filter is implemented by appending a connection-id exclusion clause
                // to the selector.
                Map.Entry<Symbol, DescribedType> findFilter2 = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
                if (findFilter2 != null) {
                    String str2 = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.protonSender.getSession().getConnection().getRemoteContainer() + "'";
                    if (this.selector != null) {
                        this.selector += " AND " + str2;
                    } else {
                        this.selector = str2;
                    }
                    hashMap.put(findFilter2.getKey(), findFilter2.getValue());
                }
                // simpleString3 is the effective filter used for every subscription queue below.
                SimpleString simpleString3 = SimpleString.toSimpleString(this.selector);
                this.queue = getMatchingQueue(simpleString2, simpleString, RoutingType.MULTICAST, simpleString3, z);
                if (this.queue != null) {
                    // An explicitly named queue was found: consume from it directly, which is
                    // why the multicast flag is cleared here.
                    this.multicast = false;
                } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
                    // Durable subscription: reuse, recreate or create the subscription queue.
                    this.queue = AmqpSupport.createQueueName(this.connection.isUseCoreSubscriptionNaming(), this.clientId, this.protonSender.getName(), this.shared, this.global, false);
                    QueueQueryResult queueQuery2 = this.sessionSPI.queueQuery(this.queue, this.routingTypeToUse, false);
                    if (queueQuery2.isExists()) {
                        // An existing, non configuration-managed queue whose address or filter
                        // differs is deleted and recreated, but only if it has no consumers.
                        if (!queueQuery2.isConfigurationManaged().booleanValue() && (!Objects.equals(queueQuery2.getAddress(), this.sessionSPI.removePrefix(simpleString)) || !Objects.equals(queueQuery2.getFilterString(), simpleString3))) {
                            if (queueQuery2.getConsumerCount() != 0) {
                                throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                            }
                            this.sessionSPI.deleteQueue(this.queue);
                            if (this.shared) {
                                this.sessionSPI.createSharedDurableQueue(simpleString, RoutingType.MULTICAST, this.queue, simpleString3);
                            } else {
                                this.sessionSPI.createUnsharedDurableQueue(simpleString, RoutingType.MULTICAST, this.queue, simpleString3);
                            }
                        }
                    } else if (this.shared) {
                        this.sessionSPI.createSharedDurableQueue(simpleString, RoutingType.MULTICAST, this.queue, simpleString3);
                    } else {
                        this.sessionSPI.createUnsharedDurableQueue(simpleString, RoutingType.MULTICAST, this.queue, simpleString3);
                    }
                } else {
                    // Non-durable (volatile) subscription.
                    this.isVolatile = true;
                    if (!this.shared || this.protonSender.getName() == null) {
                        // Unshared (or unnamed) volatile subscription: temporary queue with a
                        // random name; remembered in tempQueueName so close() can remove it.
                        this.queue = SimpleString.toSimpleString(UUID.randomUUID().toString());
                        this.tempQueueName = this.queue;
                        try {
                            this.sessionSPI.createTemporaryQueue(simpleString, this.queue, RoutingType.MULTICAST, simpleString3);
                        } catch (Exception e9) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e9.getMessage());
                        }
                    } else {
                        // Shared volatile subscription: create the shared queue unless a matching
                        // one already exists or the queue is configuration managed.
                        this.queue = AmqpSupport.createQueueName(this.connection.isUseCoreSubscriptionNaming(), this.clientId, this.protonSender.getName(), this.shared, this.global, this.isVolatile);
                        QueueQueryResult queueQuery3 = this.sessionSPI.queueQuery(this.queue, this.routingTypeToUse, false);
                        if ((!queueQuery3.isExists() || !Objects.equals(queueQuery3.getAddress(), simpleString) || !Objects.equals(queueQuery3.getFilterString(), simpleString3)) && !queueQuery3.isConfigurationManaged().booleanValue()) {
                            this.sessionSPI.createSharedVolatileQueue(simpleString, RoutingType.MULTICAST, this.queue, simpleString3);
                        }
                    }
                }
            } else if (simpleString2 != null) {
                // Anycast with a fully qualified address: if the queue exists regardless of
                // routing type, drop the routing type restriction for the lookups that follow.
                if (this.sessionSPI.queueQuery(CompositeAddress.toFullyQualified(simpleString, simpleString2), null, false, null).isExists()) {
                    this.routingTypeToUse = null;
                }
                SimpleString matchingQueue = getMatchingQueue(simpleString2, simpleString, this.routingTypeToUse, null, false);
                if (matchingQueue == null) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
                this.queue = matchingQueue;
            } else {
                // Anycast with a plain address: prefer a matching ANYCAST queue, otherwise use
                // the address name itself as the queue name.
                SimpleString matchingQueue2 = this.sessionSPI.getMatchingQueue(simpleString, RoutingType.ANYCAST);
                if (matchingQueue2 != null) {
                    this.queue = matchingQueue2;
                } else {
                    this.queue = simpleString;
                }
            }
            if (this.queue == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }
            // Final check that the resolved queue exists; the third argument is true only for
            // non-multicast consumers.
            try {
                if (!this.sessionSPI.queueQuery(this.queue, this.routingTypeToUse, !this.multicast).isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
            } catch (ActiveMQAMQPNotFoundException e10) {
                throw e10;
            } catch (Exception e11) {
                throw new ActiveMQAMQPInternalErrorException(e11.getMessage(), e11);
            }
        }
        // Report back only the filters that were accepted (null when there are none).
        source.setFilter(hashMap.isEmpty() ? null : hashMap);
        // For multicast the selector is not passed to the consumer (the subscription queues above
        // were created with it as their filter); the last argument is true only for a
        // non-multicast consumer whose source distribution mode is COPY.
        return (Consumer) this.sessionSPI.createSender(protonServerSenderContext, this.queue, this.multicast ? null : this.selector, (this.multicast || source.getDistributionMode() == null || !source.getDistributionMode().equals(AmqpSupport.COPY)) ? false : true);
    }

    /**
     * Releases the broker resources that were created for this sender link.
     *
     * <p>For non-multicast links (or links without a source address) only a dynamic source whose
     * expiry policy is {@code LINK_DETACH} or {@code SESSION_END} is cleaned up, by removing the
     * temporary queue named by the source address. That removal is best effort: a failure is
     * logged at debug level and otherwise ignored.
     *
     * <p>For multicast links the method, in order of precedence:
     * <ol>
     *   <li>deletes the queue named by the source address if it exists and the source is dynamic;</li>
     *   <li>removes the temporary subscription queue created by {@code init} when the source is
     *       non-durable and expires on link detach or session end;</li>
     *   <li>otherwise deletes the named subscription queue if it exists, is not volatile and has
     *       no consumers left.</li>
     * </ol>
     *
     * @throws Exception if a queue query or queue removal on the multicast path fails
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public void close() throws Exception {
        Source source = (Source) this.protonSender.getSource();
        if (source == null || source.getAddress() == null || !this.multicast) {
            if (source == null || !source.getDynamic()) {
                return;
            }
            if (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END) {
                try {
                    this.sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
                } catch (Exception ignored) {
                    // Deliberately not propagated: closing the link must not fail because the
                    // temporary queue could not be removed. Leave a trace for diagnostics.
                    logger.debug("Ignoring failure to remove temporary queue {} on close", source.getAddress(), ignored);
                }
            }
            return;
        }
        SimpleString simpleString = SimpleString.toSimpleString(source.getAddress());
        if (this.sessionSPI.queueQuery(simpleString, this.routingTypeToUse, false).isExists() && source.getDynamic()) {
            this.sessionSPI.deleteQueue(simpleString);
            return;
        }
        if (source.getDurable() == TerminusDurability.NONE && this.tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
            this.sessionSPI.removeTemporaryQueue(this.tempQueueName);
            return;
        }
        // Only the part of the link name before the first '|' identifies the subscription.
        // indexOf/substring is used instead of split("\\|")[0] because split() drops trailing
        // empty strings and returns an empty array for a name made up solely of '|' characters,
        // which made the [0] access throw ArrayIndexOutOfBoundsException.
        String name = this.protonSender.getName();
        int separatorIndex = name.indexOf('|');
        if (separatorIndex >= 0) {
            name = name.substring(0, separatorIndex);
        }
        SimpleString createQueueName = AmqpSupport.createQueueName(this.connection.isUseCoreSubscriptionNaming(), this.clientId, name, this.shared, this.global, this.isVolatile);
        QueueQueryResult queueQuery = this.sessionSPI.queueQuery(createQueueName, this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
        // Volatile subscription queues are never deleted here, and a queue that still has
        // consumers is left in place.
        if (queueQuery.isExists() && !this.isVolatile && queueQuery.getConsumerCount() == 0) {
            this.sessionSPI.deleteQueue(createQueueName);
        }
    }

    /**
     * Picks the writer used to send the referenced message on this link.
     *
     * @param protonServerSenderContext the sender context (not consulted by this implementation)
     * @param messageReference          reference to the message about to be written
     * @return the large message writer when the referenced message is an {@link AMQPLargeMessage},
     *         otherwise the standard message writer
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext protonServerSenderContext, MessageReference messageReference) {
        if (messageReference.getMessage() instanceof AMQPLargeMessage) {
            return this.largeMessageWriter;
        }
        return this.standardMessageWriter;
    }

    /**
     * Resolves an explicitly named queue on an address and verifies it is usable for this link.
     *
     * @param simpleString  the queue name; when {@code null} no lookup is done
     * @param simpleString2 the address the queue must belong to
     * @param routingType   the routing type passed to the queue query and the final lookup
     * @param simpleString3 the filter requested for the link, may be {@code null}
     * @param z             whether the requested filter must agree with the queue's existing filter
     * @return {@code null} when no queue name was given, otherwise the result of
     *         {@code sessionSPI.getMatchingQueue(address, queue, routingType)}
     * @throws ActiveMQAMQPNotFoundException if the queue does not exist or is bound to a different
     *                                       address
     * @throws ActiveMQIllegalStateException if filter checking is requested and both filters are
     *                                       present but differ
     * @throws Exception                     if a broker query fails
     */
    protected SimpleString getMatchingQueue(SimpleString simpleString, SimpleString simpleString2, RoutingType routingType, SimpleString simpleString3, boolean z) throws Exception {
        if (simpleString == null) {
            return null;
        }

        SimpleString fullyQualifiedName = CompositeAddress.toFullyQualified(simpleString2, simpleString);
        QueueQueryResult lookup = this.sessionSPI.queueQuery(fullyQualifiedName, routingType, true, simpleString3);

        if (!lookup.isExists()) {
            throw new ActiveMQAMQPNotFoundException("Queue: '" + simpleString + "' does not exist");
        }
        if (!lookup.getAddress().equals(simpleString2)) {
            throw new ActiveMQAMQPNotFoundException("Queue: '" + simpleString + "' does not exist for address '" + simpleString2 + "'");
        }

        // The filters are only compared when checking was requested and both sides define one.
        if (z && simpleString3 != null) {
            SimpleString existingFilter = lookup.getFilterString();
            if (existingFilter != null && !simpleString3.equals(existingFilter)) {
                throw new ActiveMQIllegalStateException("Queue: " + simpleString + " filter mismatch [" + simpleString3 + "] is different than existing filter [" + lookup.getFilterString() + "]");
            }
        }

        return this.sessionSPI.getMatchingQueue(simpleString2, simpleString, routingType);
    }

    /**
     * Ensures the underlying proton connection is available and has not been closed remotely.
     *
     * @throws ActiveMQException if the connection context, its handler or the proton connection
     *                           is {@code null}, or if the remote endpoint state is
     *                           {@link EndpointState#CLOSED}; the condition is also reported
     *                           through the protocol logger before throwing
     */
    private void validateConnectionState() throws ActiveMQException {
        ProtonHandler protonHandler = null;
        if (this.connection != null) {
            protonHandler = this.connection.getHandler();
        }

        Connection qpidConnection = null;
        if (protonHandler != null) {
            qpidConnection = protonHandler.getConnection();
        }

        if (qpidConnection != null) {
            if (qpidConnection.getRemoteState() != EndpointState.CLOSED) {
                // Connection is present and the remote side has not closed it.
                return;
            }
            ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState(qpidConnection.getRemoteState(), this.connection.getRemoteAddress());
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState(qpidConnection.getRemoteState());
        }

        // Some link in the chain connection -> handler -> proton connection is missing.
        if (logger.isDebugEnabled()) {
            logger.debug("validateConnectionState:: connection={}, handler={}, qpidConnection={}", this.connection, protonHandler, qpidConnection);
        }
        ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState("null", "null");
        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState("null");
    }
}
