package de.iip_ecosphere.platform.transport.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Envelope;
import com.sun.jna.platform.win32.WinError;
import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.TransportParameter;
import de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:jars/transport.amqp-0.6.0-SNAPSHOT.jar:de/iip_ecosphere/platform/transport/connectors/rabbitmq/RabbitMqAmqpTransportConnector.class
 */
/* loaded from: input_file:jars/transport.amqp-0.6.0.jar:de/iip_ecosphere/platform/transport/connectors/rabbitmq/RabbitMqAmqpTransportConnector.class */
public class RabbitMqAmqpTransportConnector extends AbstractTransportConnector {
    public static final String NAME = "AMQP";
    private Connection connection;
    private Channel channel;
    private boolean tlsEnabled = false;
    private Map<String, String> tags = Collections.synchronizedMap(new HashMap());
    private boolean closing = false;
    private Map<String, String> queueStream = Collections.synchronizedMap(new HashMap());

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void syncSend(String str, Object obj) throws IOException {
        send(str, obj, true);
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void asyncSend(String str, Object obj) throws IOException {
        send(str, obj, false);
    }

    private void checkStream(String str, boolean z) throws IOException {
        if (!isStreamKnown(str)) {
            this.channel.exchangeDeclare(str, BuiltinExchangeType.FANOUT, false, true, (Map<String, Object>) null);
            registerStream(str);
        }
        if (z || this.queueStream.get(str) != null) {
            return;
        }
        AMQP.Queue.DeclareOk queueDeclare = this.channel.queueDeclare();
        this.queueStream.put(str, queueDeclare.getQueue());
        this.channel.queueBind(queueDeclare.getQueue(), str, "");
    }

    private void send(String str, Object obj, boolean z) throws IOException {
        try {
            checkStream(str, true);
            this.channel.basicPublish(str, "", null, serialize(str, obj));
        } catch (AlreadyClosedException e) {
        } catch (IOException e2) {
            if (!this.closing) {
                throw e2;
            }
        }
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector, de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void setReceptionCallback(String str, ReceptionCallback<?> receptionCallback) throws IOException {
        checkStream(str, false);
        super.setReceptionCallback(str, receptionCallback);
        DeliverCallback deliverCallback = (str2, delivery) -> {
            Envelope envelope = delivery.getEnvelope();
            String exchange = envelope.getExchange();
            if (null == exchange || exchange.length() == 0) {
                exchange = envelope.getRoutingKey();
            }
            notifyCallback(exchange, delivery.getBody());
        };
        String uuid = UUID.randomUUID().toString();
        this.channel.basicConsume(this.queueStream.get(str), true, uuid, deliverCallback, str3 -> {
        });
        this.tags.put(str, uuid);
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector, de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void unsubscribe(String str, boolean z) throws IOException {
        super.unsubscribe(str, z);
        String remove = this.tags.remove(str);
        if (null != remove) {
            this.channel.basicCancel(remove);
        }
        if (z) {
            this.channel.queueDeleteNoWait(str, true, false);
        }
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public String composeStreamName(String str, String str2) {
        String str3 = (str == null || str.length() <= 0) ? str2 : str + "-" + str2;
        if (str3.length() <= 256) {
            return str3;
        }
        str3.substring(0, WinError.ERROR_INVALID_EA_NAME);
        throw new IllegalArgumentException("stream name length > 256");
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector, de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void connect(TransportParameter transportParameter) throws IOException {
        super.connect(transportParameter);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(transportParameter.getHost());
        connectionFactory.setPort(transportParameter.getPort());
        connectionFactory.setAutomaticRecoveryEnabled(true);
        applyAuthenticationKey(transportParameter.getAuthenticationKey(), (str, str2, str3) -> {
            connectionFactory.setUsername(str);
            connectionFactory.setPassword(str2);
            return true;
        });
        if (useTls(transportParameter)) {
            try {
                connectionFactory.useSslProtocol(createTlsContext(transportParameter));
                this.tlsEnabled = true;
            } catch (IOException e) {
                LoggerFactory.getLogger(getClass()).error("AMQP: Loading keystore " + e.getMessage() + ". Trying with no TLS.");
            }
        }
        configureFactory(connectionFactory);
        try {
            LoggerFactory.getLogger(getClass()).info("AMQP: Connecting to " + transportParameter.getHost() + " " + transportParameter.getPort());
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
        } catch (TimeoutException e2) {
            throw new IOException(e2.getMessage(), e2);
        }
    }

    protected void configureFactory(ConnectionFactory connectionFactory) {
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector, de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public void disconnect() throws IOException {
        if (this.closing) {
            return;
        }
        this.closing = true;
        super.disconnect();
        try {
            this.channel.close();
        } catch (AlreadyClosedException e) {
        } catch (TimeoutException e2) {
        }
        try {
            this.connection.close();
        } catch (AlreadyClosedException e3) {
        }
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public String getName() {
        return NAME;
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public String supportedEncryption() {
        return "TLS";
    }

    @Override // de.iip_ecosphere.platform.transport.connectors.TransportConnector
    public String enabledEncryption() {
        if (this.tlsEnabled) {
            return "TLS";
        }
        return null;
    }
}
