package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.engine.Link;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-amqp-protocol-2.36.0.jar:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.class */
public abstract class AMQPFederation implements FederationInternal {
    public static final String FEDERATION_INSTANCE_RECORD = "FEDERATION_INSTANCE_RECORD";
    protected final Map<String, FederationQueuePolicyManager> queueMatchPolicies = new ConcurrentHashMap();
    protected final Map<String, FederationAddressPolicyManager> addressMatchPolicies = new ConcurrentHashMap();
    protected final Map<String, Predicate<Link>> linkClosedinterceptors = new ConcurrentHashMap();
    protected final WildcardConfiguration wildcardConfiguration;
    protected final ScheduledExecutorService scheduler;
    protected final String name;
    protected final ActiveMQServer server;
    protected AMQPFederationEventDispatcher eventDispatcher;
    protected AMQPFederationEventProcessor eventProcessor;
    protected volatile AMQPConnectionContext connection;
    protected volatile AMQPSessionContext session;
    protected boolean started;
    protected volatile boolean connected;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration();

    public AMQPFederation(String str, ActiveMQServer activeMQServer) {
        Objects.requireNonNull(str, "Federation name cannot be null");
        Objects.requireNonNull(activeMQServer, "Provided server instance cannot be null");
        this.name = str;
        this.server = activeMQServer;
        this.scheduler = activeMQServer.getScheduledPool();
        if (activeMQServer.getConfiguration().getWildcardConfiguration() != null) {
            this.wildcardConfiguration = activeMQServer.getConfiguration().getWildcardConfiguration();
        } else {
            this.wildcardConfiguration = DEFAULT_WILDCARD_CONFIGURATION;
        }
    }

