package org.apache.activemq.artemis.protocol.amqp.federation.internal;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:artemis-amqp-protocol-2.32.0.jar:org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.class */
/**
 * Base class for a manager that watches the local broker for bindings on addresses which match a
 * {@link FederationReceiveFromAddressPolicy} and keeps one federation consumer per matching
 * address for as long as local demand for that address exists.
 * <p>
 * Demand is counted per {@link FederationConsumerInfo}: the first matching queue binding creates a
 * consumer, additional bindings call {@code addDemand()} on its tracking entry and each removed
 * binding calls {@code reduceDemand()}; once that call reports {@code true} the consumer is
 * closed and forgotten. When the policy enables divert bindings, queues bound to a divert's
 * forwarding address(es) also count as demand on the divert's source address.
 * <p>
 * All state changing entry points synchronize on this instance, as does the remote closed handler
 * installed on each created consumer.
 */
public abstract class FederationAddressPolicyManager implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    protected final ActiveMQServer server;
    protected final FederationReceiveFromAddressPolicy policy;
    protected final FederationInternal federation;

    /** Consumers currently tracked, keyed by the consumer info created for the federated address. */
    protected final Map<FederationConsumerInfo, FederationConsumerEntry> remoteConsumers = new HashMap<>();

    /** Tracked divert bindings mapped to the names of the queues that registered demand through them. */
    protected final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();

    private volatile boolean started;

    /**
     * Creates a new manager bound to the given federation instance and address policy.
     * <p>
     * The manager does not register itself as a broker plugin here; registration is performed by
     * {@link #start()} so that a partially constructed subclass instance is never published to the
     * broker and an instance that is never started does not remain registered.
     *
     * @param federationInternal
     *        the federation instance this manager works for, must not be null.
     * @param federationReceiveFromAddressPolicy
     *        the address policy used to match local addresses, must not be null.
     *
     * @throws ActiveMQException declared for compatibility with existing callers and subclasses.
     * @throws NullPointerException if either argument is null.
     */
    public FederationAddressPolicyManager(FederationInternal federationInternal, FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy) throws ActiveMQException {
        Objects.requireNonNull(federationInternal, "The Federation instance cannot be null");
        Objects.requireNonNull(federationReceiveFromAddressPolicy, "The Address match policy cannot be null");

        this.federation = federationInternal;
        this.policy = federationReceiveFromAddressPolicy;
        this.server = federationInternal.getServer();
    }

    /**
     * Starts the manager if it is not already started: registers it as a broker plugin and scans
     * the bindings that already exist on the server for policy matches.
     */
    public synchronized void start() {
        if (started) {
            return;
        }

        started = true;
        server.registerBrokerPlugin(this);
        scanAllBindings();
    }

    /**
     * Stops the manager if it is started: unregisters the broker plugin, closes every tracked
     * consumer and discards all tracking state.
     */
    public synchronized void stop() {
        if (!started) {
            return;
        }

        started = false;
        server.unRegisterBrokerPlugin(this);

        try {
            remoteConsumers.forEach((consumerInfo, entry) -> entry.getConsumer().close());
        } finally {
            // Always drop the tracking state, even if a close fails, so that a later start()
            // begins from a clean slate instead of counting demand against stale entries.
            remoteConsumers.clear();
            matchingDiverts.clear();
        }
    }

    /**
     * When divert bindings are enabled and the added address matches the policy, re-checks the
     * divert bindings directly bound to that address as if they had just been added.
     */
    @Override
    public synchronized void afterAddAddress(AddressInfo addressInfo, boolean z) {
        if (!started || !policy.isEnableDivertBindings() || !policy.test(addressInfo)) {
            return;
        }

        try {
            server.getPostOffice()
                  .getDirectBindings(addressInfo.getName())
                  .stream()
                  .filter(binding -> binding instanceof DivertBinding)
                  .forEach(this::afterAddBinding);
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e);
        }
    }

    /**
     * Checks a newly added binding against the policy while the manager is started.
     */
    @Override
    public synchronized void afterAddBinding(Binding binding) {
        if (started) {
            checkBindingForMatch(binding);
        }
    }

    /**
     * Withdraws the demand that the binding about to be removed had registered.
     * <p>
     * For a queue binding, demand is reduced on the queue's own address and, when divert bindings
     * are enabled, on the source address of every tracked divert that had recorded the queue. For
     * a divert binding (only when divert bindings are enabled) the divert is dropped from tracking
     * and demand is reduced once for every queue that had been recorded against it.
     */
    @Override
    public synchronized void beforeRemoveBinding(SimpleString simpleString, Transaction transaction, boolean z) {
        if (!started) {
            return; // Nothing is tracked while stopped so there is no demand to withdraw.
        }

        final Binding binding = server.getPostOffice().getBinding(simpleString);
        if (binding == null) {
            return; // Unknown binding name, there is nothing to inspect.
        }

        final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());

        if (binding instanceof QueueBinding) {
            final QueueBinding queueBinding = (QueueBinding) binding;

            tryRemoveDemandOnAddress(addressInfo);

            if (policy.isEnableDivertBindings()) {
                // A divert can forward to several addresses, so each tracked divert is checked
                // for a forward that targets the address the departing queue is bound to.
                matchingDiverts.forEach((divert, demandingQueues) -> {
                    final SimpleString forwardAddress = divert.getDivert().getForwardAddress();

                    if (isAddressInDivertForwards(queueBinding.getAddress(), forwardAddress)) {
                        final AddressInfo sourceAddressInfo = server.getPostOffice().getAddressInfo(divert.getAddress());

                        if (demandingQueues.remove(queueBinding.getQueue().getName())) {
                            tryRemoveDemandOnAddress(sourceAddressInfo);
                        }
                    }
                });
            }
        } else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) {
            // Both conditions are required: the cast below is only valid for divert bindings and
            // diverts are only tracked when the policy enables them.
            final DivertBinding divertBinding = (DivertBinding) binding;
            final Set<SimpleString> demandingQueues = matchingDiverts.remove(divertBinding);

            if (demandingQueues != null) {
                try {
                    // Every recorded queue name stands for one unit of demand on the source address.
                    demandingQueues.forEach(queueName -> tryRemoveDemandOnAddress(addressInfo));
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divertBinding.getDivert().getForwardAddress(), e);
                }
            }
        }
    }

    /**
     * Reduces demand on the consumer tracked for the given address, if any, and closes and stops
     * tracking that consumer when {@code reduceDemand()} returns {@code true}.
     *
     * @param addressInfo
     *        the address whose demand is being reduced.
     */
    protected final void tryRemoveDemandOnAddress(AddressInfo addressInfo) {
        final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo);
        final FederationConsumerEntry entry = remoteConsumers.get(consumerInfo);

        if (entry == null || !entry.reduceDemand()) {
            return;
        }

        final FederationConsumerInternal consumer = entry.getConsumer();

        try {
            signalBeforeCloseFederationConsumer(consumer);
            consumer.close();
            signalAfterCloseFederationConsumer(consumer);
        } finally {
            // Stop tracking regardless of how the close went.
            remoteConsumers.remove(consumerInfo);
        }
    }

    /**
     * Checks every existing queue binding, and every divert binding when the policy enables divert
     * bindings, for a match against the policy.
     */
    protected final void scanAllBindings() {
        server.getPostOffice()
              .getAllBindings()
              .filter(binding -> binding instanceof QueueBinding || (policy.isEnableDivertBindings() && binding instanceof DivertBinding))
              .forEach(this::checkBindingForMatch);
    }

    /**
     * Dispatches a binding to the queue or divert specific matching logic; other binding types
     * are ignored.
     *
     * @param binding
     *        the binding to check.
     */
    protected final void checkBindingForMatch(Binding binding) {
        if (binding instanceof QueueBinding) {
            final QueueBinding queueBinding = (QueueBinding) binding;
            final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());

            reactIfBindingMatchesPolicy(addressInfo, queueBinding);
            reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
        } else if (binding instanceof DivertBinding) {
            reactIfAnyQueueBindingMatchesDivertTarget((DivertBinding) binding);
        }
    }

    /**
     * Starts tracking a divert whose source address matches the policy and registers demand on
     * that source address for each queue currently bound to any of the divert's forwarding
     * addresses. Does nothing when divert bindings are disabled or the divert is already tracked.
     *
     * @param divertBinding
     *        the divert binding to inspect.
     */
    protected final void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding divertBinding) {
        if (!policy.isEnableDivertBindings()) {
            return;
        }

        final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());

        // Only a divert not seen before needs the scan; once tracked, new queue bindings are
        // matched against it as they are added.
        if (!testIfAddressMatchesPolicy(addressInfo) || matchingDiverts.get(divertBinding) != null) {
            return;
        }

        final Set<SimpleString> demandingQueues = new HashSet<>();
        matchingDiverts.put(divertBinding, demandingQueues);

        // The forwarding address may list several addresses separated by commas.
        final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();

        try {
            for (SimpleString forward : forwardAddress.split(',')) {
                server.getPostOffice()
                      .getBindingsForAddress(forward)
                      .getBindings()
                      .stream()
                      .filter(candidate -> candidate instanceof QueueBinding)
                      .map(candidate -> (QueueBinding) candidate)
                      .forEach(queueBinding -> {
                          if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
                              return;
                          }

                          if (reactIfBindingMatchesPolicy(addressInfo, queueBinding)) {
                              demandingQueues.add(queueBinding.getQueue().getName());
                          }
                      });
            }
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.federationBindingsLookupError(forwardAddress, e);
        }
    }

    /**
     * Registers demand on the source address of every tracked divert that forwards to the address
     * of the given queue binding, unless that queue was already recorded for the divert or a
     * plugin blocks the consumer creation.
     *
     * @param queueBinding
     *        the queue binding to match against the tracked diverts.
     */
    protected final void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding) {
        if (!policy.isEnableDivertBindings()) {
            return;
        }

        final SimpleString queueAddress = queueBinding.getAddress();
        final SimpleString queueName = queueBinding.getQueue().getName();

        matchingDiverts.forEach((divertBinding, demandingQueues) -> {
            final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();

            if (demandingQueues.contains(queueName) || !isAddressInDivertForwards(queueAddress, forwardAddress)) {
                return;
            }

            if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
                return;
            }

            final AddressInfo sourceAddressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());

            if (reactIfBindingMatchesPolicy(sourceAddressInfo, queueBinding)) {
                demandingQueues.add(queueName);
            }
        });
    }

    /**
     * Tests whether the target address equals one of the comma separated forwarding addresses.
     */
    private static boolean isAddressInDivertForwards(SimpleString simpleString, SimpleString simpleString2) {
        for (SimpleString forward : simpleString2.split(',')) {
            if (simpleString.equals(forward)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Registers demand for the given address if it matches the policy, creating and starting a
     * new federation consumer when none is tracked yet for that address.
     *
     * @param addressInfo
     *        the address that demand is being registered on.
     * @param queueBinding
     *        the queue binding that represents the demand.
     *
     * @return true if demand was added to an existing consumer or a new consumer was created,
     *         false if the address does not match the policy or a plugin blocked the creation.
     */
    protected final boolean reactIfBindingMatchesPolicy(AddressInfo addressInfo, QueueBinding queueBinding) {
        if (!testIfAddressMatchesPolicy(addressInfo)) {
            return false;
        }

        logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", addressInfo, queueBinding);

        final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo);
        final FederationConsumerEntry existing = remoteConsumers.get(consumerInfo);

        // An existing consumer only needs its demand count raised so that it stays active until
        // all of the local demand has been withdrawn.
        if (existing != null) {
            logger.trace("Federation Address Policy manager found existing demand for address: {}", addressInfo);
            existing.addDemand();
            return true;
        }

        if (isPluginBlockingFederationConsumerCreate(addressInfo) || isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
            return false;
        }

        logger.trace("Federation Address Policy manager creating remote consumer for address: {}", addressInfo);

        signalBeforeCreateFederationConsumer(consumerInfo);

        final FederationConsumerInternal consumer = createFederationConsumer(consumerInfo);
        final FederationConsumerEntry entry = createConsumerEntry(consumer);

        // On remote close stop tracking the consumer so that later demand creates a new one. The
        // handler takes this manager's lock itself because it mutates the shared tracking map.
        consumer.setRemoteClosedHandler(closedConsumer -> {
            synchronized (this) {
                try {
                    remoteConsumers.remove(closedConsumer.getConsumerInfo());
                } finally {
                    closedConsumer.close();
                }
            }
        });

        // Track the consumer before starting it.
        remoteConsumers.put(consumerInfo, entry);
        consumer.start();

        signalAfterCreateFederationConsumer(consumer);

        return true;
    }

    /**
     * Tests the given address against the configured policy; subclasses may override this to
     * apply additional checks.
     *
     * @param addressInfo
     *        the address to test.
     *
     * @return true if the address matches the policy.
     */
    protected boolean testIfAddressMatchesPolicy(AddressInfo addressInfo) {
        return policy.test(addressInfo);
    }

    /**
     * Creates the consumer info that identifies the federation consumer for the given address.
     * The result is used as a map key, so equal addresses must yield equal consumer infos.
     */
    protected abstract FederationConsumerInfo createConsumerInfo(AddressInfo addressInfo);

    /**
     * Creates the entry used to track demand for the given consumer; subclasses may override this
     * to supply a specialized entry type.
     */
    protected FederationConsumerEntry createConsumerEntry(FederationConsumerInternal federationConsumerInternal) {
        return new FederationConsumerEntry(federationConsumerInternal);
    }

    /**
     * Creates, but does not start, the federation consumer described by the given consumer info.
     */
    protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /** Invoked immediately before a new federation consumer is created. */
    protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /** Invoked after a new federation consumer has been created and started. */
    protected abstract void signalAfterCreateFederationConsumer(FederationConsumer federationConsumer);

    /** Invoked immediately before a federation consumer is closed because its demand was withdrawn. */
    protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer federationConsumer);

    /** Invoked after a federation consumer has been closed because its demand was withdrawn. */
    protected abstract void signalAfterCloseFederationConsumer(FederationConsumer federationConsumer);

    /** Returns true if consumer creation for the given address should be blocked. */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(AddressInfo addressInfo);

    /** Returns true if consumer creation for the given divert and target queue should be blocked. */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(Divert divert, Queue queue);

    /** Returns true if consumer creation for the given queue should be blocked. */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(Queue queue);
}
