package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.class */
public class AMQPFederationSource extends AMQPFederation {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final Symbol[] CONTROL_LINK_CAPABILITIES = {AMQPFederationConstants.FEDERATION_CONTROL_LINK};
    private final AMQPBrokerConnection brokerConnection;
    private final Map<String, FederationReceiveFromQueuePolicy> remoteQueueMatchPolicies;
    private final Map<String, FederationReceiveFromAddressPolicy> remoteAddressMatchPolicies;
    private final Map<String, Object> properties;
    private volatile AMQPFederationConfiguration configuration;

    public AMQPFederationSource(String str, Map<String, Object> map, AMQPBrokerConnection aMQPBrokerConnection) {
        super(str, aMQPBrokerConnection.getServer());
        this.remoteQueueMatchPolicies = new HashMap();
        this.remoteAddressMatchPolicies = new HashMap();
        if (map == null || map.isEmpty()) {
            this.properties = Collections.EMPTY_MAP;
        } else {
            this.properties = Collections.unmodifiableMap(new HashMap(map));
        }
        this.brokerConnection = aMQPBrokerConnection;
        this.brokerConnection.addLinkClosedInterceptor(getName(), this::invokeLinkClosedInterceptors);
    }

    public AMQPBrokerConnection getBrokerConnection() {
        return this.brokerConnection;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public int getLinkAttachTimeout() {
        return this.configuration.getLinkAttachTimeout();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public synchronized AMQPSessionContext getSessionContext() {
        if (this.connected) {
            return this.session;
        }
        throw new IllegalStateException("Cannot access session while federation is not connected");
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public synchronized AMQPConnectionContext getConnectionContext() {
        if (this.connected) {
            return this.connection;
        }
        throw new IllegalStateException("Cannot access connection while federation is not connected");
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public synchronized int getReceiverCredits() {
        if (this.connected) {
            return this.configuration.getReceiverCredits();
        }
        throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public synchronized int getReceiverCreditsLow() {
        if (this.connected) {
            return this.configuration.getReceiverCreditsLow();
        }
        throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public synchronized int getLargeMessageThreshold() {
        if (this.connected) {
            return this.configuration.getLargeMessageThreshold();
        }
        throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public boolean isCoreMessageTunnelingEnabled() {
        if (this.connected) {
            return this.configuration.isCoreMessageTunnelingEnabled();
        }
        throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
    }

    public synchronized AMQPFederationSource addRemoteQueueMatchPolicy(FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy) {
        this.remoteQueueMatchPolicies.putIfAbsent(federationReceiveFromQueuePolicy.getPolicyName(), federationReceiveFromQueuePolicy);
        return this;
    }

    public synchronized AMQPFederationSource addRemoteAddressMatchPolicy(FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy) {
        this.remoteAddressMatchPolicies.putIfAbsent(federationReceiveFromAddressPolicy.getPolicyName(), federationReceiveFromAddressPolicy);
        return this;
    }

    public synchronized void handleConnectionDropped() throws ActiveMQException {
        this.connected = false;
        AtomicReference atomicReference = new AtomicReference();
        this.queueMatchPolicies.forEach((str, federationQueuePolicyManager) -> {
            try {
                federationQueuePolicyManager.stop();
            } catch (Exception e) {
                atomicReference.compareAndExchange(null, e);
            }
        });
        this.addressMatchPolicies.forEach((str2, federationAddressPolicyManager) -> {
            try {
                federationAddressPolicyManager.stop();
            } catch (Exception e) {
                atomicReference.compareAndExchange(null, e);
            }
        });
        this.connection = null;
        this.session = null;
        if (atomicReference.get() != null) {
            Exception exc = (Exception) atomicReference.get();
            if (!(exc instanceof ActiveMQException)) {
                throw ((ActiveMQException) new ActiveMQException(exc.getMessage()).initCause(exc));
            }
            throw ((ActiveMQException) exc);
        }
    }

    public synchronized void handleConnectionRestored(AMQPConnectionContext aMQPConnectionContext, AMQPSessionContext aMQPSessionContext) throws ActiveMQException {
        Record attachments = aMQPSessionContext.getSession().getConnection().attachments();
        if (attachments.get(AMQPFederation.FEDERATION_INSTANCE_RECORD, AMQPFederation.class) != null) {
            throw new ActiveMQAMQPIllegalStateException("An existing federation instance was found on the connection");
        }
        this.connection = aMQPConnectionContext;
        this.session = aMQPSessionContext;
        this.configuration = new AMQPFederationConfiguration(aMQPConnectionContext, this.properties);
        attachments.set(AMQPFederation.FEDERATION_INSTANCE_RECORD, AMQPFederationSource.class, this);
        asyncCreateControlLink();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public void signalResourceCreateError(Exception exc) {
        this.brokerConnection.connectError(exc);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation
    public void signalError(Exception exc) {
        this.brokerConnection.runtimeError(exc);
    }

    protected boolean interceptLinkClosedEvent(Link link) {
        return false;
    }

    private void asyncCreateControlLink() {
        this.connection.runLater(() -> {
            try {
                Sender sender = this.session.getSession().sender("Federation:" + getName() + ":" + UUIDGenerator.getInstance().generateStringUUID());
                AMQPFederationCommandDispatcher aMQPFederationCommandDispatcher = new AMQPFederationCommandDispatcher(sender, getServer(), this.session.getSessionSPI());
                Target target = new Target();
                target.setDynamic(true);
                target.setCapabilities(Symbol.valueOf("temporary-topic"));
                target.setDurable(TerminusDurability.NONE);
                target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
                HashMap hashMap = new HashMap();
                hashMap.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
                target.setDynamicNodeProperties(hashMap);
                HashMap hashMap2 = new HashMap();
                hashMap2.put(AMQPFederationConstants.FEDERATION_CONFIGURATION, this.configuration.toConfigurationMap());
                sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                sender.setDesiredCapabilities(CONTROL_LINK_CAPABILITIES);
                sender.setProperties(hashMap2);
                sender.setTarget(target);
                sender.setSource(new Source());
                sender.open();
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                ScheduledFuture<?> schedule = this.brokerConnection.getConnectionTimeout() > 0 ? this.brokerConnection.getServer().getScheduledPool().schedule(() -> {
                    atomicBoolean.set(true);
                    this.brokerConnection.connectError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
                }, this.brokerConnection.getConnectionTimeout(), TimeUnit.MILLISECONDS) : null;
                sender.attachments().set(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
                    try {
                        if (atomicBoolean.get()) {
                            return;
                        }
                        if (schedule != null) {
                            schedule.cancel(false);
                        }
                        if (sender.getRemoteTarget() == null) {
                            this.brokerConnection.connectError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.federationControlLinkRefused(sender.getName()));
                            return;
                        }
                        if (!AmqpSupport.verifyOfferedCapabilities(sender)) {
                            this.brokerConnection.connectError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(Arrays.toString(CONTROL_LINK_CAPABILITIES)));
                            return;
                        }
                        try {
                            this.session.getSessionSPI().addMetaData("federation-name", getName());
                            this.session.addSender(sender, new ProtonServerSenderContext(this.connection, sender, this.session, this.session.getSessionSPI(), aMQPFederationCommandDispatcher));
                            this.connected = true;
                            this.remoteQueueMatchPolicies.forEach((str, federationReceiveFromQueuePolicy) -> {
                                try {
                                    aMQPFederationCommandDispatcher.sendPolicy(federationReceiveFromQueuePolicy);
                                } catch (Exception e) {
                                    this.brokerConnection.error(e);
                                }
                            });
                            this.remoteAddressMatchPolicies.forEach((str2, federationReceiveFromAddressPolicy) -> {
                                try {
                                    aMQPFederationCommandDispatcher.sendPolicy(federationReceiveFromAddressPolicy);
                                } catch (Exception e) {
                                    this.brokerConnection.error(e);
                                }
                            });
                            this.scheduler.execute(() -> {
                                synchronized (this) {
                                    if (isStarted()) {
                                        this.queueMatchPolicies.forEach((str3, federationQueuePolicyManager) -> {
                                            federationQueuePolicyManager.start();
                                        });
                                        this.addressMatchPolicies.forEach((str4, federationAddressPolicyManager) -> {
                                            federationAddressPolicyManager.start();
                                        });
                                    }
                                }
                            });
                        } catch (ActiveMQAMQPException e) {
                            throw e;
                        } catch (Exception e2) {
                            logger.trace("Exception on add of federation Metadata: ", (Throwable) e2);
                            throw new ActiveMQAMQPInternalErrorException("Error while configuring interal session metadata");
                        }
                    } catch (Exception e3) {
                        this.brokerConnection.error(e3);
                    }
                });
            } catch (Exception e) {
                this.brokerConnection.error(e);
            }
            this.connection.flush();
        });
    }
}
