package org.apache.activemq.artemis.protocol.amqp.connect;

import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.scram.SCRAMClientSASL;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.scram.SCRAM;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.class */
public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection {
    public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
    private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
    private final ProtonProtocolManager protonProtocolManager;
    private final ActiveMQServer server;
    private final NettyConnector bridgesConnector;
    private NettyConnection connection;
    private Session session;
    private AMQPSessionContext sessionContext;
    private ActiveMQProtonRemotingConnection protonRemotingConnection;
    private final AMQPBrokerConnectionManager bridgeManager;
    private AMQPMirrorControllerSource mirrorControllerSource;
    private AMQPFederationSource brokerFederation;
    private int lastRetryCounter;
    private volatile ScheduledFuture<?> reconnectFuture;
    final Executor connectExecutor;
    final ScheduledExecutorService scheduledExecutorService;
    String host;
    int port;
    private static final String EXTERNAL = "EXTERNAL";
    private static final String PLAIN = "PLAIN";
    private static final String ANONYMOUS = "ANONYMOUS";
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final byte[] EMPTY = new byte[0];
    private volatile boolean started = false;
    private int retryCounter = 0;
    private boolean connecting = false;
    private final Set<Queue> senders = new HashSet();
    private final Set<Queue> receivers = new HashSet();
    private final Map<String, Predicate<Link>> linkClosedInterceptors = new ConcurrentHashMap();

    /* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$AMQPOutgoingController.class */
    private class AMQPOutgoingController implements SenderController {
        final Queue queue;
        final Sender sender;
        final AMQPSessionCallback sessionSPI;
        protected boolean tunnelCoreMessages;
        protected AMQPMessageWriter standardMessageWriter;
        protected AMQPLargeMessageWriter largeMessageWriter;
        protected AMQPTunneledCoreMessageWriter coreMessageWriter;
        protected AMQPTunneledCoreLargeMessageWriter coreLargeMessageWriter;

        AMQPOutgoingController(Queue queue, Sender sender, AMQPSessionCallback aMQPSessionCallback) {
            this.queue = queue;
            this.sessionSPI = aMQPSessionCallback;
            this.sender = sender;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
        public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
            SimpleString name = this.queue.getName();
            this.tunnelCoreMessages = AmqpSupport.verifyCapabilities(this.sender.getDesiredCapabilities(), AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT) && AmqpSupport.verifyOfferedCapabilities(this.sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
            return (Consumer) this.sessionSPI.createSender(protonServerSenderContext, name, null, false);
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
        public void close() throws Exception {
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
        public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext protonServerSenderContext, MessageReference messageReference) {
            MessageWriter messageWriter;
            MessageWriter messageWriter2;
            MessageWriter messageWriter3;
            MessageWriter messageWriter4;
            MessageWriter messageWriter5;
            MessageWriter messageWriter6;
            Message message = messageReference.getMessage();
            if (message instanceof AMQPMessage) {
                if (message.isLargeMessage()) {
                    if (this.largeMessageWriter != null) {
                        messageWriter6 = this.largeMessageWriter;
                    } else {
                        AMQPLargeMessageWriter aMQPLargeMessageWriter = new AMQPLargeMessageWriter(protonServerSenderContext);
                        messageWriter6 = aMQPLargeMessageWriter;
                        this.largeMessageWriter = aMQPLargeMessageWriter;
                    }
                    messageWriter2 = messageWriter6;
                } else {
                    if (this.standardMessageWriter != null) {
                        messageWriter5 = this.standardMessageWriter;
                    } else {
                        AMQPMessageWriter aMQPMessageWriter = new AMQPMessageWriter(protonServerSenderContext);
                        messageWriter5 = aMQPMessageWriter;
                        this.standardMessageWriter = aMQPMessageWriter;
                    }
                    messageWriter2 = messageWriter5;
                }
            } else if (!this.tunnelCoreMessages) {
                if (this.standardMessageWriter != null) {
                    messageWriter = this.standardMessageWriter;
                } else {
                    AMQPMessageWriter aMQPMessageWriter2 = new AMQPMessageWriter(protonServerSenderContext);
                    messageWriter = aMQPMessageWriter2;
                    this.standardMessageWriter = aMQPMessageWriter2;
                }
                messageWriter2 = messageWriter;
            } else if (message.isLargeMessage()) {
                if (this.coreLargeMessageWriter != null) {
                    messageWriter4 = this.coreLargeMessageWriter;
                } else {
                    AMQPTunneledCoreLargeMessageWriter aMQPTunneledCoreLargeMessageWriter = new AMQPTunneledCoreLargeMessageWriter(protonServerSenderContext);
                    messageWriter4 = aMQPTunneledCoreLargeMessageWriter;
                    this.coreLargeMessageWriter = aMQPTunneledCoreLargeMessageWriter;
                }
                messageWriter2 = messageWriter4;
            } else {
                if (this.coreMessageWriter != null) {
                    messageWriter3 = this.coreMessageWriter;
                } else {
                    AMQPTunneledCoreMessageWriter aMQPTunneledCoreMessageWriter = new AMQPTunneledCoreMessageWriter(protonServerSenderContext);
                    messageWriter3 = aMQPTunneledCoreMessageWriter;
                    this.coreMessageWriter = aMQPTunneledCoreMessageWriter;
                }
                messageWriter2 = messageWriter3;
            }
            return messageWriter2;
        }
    }

    /* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$AnonymousSASLMechanism.class */
    private static class AnonymousSASLMechanism implements ClientSASL {
        private AnonymousSASLMechanism() {
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "ANONYMOUS";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return AMQPBrokerConnection.EMPTY;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }
    }

    /* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$ExternalSASLMechanism.class */
    private static class ExternalSASLMechanism implements ClientSASL {
        private ExternalSASLMechanism() {
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "EXTERNAL";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return AMQPBrokerConnection.EMPTY;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }

        public static boolean isApplicable(NettyConnection nettyConnection) {
            return CertificateUtil.getLocalPrincipalFromConnection(nettyConnection) != null;
        }
    }

    /* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$PlainSASLMechanism.class */
    private static class PlainSASLMechanism implements ClientSASL {
        private final byte[] initialResponse;

        PlainSASLMechanism(String str, String str2) {
            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
            byte[] bytes2 = str2.getBytes(StandardCharsets.UTF_8);
            byte[] bArr = new byte[bytes.length + bytes2.length + 2];
            System.arraycopy(bytes, 0, bArr, 1, bytes.length);
            System.arraycopy(bytes2, 0, bArr, bytes.length + 2, bytes2.length);
            this.initialResponse = bArr;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public String getName() {
            return "PLAIN";
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getInitialResponse() {
            return this.initialResponse;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL
        public byte[] getResponse(byte[] bArr) {
            return AMQPBrokerConnection.EMPTY;
        }

        public static boolean isApplicable(String str, String str2) {
            return str != null && str.length() > 0 && str2 != null && str2.length() > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection$SaslFactory.class */
    public static final class SaslFactory implements ClientSASLFactory {
        private final NettyConnection connection;
        private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;

        SaslFactory(NettyConnection nettyConnection, AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration) {
            this.connection = nettyConnection;
            this.brokerConnectConfiguration = aMQPBrokerConnectConfiguration;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory
        public ClientSASL chooseMechanism(String[] strArr) {
            List emptyList = strArr == null ? Collections.emptyList() : Arrays.asList(strArr);
            if (emptyList.contains("EXTERNAL") && ExternalSASLMechanism.isApplicable(this.connection)) {
                return new ExternalSASLMechanism();
            }
            if (SCRAMClientSASL.isApplicable(this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword())) {
                for (SCRAM scram : SCRAM.values()) {
                    if (emptyList.contains(scram.getName())) {
                        return new SCRAMClientSASL(scram, this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword());
                    }
                }
            }
            if (emptyList.contains("PLAIN") && PlainSASLMechanism.isApplicable(this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword())) {
                return new PlainSASLMechanism(this.brokerConnectConfiguration.getUser(), this.brokerConnectConfiguration.getPassword());
            }
            if (emptyList.contains("ANONYMOUS")) {
                return new AnonymousSASLMechanism();
            }
            return null;
        }
    }

    public AMQPBrokerConnection(AMQPBrokerConnectionManager aMQPBrokerConnectionManager, AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration, ProtonProtocolManager protonProtocolManager, ActiveMQServer activeMQServer, NettyConnector nettyConnector) {
        this.bridgeManager = aMQPBrokerConnectionManager;
        this.brokerConnectConfiguration = aMQPBrokerConnectConfiguration;
        this.protonProtocolManager = protonProtocolManager;
        this.server = activeMQServer;
        this.bridgesConnector = nettyConnector;
        this.connectExecutor = activeMQServer.getExecutorFactory().getExecutor();
        this.scheduledExecutorService = activeMQServer.getScheduledPool();
    }

    @Override // org.apache.activemq.artemis.core.server.BrokerConnection
    public String getName() {
        return this.brokerConnectConfiguration.getName();
    }

    @Override // org.apache.activemq.artemis.core.server.BrokerConnection
    public String getProtocol() {
        return ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME;
    }

    @Override // org.apache.activemq.artemis.core.server.BrokerConnection
    public AMQPBrokerConnectConfiguration getConfiguration() {
        return this.brokerConnectConfiguration;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public boolean isStarted() {
        return this.started;
    }

    public boolean isConnecting() {
        return this.connecting;
    }

    public int getConnectionTimeout() {
        return this.bridgesConnector.getConnectTimeoutMillis();
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public void stop() {
        if (this.started) {
            this.started = false;
            if (this.protonRemotingConnection != null) {
                this.protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
                this.protonRemotingConnection = null;
                this.connection = null;
            }
            ScheduledFuture<?> scheduledFuture = this.reconnectFuture;
            this.reconnectFuture = null;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            if (this.brokerFederation != null) {
                try {
                    this.brokerFederation.stop();
                } catch (ActiveMQException e) {
                }
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        this.server.getConfiguration().registerBrokerPlugin(this);
        try {
            if (this.brokerConnectConfiguration != null && this.brokerConnectConfiguration.getConnectionElements() != null) {
                for (AMQPBrokerConnectionElement aMQPBrokerConnectionElement : this.brokerConnectConfiguration.getConnectionElements()) {
                    AMQPBrokerConnectionAddressType type = aMQPBrokerConnectionElement.getType();
                    if (type == AMQPBrokerConnectionAddressType.MIRROR) {
                        installMirrorController((AMQPMirrorBrokerConnectionElement) aMQPBrokerConnectionElement, this.server);
                    } else if (type == AMQPBrokerConnectionAddressType.FEDERATION) {
                        installFederation((AMQPFederatedBrokerConnectionElement) aMQPBrokerConnectionElement, this.server);
                    }
                }
            }
            this.connectExecutor.execute(() -> {
                doConnect();
            });
        } catch (Throwable th) {
            logger.warn(th.getMessage(), th);
        }
    }

    public ActiveMQServer getServer() {
        return this.server;
    }

    public NettyConnection getConnection() {
        return this.connection;
    }

    @Override // org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin
    public void afterCreateQueue(Queue queue) {
        this.connectExecutor.execute(() -> {
            Iterator<AMQPBrokerConnectionElement> it = this.brokerConnectConfiguration.getConnectionElements().iterator();
            while (it.hasNext()) {
                validateMatching(queue, it.next());
            }
        });
    }

    public void validateMatching(Queue queue, AMQPBrokerConnectionElement aMQPBrokerConnectionElement) {
        if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR || aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
            return;
        }
        if (aMQPBrokerConnectionElement.getQueueName() != null) {
            if (queue.getName().equals(aMQPBrokerConnectionElement.getQueueName())) {
                createLink(queue, aMQPBrokerConnectionElement);
            }
        } else if (aMQPBrokerConnectionElement.match(queue.getAddress(), this.server.getConfiguration().getWildcardConfiguration())) {
            createLink(queue, aMQPBrokerConnectionElement);
        }
    }

    public void createLink(Queue queue, AMQPBrokerConnectionElement aMQPBrokerConnectionElement) {
        if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
            Symbol[] symbolArr = {AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
            connectSender(queue, queue.getAddress().toString(), null, null, null, null, symbolArr, null);
            connectReceiver(this.protonRemotingConnection, this.session, this.sessionContext, queue, symbolArr);
        } else {
            if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
                connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
            }
            if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
                connectReceiver(this.protonRemotingConnection, this.session, this.sessionContext, queue, new Symbol[0]);
            }
        }
    }

    SimpleString getMirrorSNF(AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement) {
        SimpleString mirrorSNF = aMQPMirrorBrokerConnectionElement.getMirrorSNF();
        if (mirrorSNF == null) {
            mirrorSNF = SimpleString.toSimpleString(ProtonProtocolManager.getMirrorAddress(this.brokerConnectConfiguration.getName()));
            aMQPMirrorBrokerConnectionElement.setMirrorSNF(mirrorSNF);
        }
        return mirrorSNF;
    }

    public AMQPBrokerConnection addLinkClosedInterceptor(String str, Predicate<Link> predicate) {
        this.linkClosedInterceptors.put(str, predicate);
        return this;
    }

    public AMQPBrokerConnection removeLinkClosedInterceptor(String str) {
        this.linkClosedInterceptors.remove(str);
        return this;
    }

    private void linkClosed(Link link) {
        for (Map.Entry<String, Predicate<Link>> entry : this.linkClosedInterceptors.entrySet()) {
            if (entry.getValue().test(link)) {
                logger.trace("Remote link[{}] close intercepted and handled by interceptor: {}", link.getName(), entry.getKey());
                return;
            }
        }
        if (link.getLocalState() == EndpointState.ACTIVE) {
            error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionRemoteLinkClosed(), this.lastRetryCounter);
        }
    }

    private void doConnect() {
        try {
            this.connecting = true;
            List<TransportConfiguration> transportConfigurations = this.brokerConnectConfiguration.getTransportConfigurations();
            TransportConfiguration transportConfiguration = transportConfigurations.get(this.retryCounter % transportConfigurations.size());
            String stringProperty = ConfigurationHelper.getStringProperty("host", "localhost", transportConfiguration.getParams());
            int intProperty = ConfigurationHelper.getIntProperty("port", TransportConstants.DEFAULT_PORT, transportConfiguration.getParams());
            this.host = stringProperty;
            this.port = intProperty;
            this.connection = this.bridgesConnector.createConnection(null, stringProperty, intProperty);
            if (this.connection == null) {
                retryConnection();
                return;
            }
            this.lastRetryCounter = this.retryCounter;
            this.retryCounter = 0;
            this.reconnectFuture = null;
            this.senders.clear();
            this.receivers.clear();
            ConnectionEntry createOutgoingConnectionEntry = this.protonProtocolManager.createOutgoingConnectionEntry(this.connection, new SaslFactory(this.connection, this.brokerConnectConfiguration));
            this.server.getRemotingService().addConnectionEntry(this.connection, createOutgoingConnectionEntry);
            this.protonRemotingConnection = (ActiveMQProtonRemotingConnection) createOutgoingConnectionEntry.connection;
            this.protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
            this.connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(this.bridgesConnector.getChannelGroup(), this.protonRemotingConnection.getAmqpConnection().getHandler(), this, this.server.getExecutorFactory().getExecutor()));
            this.session = this.protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
            this.sessionContext = this.protonRemotingConnection.getAmqpConnection().getSessionExtension(this.session);
            this.protonRemotingConnection.getAmqpConnection().runLater(() -> {
                this.protonRemotingConnection.getAmqpConnection().open();
                this.session.open();
                this.protonRemotingConnection.getAmqpConnection().flush();
            });
            if (this.brokerConnectConfiguration.getConnectionElements() != null) {
                this.server.getPostOffice().getAllBindings().forEach(binding -> {
                    if (binding instanceof QueueBinding) {
                        Queue queue = ((QueueBinding) binding).getQueue();
                        Iterator<AMQPBrokerConnectionElement> it = this.brokerConnectConfiguration.getConnectionElements().iterator();
                        while (it.hasNext()) {
                            validateMatching(queue, it.next());
                        }
                    }
                });
                for (AMQPBrokerConnectionElement aMQPBrokerConnectionElement : this.brokerConnectConfiguration.getConnectionElements()) {
                    if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
                        AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement = (AMQPMirrorBrokerConnectionElement) aMQPBrokerConnectionElement;
                        Queue locateQueue = this.server.locateQueue(getMirrorSNF(aMQPMirrorBrokerConnectionElement));
                        Symbol[] symbolArr = isCoreMessageTunnelingEnabled(aMQPMirrorBrokerConnectionElement) ? new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT} : new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
                        Symbol[] symbolArr2 = {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
                        String simpleString = locateQueue.getName().toString();
                        AMQPMirrorControllerSource aMQPMirrorControllerSource = this.mirrorControllerSource;
                        Objects.requireNonNull(aMQPMirrorControllerSource);
                        connectSender(locateQueue, simpleString, (v1) -> {
                            r3.setLink(v1);
                        }, messageReference -> {
                            AMQPMirrorControllerSource.validateProtocolData(this.protonProtocolManager.getReferenceIDSupplier(), messageReference, getMirrorSNF(aMQPMirrorBrokerConnectionElement));
                        }, this.server.getNodeID().toString(), symbolArr, null, symbolArr2);
                    } else if (aMQPBrokerConnectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
                        this.brokerFederation.handleConnectionRestored(this.protonRemotingConnection.getAmqpConnection(), this.sessionContext);
                    }
                }
            }
            this.protonRemotingConnection.getAmqpConnection().flush();
            this.bridgeManager.connected(this.connection, this);
            ActiveMQAMQPProtocolLogger.LOGGER.successReconnect(this.brokerConnectConfiguration.getName(), this.host + ":" + this.port, this.lastRetryCounter);
            this.connecting = false;
        } catch (Throwable th) {
            error(th);
        }
    }

    public void retryConnection() {
        this.lastRetryCounter = this.retryCounter;
        if (this.bridgeManager.isStarted() && this.started) {
            if (this.brokerConnectConfiguration.getReconnectAttempts() < 0 || this.retryCounter < this.brokerConnectConfiguration.getReconnectAttempts()) {
                this.retryCounter++;
                ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(this.brokerConnectConfiguration.getName(), this.host + ":" + this.port, this.retryCounter, this.brokerConnectConfiguration.getReconnectAttempts());
                if (logger.isDebugEnabled()) {
                    logger.debug("Reconnecting in {}, this is the {} of {}", Integer.valueOf(this.brokerConnectConfiguration.getRetryInterval()), Integer.valueOf(this.retryCounter), Integer.valueOf(this.brokerConnectConfiguration.getReconnectAttempts()));
                }
                this.reconnectFuture = this.scheduledExecutorService.schedule(() -> {
                    this.connectExecutor.execute(() -> {
                        doConnect();
                    });
                }, this.brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS);
                return;
            }
            this.retryCounter = 0;
            this.started = false;
            this.connecting = false;
            ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(this.brokerConnectConfiguration.getName(), this.host + ":" + this.port, this.lastRetryCounter);
            if (logger.isDebugEnabled()) {
                logger.debug("no more reconnections as the retry counter reached {} out of {}", Integer.valueOf(this.retryCounter), Integer.valueOf(this.brokerConnectConfiguration.getReconnectAttempts()));
            }
        }
    }

    private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement, ActiveMQServer activeMQServer) {
    }

    private Queue installMirrorController(AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement, ActiveMQServer activeMQServer) throws Exception {
        MirrorController mirrorController = activeMQServer.getMirrorController();
        if (mirrorController != null && (mirrorController instanceof AMQPMirrorControllerSource)) {
            Queue checkCurrentMirror = checkCurrentMirror(this, (AMQPMirrorControllerSource) mirrorController);
            if (checkCurrentMirror != null) {
                return checkCurrentMirror;
            }
        } else if (mirrorController != null && (mirrorController instanceof AMQPMirrorControllerAggregation)) {
            Iterator<AMQPMirrorControllerSource> it = ((AMQPMirrorControllerAggregation) mirrorController).getPartitions().iterator();
            while (it.hasNext()) {
                Queue checkCurrentMirror2 = checkCurrentMirror(this, it.next());
                if (checkCurrentMirror2 != null) {
                    return checkCurrentMirror2;
                }
            }
        }
        AddressInfo addressInfo = activeMQServer.getAddressInfo(getMirrorSNF(aMQPMirrorBrokerConnectionElement));
        if (addressInfo == null) {
            addressInfo = new AddressInfo(getMirrorSNF(aMQPMirrorBrokerConnectionElement)).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(!aMQPMirrorBrokerConnectionElement.isDurable()).setInternal(true);
            activeMQServer.addAddressInfo(addressInfo);
        }
        if (addressInfo.getRoutingType() != RoutingType.ANYCAST) {
            throw new IllegalArgumentException(addressInfo.getName() + " has " + addressInfo.getRoutingType() + " instead of ANYCAST");
        }
        Queue locateQueue = activeMQServer.locateQueue(getMirrorSNF(aMQPMirrorBrokerConnectionElement));
        if (locateQueue == null) {
            locateQueue = activeMQServer.createQueue(new QueueConfiguration(getMirrorSNF(aMQPMirrorBrokerConnectionElement)).setAddress(getMirrorSNF(aMQPMirrorBrokerConnectionElement)).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(aMQPMirrorBrokerConnectionElement.isDurable())).setInternal(true), true);
        }
        activeMQServer.registerQueueOnManagement(locateQueue, true);
        logger.debug("Mirror queue {}", locateQueue.getName());
        locateQueue.setMirrorController(true);
        QueueBinding queueBinding = (QueueBinding) activeMQServer.getPostOffice().getBinding(getMirrorSNF(aMQPMirrorBrokerConnectionElement));
        if (queueBinding == null) {
            logger.warn("Queue does not exist even after creation! {}", aMQPMirrorBrokerConnectionElement);
            throw new IllegalAccessException("Cannot start replica");
        }
        Queue queue = queueBinding.getQueue();
        if (!queue.getAddress().equals(getMirrorSNF(aMQPMirrorBrokerConnectionElement))) {
            logger.warn("Queue {} belong to a different address ({}), while we expected it to be {}", queue, queue.getAddress(), addressInfo.getName());
            throw new IllegalAccessException("Cannot start replica");
        }
        AMQPMirrorControllerSource aMQPMirrorControllerSource = new AMQPMirrorControllerSource(this.protonProtocolManager, queue, activeMQServer, aMQPMirrorBrokerConnectionElement, this);
        this.mirrorControllerSource = aMQPMirrorControllerSource;
        activeMQServer.scanAddresses(aMQPMirrorControllerSource);
        if (mirrorController == null) {
            activeMQServer.installMirrorController(aMQPMirrorControllerSource);
        } else {
            if (mirrorController instanceof AMQPMirrorControllerSource) {
                AMQPMirrorControllerAggregation aMQPMirrorControllerAggregation = new AMQPMirrorControllerAggregation();
                aMQPMirrorControllerAggregation.addPartition((AMQPMirrorControllerSource) mirrorController);
                mirrorController = aMQPMirrorControllerAggregation;
                activeMQServer.installMirrorController(aMQPMirrorControllerAggregation);
            }
            ((AMQPMirrorControllerAggregation) mirrorController).addPartition(aMQPMirrorControllerSource);
        }
        return queue;
    }

    private static Queue checkCurrentMirror(AMQPBrokerConnection aMQPBrokerConnection, AMQPMirrorControllerSource aMQPMirrorControllerSource) {
        if (aMQPMirrorControllerSource.getBrokerConnection() == aMQPBrokerConnection) {
            return aMQPMirrorControllerSource.getSnfQueue();
        }
        return null;
    }

    private void installFederation(AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement, ActiveMQServer activeMQServer) throws Exception {
        AMQPFederationSource aMQPFederationSource = new AMQPFederationSource(aMQPFederatedBrokerConnectionElement.getName(), aMQPFederatedBrokerConnectionElement.getProperties(), this);
        Set<AMQPFederationAddressPolicyElement> localAddressPolicies = aMQPFederatedBrokerConnectionElement.getLocalAddressPolicies();
        if (!localAddressPolicies.isEmpty()) {
            Iterator<AMQPFederationAddressPolicyElement> it = localAddressPolicies.iterator();
            while (it.hasNext()) {
                aMQPFederationSource.addAddressMatchPolicy(AMQPFederationPolicySupport.create(it.next(), aMQPFederationSource.getWildcardConfiguration()));
            }
        }
        Set<AMQPFederationQueuePolicyElement> localQueuePolicies = aMQPFederatedBrokerConnectionElement.getLocalQueuePolicies();
        if (!localQueuePolicies.isEmpty()) {
            Iterator<AMQPFederationQueuePolicyElement> it2 = localQueuePolicies.iterator();
            while (it2.hasNext()) {
                aMQPFederationSource.addQueueMatchPolicy(AMQPFederationPolicySupport.create(it2.next(), aMQPFederationSource.getWildcardConfiguration()));
            }
        }
        Set<AMQPFederationAddressPolicyElement> remoteAddressPolicies = aMQPFederatedBrokerConnectionElement.getRemoteAddressPolicies();
        if (!remoteAddressPolicies.isEmpty()) {
            Iterator<AMQPFederationAddressPolicyElement> it3 = remoteAddressPolicies.iterator();
            while (it3.hasNext()) {
                aMQPFederationSource.addRemoteAddressMatchPolicy(AMQPFederationPolicySupport.create(it3.next(), aMQPFederationSource.getWildcardConfiguration()));
            }
        }
        Set<AMQPFederationQueuePolicyElement> remoteQueuePolicies = aMQPFederatedBrokerConnectionElement.getRemoteQueuePolicies();
        if (!remoteQueuePolicies.isEmpty()) {
            Iterator<AMQPFederationQueuePolicyElement> it4 = remoteQueuePolicies.iterator();
            while (it4.hasNext()) {
                aMQPFederationSource.addRemoteQueueMatchPolicy(AMQPFederationPolicySupport.create(it4.next(), aMQPFederationSource.getWildcardConfiguration()));
            }
        }
        this.brokerFederation = aMQPFederationSource;
        this.brokerFederation.start();
    }

    private void connectReceiver(ActiveMQProtonRemotingConnection activeMQProtonRemotingConnection, Session session, AMQPSessionContext aMQPSessionContext, Queue queue, Symbol... symbolArr) {
        logger.debug("Connecting inbound for {}", queue);
        if (session == null) {
            logger.debug("session is null");
        } else {
            activeMQProtonRemotingConnection.getAmqpConnection().runLater(() -> {
                if (this.receivers.contains(queue)) {
                    logger.debug("Receiver for queue {} already exists, just giving up", queue);
                    return;
                }
                this.receivers.add(queue);
                Receiver receiver = session.receiver(queue.getAddress().toString() + ":" + UUIDGenerator.getInstance().generateStringUUID());
                receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                Target target = new Target();
                target.setAddress(queue.getAddress().toString());
                receiver.setTarget(target);
                Source source = new Source();
                source.setAddress(queue.getAddress().toString());
                receiver.setSource(source);
                if (symbolArr != null) {
                    source.setCapabilities(symbolArr);
                }
                receiver.open();
                activeMQProtonRemotingConnection.getAmqpConnection().flush();
                try {
                    aMQPSessionContext.addReceiver(receiver);
                } catch (Exception e) {
                    error(e);
                }
            });
        }
    }

    private void connectSender(Queue queue, String str, java.util.function.Consumer<Sender> consumer, java.util.function.Consumer<? super MessageReference> consumer2, String str2, Symbol[] symbolArr, Symbol[] symbolArr2, Symbol[] symbolArr3) {
        logger.debug("Connecting outbound for {}", queue);
        if (this.session == null) {
            logger.debug("Session is null");
        } else {
            this.protonRemotingConnection.getAmqpConnection().runLater(() -> {
                try {
                } catch (Exception e) {
                    error(e);
                }
                if (this.senders.contains(queue)) {
                    logger.debug("Sender for queue {} already exists, just giving up", queue);
                    return;
                }
                this.senders.add(queue);
                Sender sender = this.session.sender(str + ":" + UUIDGenerator.getInstance().generateStringUUID());
                sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                Target target = new Target();
                target.setAddress(str);
                if (symbolArr2 != null) {
                    target.setCapabilities(symbolArr2);
                }
                sender.setTarget(target);
                Source source = new Source();
                source.setAddress(queue.getAddress().toString());
                sender.setSource(source);
                if (str2 != null) {
                    HashMap hashMap = new HashMap(1, 1.0f);
                    hashMap.put(AMQPMirrorControllerSource.BROKER_ID, str2);
                    sender.setProperties(hashMap);
                }
                if (symbolArr != null) {
                    sender.setDesiredCapabilities(symbolArr);
                }
                AMQPOutgoingController aMQPOutgoingController = new AMQPOutgoingController(queue, sender, this.sessionContext.getSessionSPI());
                sender.open();
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                ScheduledFuture<?> schedule = this.bridgesConnector.getConnectTimeoutMillis() > 0 ? this.server.getScheduledPool().schedule(() -> {
                    atomicBoolean.set(true);
                    error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), this.lastRetryCounter);
                }, this.bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS) : null;
                sender.attachments().set(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
                    ProtonServerSenderContext beforeDelivery = new ProtonServerSenderContext(this.protonRemotingConnection.getAmqpConnection(), sender, this.sessionContext, this.sessionContext.getSessionSPI(), aMQPOutgoingController).setBeforeDelivery(consumer2);
                    try {
                        if (!atomicBoolean.get()) {
                            if (schedule != null) {
                                schedule.cancel(false);
                            }
                            if (sender.getRemoteTarget() == null) {
                                error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.senderLinkRefused(sender.getTarget().getAddress()), this.lastRetryCounter);
                                return;
                            }
                            if (symbolArr3 != null && !AmqpSupport.verifyOfferedCapabilities(sender, symbolArr3)) {
                                error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(Arrays.toString(symbolArr3)), this.lastRetryCounter);
                                return;
                            }
                            if (str2 != null) {
                                if (sender.getRemoteProperties() == null || !sender.getRemoteProperties().containsKey(AMQPMirrorControllerSource.BROKER_ID)) {
                                    error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingBrokerID(), this.lastRetryCounter);
                                    return;
                                } else if (sender.getRemoteProperties().get(AMQPMirrorControllerSource.BROKER_ID).equals(str2)) {
                                    error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionMirrorItself(), this.lastRetryCounter);
                                    return;
                                }
                            }
                            this.sessionContext.addSender(sender, beforeDelivery);
                            if (consumer != null) {
                                consumer.accept(sender);
                            }
                        }
                    } catch (Exception e2) {
                        error(e2);
                    }
                });
                this.protonRemotingConnection.getAmqpConnection().flush();
            });
        }
    }

    public void error(Throwable th) {
        error(th, 0);
    }

    public void runtimeError(Throwable th) {
        error(th, 0);
    }

    public void connectError(Throwable th) {
        error(th, this.lastRetryCounter);
    }

    protected void error(Throwable th, int i) {
        this.retryCounter = i;
        this.connecting = false;
        logger.warn(th.getMessage(), th);
        redoConnection();
    }

    public void disconnect() throws Exception {
        redoConnection();
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener
    public void connectionCreated(ActiveMQComponent activeMQComponent, Connection connection, ClientProtocolManager clientProtocolManager) {
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener
    public void connectionDestroyed(Object obj, boolean z) {
        this.server.getRemotingService().removeConnection(obj);
        redoConnection();
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener
    public void connectionException(Object obj, ActiveMQException activeMQException) {
        redoConnection();
    }

    private void redoConnection() {
        if (this.protonRemotingConnection != null) {
            this.protonRemotingConnection.getAmqpConnection().clearLinkRemoteCloseListeners();
        }
        if (this.brokerFederation != null) {
            try {
                this.brokerFederation.handleConnectionDropped();
            } catch (ActiveMQException e) {
                logger.debug("Broker Federation on connection {} threw an error on stop before connection attempt", getName());
            }
        }
        this.connectExecutor.execute(() -> {
            if (this.connecting) {
                logger.debug("Broker connection {} was already in retry mode, exception or retry not captured", getName());
                return;
            }
            this.connecting = true;
            try {
                if (this.protonRemotingConnection != null) {
                    this.protonRemotingConnection.fail(new ActiveMQException("Connection being recreated"));
                    this.connection = null;
                    this.protonRemotingConnection = null;
                }
            } catch (Throwable th) {
                logger.warn(th.getMessage(), th);
            }
            retryConnection();
        });
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener
    public void connectionReadyForWrites(Object obj, boolean z) {
        this.protonRemotingConnection.flush();
    }

    public static boolean isCoreMessageTunnelingEnabled(AMQPMirrorBrokerConnectionElement aMQPMirrorBrokerConnectionElement) {
        Object obj = aMQPMirrorBrokerConnectionElement.getProperties().get(AmqpSupport.TUNNEL_CORE_MESSAGES);
        if (obj instanceof Boolean) {
            return ((Boolean) obj).booleanValue();
        }
        if (obj instanceof String) {
            return Boolean.parseBoolean((String) obj);
        }
        return true;
    }
}
