package org.apache.activemq.artemis.core.server.cluster.impl;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import net.bytebuddy.utility.JavaConstant;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.commons.configuration2.tree.DefaultExpressionEngineSymbols;
import org.fusesource.jansi.AnsiRenderer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.class */
public class ClusterConnectionBridge extends BridgeImpl {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ClusterConnection clusterConnection;
    private final ClusterManager clusterManager;
    private final MessageFlowRecord flowRecord;
    private final SimpleString managementAddress;
    private final SimpleString managementNotificationAddress;
    private ClientConsumer notifConsumer;
    private final SimpleString idsHeaderName;
    private final long targetNodeEventUID;
    private final StorageManager storageManager;
    private final ServerLocatorInternal discoveryLocator;
    private final String storeAndForwardPrefix;
    private TopologyMemberImpl member;

    public ClusterConnectionBridge(ClusterConnection clusterConnection, ClusterManager clusterManager, ServerLocatorInternal serverLocatorInternal, ServerLocatorInternal serverLocatorInternal2, int i, int i2, long j, double d, long j2, UUID uuid, long j3, String str, SimpleString simpleString, Queue queue, Executor executor, Filter filter, SimpleString simpleString2, ScheduledExecutorService scheduledExecutorService, TransformerConfiguration transformerConfiguration, boolean z, String str2, String str3, ActiveMQServer activeMQServer, SimpleString simpleString3, SimpleString simpleString4, MessageFlowRecord messageFlowRecord, TransportConfiguration transportConfiguration, String str4, StorageManager storageManager) throws ActiveMQException {
        super(serverLocatorInternal, new BridgeConfiguration().setName(simpleString == null ? null : simpleString.toString()).setInitialConnectAttempts(i).setReconnectAttempts(i2).setReconnectAttemptsOnSameNode(0).setRetryInterval(j).setRetryIntervalMultiplier(d).setMaxRetryInterval(j2).setFilterString(filter == null ? null : filter.toString()).setForwardingAddress(simpleString2 == null ? null : simpleString2.toString()).setUseDuplicateDetection(z).setUser(str2).setPassword(str3).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType())), uuid, queue, executor, scheduledExecutorService, activeMQServer);
        this.discoveryLocator = serverLocatorInternal2;
        this.idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(simpleString);
        this.clusterConnection = clusterConnection;
        this.clusterManager = clusterManager;
        this.targetNodeEventUID = j3;
        this.targetNodeID = str;
        this.managementAddress = simpleString3;
        this.managementNotificationAddress = simpleString4;
        this.flowRecord = messageFlowRecord;
        if (logger.isTraceEnabled()) {
            logger.trace("Setting up bridge between {} and {}", clusterConnection.getConnector(), serverLocatorInternal, new Exception("trace"));
        }
        this.storeAndForwardPrefix = str4;
        this.storageManager = storageManager;
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        this.serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(this.serverLocator, this.storageManager));
        ClientSessionFactoryInternal clientSessionFactoryInternal = (ClientSessionFactoryInternal) this.serverLocator.createSessionFactory(this.targetNodeID);
        if (clientSessionFactoryInternal == null) {
            clientSessionFactoryInternal = reconnectOnOriginalNode();
            if (clientSessionFactoryInternal == null) {
                return null;
            }
        }
        setSessionFactory(clientSessionFactoryInternal);
        if (clientSessionFactoryInternal == null) {
            return null;
        }
        clientSessionFactoryInternal.setReconnectAttempts(0);
        clientSessionFactoryInternal.getConnection().addFailureListener(this);
        return clientSessionFactoryInternal;
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    protected Message beforeForward(Message message, SimpleString simpleString) {
        Message copy = message.copy();
        logger.trace("Clustered bridge  copied message {} as {} before delivery", message, copy);
        HashSet<SimpleString> hashSet = new HashSet(copy.getPropertyNames());
        byte[] extraBytesProperty = message.getExtraBytesProperty(this.idsHeaderName);
        if (extraBytesProperty == null) {
            ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, copy, this.idsHeaderName);
            throw new IllegalStateException("no queueIDs defined");
        }
        for (SimpleString simpleString2 : hashSet) {
            if (simpleString2.startsWith(Message.HDR_ROUTE_TO_IDS)) {
                copy.removeProperty(simpleString2);
            }
        }
        copy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, extraBytesProperty);
        return super.beforeForwardingNoCopy(copy, simpleString);
    }

    private void setupNotificationConsumer() throws Exception {
        if (this.flowRecord != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Setting up notificationConsumer between {} and {} clusterConnection = {} on server {}", this.clusterConnection.getConnector(), this.flowRecord.getBridge().getForwardingConnection(), this.clusterConnection.getName(), this.clusterConnection.getServer());
            }
            this.flowRecord.reset();
            if (this.notifConsumer != null) {
                try {
                    logger.debug("Closing notification Consumer for reopening {} on bridge {}", this.notifConsumer, getName());
                    this.notifConsumer.close();
                    this.notifConsumer = null;
                } catch (ActiveMQException e) {
                    ActiveMQServerLogger.LOGGER.errorClosingConsumer(e);
                }
            }
            SimpleString of = SimpleString.of("notif." + UUIDGenerator.getInstance().generateStringUUID() + "." + this.clusterConnection.getServer().toString().replaceAll("::", JavaConstant.Dynamic.DEFAULT_NAME));
            this.sessionConsumer.createQueue(QueueConfiguration.of(of).setAddress(this.managementNotificationAddress).setFilterString(SimpleString.of("(" + ManagementHelper.HDR_BINDING_TYPE + " <> " + BindingType.DIVERT.toInt() + " OR " + ManagementHelper.HDR_BINDING_TYPE + " IS NULL) AND " + ManagementHelper.HDR_NOTIFICATION_TYPE + " IN ('" + CoreNotificationType.SESSION_CREATED + "', '" + CoreNotificationType.BINDING_ADDED + "', '" + CoreNotificationType.BINDING_REMOVED + "', '" + CoreNotificationType.BINDING_UPDATED + "', '" + CoreNotificationType.CONSUMER_CREATED + "', '" + CoreNotificationType.CONSUMER_CLOSED + "', '" + CoreNotificationType.PROPOSAL + "', '" + CoreNotificationType.PROPOSAL_RESPONSE + "', '" + CoreNotificationType.UNPROPOSAL + "') AND " + ManagementHelper.HDR_DISTANCE + " < " + this.flowRecord.getMaxHops() + " AND (" + createSelectorFromAddress(appendIgnoresToFilter(this.flowRecord.getAddress())) + ") AND (" + createPermissiveManagementNotificationToFilter() + ")")).setDurable(false).setTemporary(true).setRoutingType(RoutingType.MULTICAST));
            this.notifConsumer = this.sessionConsumer.createConsumer(of);
            this.notifConsumer.setMessageHandler(this.flowRecord);
            this.sessionConsumer.start();
            ClientMessage createMessage = this.sessionConsumer.createMessage(false);
            if (logger.isTraceEnabled()) {
                logger.trace("Requesting sendQueueInfoToQueue through {}", this, new Exception("trace"));
            }
            ManagementHelper.putOperationInvocation(createMessage, ResourceNames.BROKER, "sendQueueInfoToQueue", of.toString(), this.flowRecord.getAddress());
            ClientProducer createProducer = this.sessionConsumer.createProducer(this.managementAddress);
            try {
                logger.debug("Cluster connection bridge on {} requesting information on queues", this.clusterConnection);
                createProducer.send(createMessage);
                if (createProducer != null) {
                    createProducer.close();
                }
            } catch (Throwable th) {
                if (createProducer != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    public static String createSelectorFromAddress(String str) {
        StringBuilder sb = new StringBuilder();
        if (str.contains(AnsiRenderer.CODE_LIST_SEPARATOR)) {
            return buildSelectorFromArray(str.split(AnsiRenderer.CODE_LIST_SEPARATOR));
        }
        if (str.startsWith("!")) {
            sb.append(ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + str.substring(1, str.length()) + "%'");
        } else {
            sb.append(ManagementHelper.HDR_ADDRESS + " LIKE '" + str + "%'");
        }
        return sb.toString();
    }

    public static String buildSelectorFromArray(String[] strArr) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (String str : strArr) {
            if (str.startsWith("!")) {
                arrayList2.add(str.substring(1, str.length()));
            } else {
                arrayList.add(str);
            }
        }
        StringBuilder sb = new StringBuilder(DefaultExpressionEngineSymbols.DEFAULT_INDEX_START);
        if (arrayList.size() > 0) {
            if (arrayList2.size() > 0) {
                sb.append(DefaultExpressionEngineSymbols.DEFAULT_INDEX_START);
            }
            for (int i = 0; i < arrayList.size(); i++) {
                sb.append("(" + ManagementHelper.HDR_ADDRESS + " LIKE '" + ((String) arrayList.get(i)) + "%')");
                if (i < arrayList.size() - 1) {
                    sb.append(" OR ");
                }
            }
            if (arrayList2.size() > 0) {
                sb.append(")");
            }
        }
        if (arrayList2.size() > 0) {
            if (arrayList.size() > 0) {
                sb.append(" AND (");
            }
            for (int i2 = 0; i2 < arrayList2.size(); i2++) {
                sb.append("(" + ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + ((String) arrayList2.get(i2)) + "%')");
                if (i2 < arrayList2.size() - 1) {
                    sb.append(" AND ");
                }
            }
            if (arrayList.size() > 0) {
                sb.append(")");
            }
        }
        sb.append(")");
        return sb.toString();
    }

    private String appendIgnoresToFilter(String str) {
        if (str != null && !str.isEmpty()) {
            str = str + ",";
        }
        String str2 = (str + "!" + this.storeAndForwardPrefix) + ",!" + this.managementAddress;
        Iterator<String> it = this.clusterManager.getProtocolIgnoredAddresses().iterator();
        while (it.hasNext()) {
            str2 = str2 + ",!" + it.next();
        }
        return str2;
    }

    private String createPermissiveManagementNotificationToFilter() {
        return ManagementHelper.HDR_NOTIFICATION_TYPE + " = '" + CoreNotificationType.SESSION_CREATED + "' OR (" + ((CharSequence) ManagementHelper.HDR_ADDRESS) + " NOT LIKE '" + ((CharSequence) this.managementNotificationAddress) + "%')";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    public void nodeUP(TopologyMember topologyMember, boolean z) {
        if (topologyMember == null || this.targetNodeID == null || this.targetNodeID.equals(topologyMember.getNodeId())) {
            super.nodeUP(topologyMember, z);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    public void afterConnect() throws Exception {
        super.afterConnect();
        setupNotificationConsumer();
    }

    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    protected void tryScheduleRetryReconnect(ActiveMQExceptionType activeMQExceptionType) {
        if (activeMQExceptionType != ActiveMQExceptionType.DISCONNECTED) {
            scheduleRetryConnect();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl
    public void fail(boolean z, boolean z2) {
        logger.debug("Cluster Bridge {} failed, permanently={}", getName(), Boolean.valueOf(z));
        super.fail(z, z2);
        if (!z) {
            this.clusterConnection.disconnectRecord(this.targetNodeID);
            return;
        }
        logger.debug("cluster node for bridge {} is permanently down", getName());
        this.clusterConnection.removeRecord(this.targetNodeID);
        if (z2) {
            this.executor.execute(() -> {
                logger.debug("Scaling down queue {}", this.queue);
                try {
                    this.queue.deleteQueue(true);
                    this.queue.removeAddress();
                } catch (ActiveMQAddressDoesNotExistException e) {
                    logger.debug("ActiveMQAddressDoesNotExistException during scale down for queue {}", this.queue);
                } catch (Exception e2) {
                    logger.warn(e2.getMessage(), (Throwable) e2);
                }
            });
        }
    }
}