    public WildcardConfiguration getWildcardConfiguration() {
        return this.wildcardConfiguration;
    }

    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.Federation
    public ActiveMQServer getServer() {
        return this.server;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.Federation
    public String getName() {
        return this.name;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.Federation
    public synchronized boolean isStarted() {
        return this.started;
    }

    public abstract AMQPConnectionContext getConnectionContext();

    public abstract AMQPSessionContext getSessionContext();

    public abstract AMQPFederationConfiguration getConfiguration();

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationInternal
    public final synchronized void start() throws ActiveMQException {
        if (this.started) {
            return;
        }
        handleFederationStarted();
        signalFederationStarted();
        this.started = true;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationInternal
    public final synchronized void stop() throws ActiveMQException {
        if (this.started) {
            handleFederationStopped();
            signalFederationStopped();
            this.started = false;
            try {
                try {
                    try {
                        if (this.eventDispatcher != null) {
                            this.eventDispatcher.close();
                        }
                        if (this.eventProcessor != null) {
                            this.eventProcessor.close(false);
                        }
                    } catch (ActiveMQException e) {
                        throw e;
                    }
                } catch (Exception e2) {
                    throw ((ActiveMQException) new ActiveMQException(e2.getMessage()).initCause(e2));
                }
            } finally {
                this.eventDispatcher = null;
                this.eventProcessor = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String prefixEventsLinkQueueName(String str) {
        StringBuilder sb = new StringBuilder();
        char delimiter = getWildcardConfiguration().getDelimiter();
        sb.append(AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS).append(delimiter).append("events").append(delimiter).append(str);
        return sb.toString();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String prefixControlLinkQueueName(String str) {
        StringBuilder sb = new StringBuilder();
        char delimiter = getWildcardConfiguration().getDelimiter();
        sb.append(AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS).append(delimiter).append(AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX).append(delimiter).append(str);
        return sb.toString();
    }

    public AMQPFederation addLinkClosedInterceptor(String str, Predicate<Link> predicate) {
        this.linkClosedinterceptors.put(str, predicate);
        return this;
    }

    public AMQPFederation removeLinkClosedInterceptor(String str) {
        this.linkClosedinterceptors.remove(str);
        return this;
    }

    public synchronized AMQPFederation addQueueMatchPolicy(FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy) throws ActiveMQException {
        AMQPFederationQueuePolicyManager aMQPFederationQueuePolicyManager = new AMQPFederationQueuePolicyManager(this, federationReceiveFromQueuePolicy);
        this.queueMatchPolicies.put(federationReceiveFromQueuePolicy.getPolicyName(), aMQPFederationQueuePolicyManager);
        logger.debug("AMQP Federation {} adding queue match policy: {}", getName(), federationReceiveFromQueuePolicy.getPolicyName());
        if (isStarted()) {
            this.scheduler.execute(() -> {
                aMQPFederationQueuePolicyManager.start();
            });
        }
        return this;
    }

    public synchronized AMQPFederation addAddressMatchPolicy(FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy) throws ActiveMQException {
        AMQPFederationAddressPolicyManager aMQPFederationAddressPolicyManager = new AMQPFederationAddressPolicyManager(this, federationReceiveFromAddressPolicy);
        this.addressMatchPolicies.put(federationReceiveFromAddressPolicy.getPolicyName(), aMQPFederationAddressPolicyManager);
        logger.debug("AMQP Federation {} adding address match policy: {}", getName(), federationReceiveFromAddressPolicy.getPolicyName());
        if (isStarted()) {
            this.scheduler.execute(() -> {
                aMQPFederationAddressPolicyManager.start();
            });
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registerEventSender(AMQPFederationEventDispatcher aMQPFederationEventDispatcher) {
        if (this.eventDispatcher != null) {
            throw new IllegalStateException("Federation event dipsatcher already registered on this federation instance.");
        }
        this.eventDispatcher = aMQPFederationEventDispatcher;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registerEventReceiver(AMQPFederationEventProcessor aMQPFederationEventProcessor) {
        if (this.eventProcessor != null) {
            throw new IllegalStateException("Federation event processor already registered on this federation instance.");
        }
        this.eventProcessor = aMQPFederationEventProcessor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registerMissingAddress(String str) {
        if (this.eventDispatcher != null) {
            this.eventDispatcher.addAddressWatch(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registerMissingQueue(String str) {
        if (this.eventDispatcher != null) {
            this.eventDispatcher.addQueueWatch(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void processRemoteAddressAdded(String str) {
        this.addressMatchPolicies.values().forEach(federationAddressPolicyManager -> {
            try {
                federationAddressPolicyManager.afterRemoteAddressAdded(str);
            } catch (Exception e) {
                logger.warn("Error processing remote address added event: ", (Throwable) e);
                signalError(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void processRemoteQueueAdded(String str, String str2) {
        this.queueMatchPolicies.values().forEach(federationQueuePolicyManager -> {
            try {
                federationQueuePolicyManager.afterRemoteQueueAdded(str, str2);
            } catch (Exception e) {
                logger.warn("Error processing remote queue added event: ", (Throwable) e);
                signalError(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void signalResourceCreateError(Exception exc);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void signalError(Exception exc);

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleFederationStarted() throws ActiveMQException {
        if (this.connected) {
            this.queueMatchPolicies.forEach((str, federationQueuePolicyManager) -> {
                federationQueuePolicyManager.start();
            });
            this.addressMatchPolicies.forEach((str2, federationAddressPolicyManager) -> {
                federationAddressPolicyManager.start();
            });
        }
    }

    protected void handleFederationStopped() throws ActiveMQException {
        this.queueMatchPolicies.forEach((str, federationQueuePolicyManager) -> {
            federationQueuePolicyManager.stop();
        });
        this.addressMatchPolicies.forEach((str2, federationAddressPolicyManager) -> {
            federationAddressPolicyManager.stop();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean invokeLinkClosedInterceptors(Link link) {
        for (Map.Entry<String, Predicate<Link>> entry : this.linkClosedinterceptors.entrySet()) {
            if (entry.getValue().test(link)) {
                logger.trace("Remote link[{}] close intercepted and handled by interceptor: {}", link.getName(), entry.getKey());
                return true;
            }
        }
        return false;
    }

    protected void signalFederationStarted() {
        try {
            this.server.callBrokerAMQPFederationPlugins(aMQPFederationBrokerPlugin -> {
                if (aMQPFederationBrokerPlugin instanceof ActiveMQServerAMQPFederationPlugin) {
                    ((ActiveMQServerAMQPFederationPlugin) aMQPFederationBrokerPlugin).federationStarted(this);
                }
            });
        } catch (ActiveMQException e) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("federationStarted", e);
        }
    }

    protected void signalFederationStopped() {
        try {
            this.server.callBrokerAMQPFederationPlugins(aMQPFederationBrokerPlugin -> {
                if (aMQPFederationBrokerPlugin instanceof ActiveMQServerAMQPFederationPlugin) {
                    ((ActiveMQServerAMQPFederationPlugin) aMQPFederationBrokerPlugin).federationStopped(this);
                }
            });
        } catch (ActiveMQException e) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("federationStopped", e);
        }
    }
}
