package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.stream.Stream;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-mqtt-6.0.5.jar:org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.class */
/**
 * Message-driven inbound channel adapter backed by an Eclipse Paho
 * {@link IMqttAsyncClient}.
 *
 * <p>The adapter either creates its own client through a
 * {@link MqttPahoClientFactory} or borrows one from a {@link ClientManager}.
 * A borrowed client is never disconnected or closed by this class. Incoming
 * {@link MqttMessage}s are converted with the configured converter and handed
 * to {@code sendMessage(...)}.
 *
 * <p>Connection and stop handling are {@code synchronized} on the adapter;
 * topic changes and subscription are guarded by the inherited
 * {@code topicLock}.
 */
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectOptions> implements MqttCallbackExtended, MqttPahoComponent {

    /** Granted-QoS value (0x80) that this adapter treats as a rejected subscription. */
    private static final int SUBSCRIBE_FAILURE_RETURN_CODE = 0x80;

    private final MqttPahoClientFactory clientFactory;

    /** Current client; {@code null} until {@link #connect()} has obtained one. */
    private volatile IMqttAsyncClient client;

    /** What to do with the subscription on stop; resolved on every {@link #connect()}. */
    private volatile ConsumerStopAction consumerStopAction;

    /** Set when a subscription still has to be made by the next {@link #doStart()}. */
    private volatile boolean readyToSubscribeOnStart;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-mqtt-6.0.5.jar:org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter$AcknowledgmentImpl.class */
    /**
     * Manual acknowledgment handle for a single received message. It keeps the
     * message id, its QoS and the client that was current when the message
     * arrived.
     */
    private static class AcknowledgmentImpl implements SimpleAcknowledgment {
        private final int id;
        private final int qos;
        private final IMqttAsyncClient ackClient;

        AcknowledgmentImpl(int messageId, int messageQos, IMqttAsyncClient client) {
            this.id = messageId;
            this.qos = messageQos;
            this.ackClient = client;
        }

        /**
         * Tells the client that processing of the message has completed.
         *
         * @throws IllegalStateException if no client was captured, or if the
         *         client rejects the acknowledgment (the {@link MqttException}
         *         is kept as the cause)
         */
        @Override // org.springframework.integration.acks.SimpleAcknowledgment
        public void acknowledge() {
            if (this.ackClient == null) {
                throw new IllegalStateException("Client has changed");
            }
            try {
                this.ackClient.messageArrivedComplete(this.id, this.qos);
            } catch (MqttException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Creates an adapter that uses a {@link DefaultMqttPahoClientFactory}.
     *
     * @param str    first argument passed to the superclass constructor (used as the server URL)
     * @param str2   second argument passed to the superclass constructor (used as the client id)
     * @param strArr initial topics
     */
    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, String... strArr) {
        this(str, str2, new DefaultMqttPahoClientFactory(), strArr);
    }

    /**
     * Creates an adapter that obtains its client from the given factory.
     *
     * @param str                   first argument passed to the superclass constructor (used as the server URL)
     * @param str2                  second argument passed to the superclass constructor (used as the client id)
     * @param mqttPahoClientFactory factory supplying the client, connection options and stop action
     * @param strArr                initial topics
     */
    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, MqttPahoClientFactory mqttPahoClientFactory, String... strArr) {
        super(str, str2, strArr);
        this.clientFactory = mqttPahoClientFactory;
    }

    /**
     * Creates an adapter without an explicit URL; {@link #connect()} then
     * requires the factory's connection options to provide server URIs.
     *
     * @param str                   second argument passed to the superclass constructor (used as the client id)
     * @param mqttPahoClientFactory factory supplying the client, connection options and stop action
     * @param strArr                initial topics
     */
    public MqttPahoMessageDrivenChannelAdapter(String str, MqttPahoClientFactory mqttPahoClientFactory, String... strArr) {
        super(null, str, strArr);
        this.clientFactory = mqttPahoClientFactory;
    }

    /**
     * Creates an adapter that shares the client owned by a {@link ClientManager}.
     * A default factory is still created so that the manager's connection
     * options are available through {@link #getConnectionInfo()}.
     *
     * @param clientManager manager that owns the shared client
     * @param strArr        initial topics
     */
    public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttConnectOptions> clientManager, String... strArr) {
        super(clientManager, strArr);
        DefaultMqttPahoClientFactory managedFactory = new DefaultMqttPahoClientFactory();
        managedFactory.setConnectionOptions((MqttConnectOptions) clientManager.getConnectionInfo());
        this.clientFactory = managedFactory;
    }

    /**
     * Returns the factory's connection options. When they carry no server URIs
     * but this adapter has a URL, a clone with that URL as its only server URI
     * is returned, so the factory's own options are left unmodified.
     *
     * @return the effective connection options
     */
    @Override // org.springframework.integration.mqtt.core.MqttPahoComponent, org.springframework.integration.mqtt.core.MqttComponent
    public MqttConnectOptions getConnectionInfo() {
        MqttConnectOptions options = this.clientFactory.getConnectionOptions();
        if (options.getServerURIs() != null) {
            return options;
        }
        String url = getUrl();
        if (url == null) {
            return options;
        }
        MqttConnectOptions withUrl = MqttUtils.cloneConnectOptions(options);
        withUrl.setServerURIs(new String[]{url});
        return withUrl;
    }

    /**
     * Runs the superclass initialisation and installs a
     * {@link DefaultPahoMessageConverter} when no converter has been configured.
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter, org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        if (getConverter() == null) {
            DefaultPahoMessageConverter defaultConverter = new DefaultPahoMessageConverter();
            defaultConverter.setBeanFactory(getBeanFactory());
            setConverter(defaultConverter);
        }
    }

    /**
     * Connects and, if a subscription is pending from a previous stop,
     * subscribes.
     *
     * <p>On failure with automatic reconnect enabled, the existing client is
     * asked to reconnect. If there is no client yet (the failure happened
     * before one was created) or automatic reconnect is off, the error is
     * logged and an {@link MqttConnectionFailedEvent} is published.
     */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        try {
            connect();
            if (this.readyToSubscribeOnStart) {
                subscribe();
            }
        } catch (Exception e) {
            IMqttAsyncClient currentClient = this.client;
            // Without a client there is nothing to reconnect; report the failure instead of hitting a NullPointerException.
            if (currentClient != null && getConnectionInfo().isAutomaticReconnect()) {
                try {
                    currentClient.reconnect();
                } catch (MqttException e2) {
                    this.logger.error(e2, "MQTT client failed to connect. Never happens.");
                }
                return;
            }
            this.logger.error(e, "Exception while connecting");
            publishConnectionFailed(e);
        }
    }

    /**
     * Resolves the stop action (defaulting to
     * {@link ConsumerStopAction#UNSUBSCRIBE_CLEAN}) and obtains the client.
     * A managed client is used as-is; otherwise a new client is created,
     * given this adapter as callback, and connected.
     *
     * @throws MqttException if creating or connecting the client fails
     * @throws IllegalStateException if neither a URL nor server URIs are configured
     */
    private synchronized void connect() throws MqttException {
        MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
        ConsumerStopAction configuredStopAction = this.clientFactory.getConsumerStopAction();
        this.consumerStopAction = configuredStopAction != null ? configuredStopAction : ConsumerStopAction.UNSUBSCRIBE_CLEAN;

        ClientManager<IMqttAsyncClient, MqttConnectOptions> clientManager = getClientManager();
        if (clientManager != null) {
            this.client = clientManager.getClient();
            return;
        }

        Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null, "If no 'url' provided, connectionOptions.getServerURIs() must not be null");
        this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
        this.client.setCallback(this);
        this.client.connect(connectionOptions).waitForCompletion(getCompletionTimeout());
        this.client.setManualAcks(isManualAcks());
    }

    /**
     * Unsubscribes if the stop action asks for it and, for a client owned by
     * this adapter, forcibly disconnects. Does nothing beyond clearing the
     * pending-subscription flag when no client was ever obtained.
     */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected synchronized void doStop() {
        this.readyToSubscribeOnStart = false;
        IMqttAsyncClient currentClient = this.client;
        if (currentClient == null) {
            // The preceding start never got as far as creating a client.
            return;
        }
        try {
            if (shouldUnsubscribeOnStop()) {
                currentClient.unsubscribe(getTopic());
                // The subscription is gone, so the next start has to subscribe again.
                this.readyToSubscribeOnStart = true;
            }
        } catch (MqttException e) {
            this.logger.error(e, "Exception while unsubscribing");
        }
        if (getClientManager() != null) {
            // The connection belongs to the manager; leave it alone.
            return;
        }
        try {
            currentClient.disconnectForcibly(getDisconnectCompletionTimeout());
        } catch (MqttException e2) {
            this.logger.error(e2, "Exception while disconnecting");
        }
    }

    /**
     * Returns whether the resolved stop action requires an unsubscribe:
     * always for {@code UNSUBSCRIBE_ALWAYS}, and for {@code UNSUBSCRIBE_CLEAN}
     * only when the factory's connection options use a clean session.
     */
    private boolean shouldUnsubscribeOnStop() {
        ConsumerStopAction stopAction = this.consumerStopAction;
        if (ConsumerStopAction.UNSUBSCRIBE_ALWAYS.equals(stopAction)) {
            return true;
        }
        return ConsumerStopAction.UNSUBSCRIBE_CLEAN.equals(stopAction) && this.clientFactory.getConnectionOptions().isCleanSession();
    }

    /**
     * Runs the superclass destruction and closes the client when this adapter
     * owns it. An adapter that never connected has no client to close.
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.beans.factory.DisposableBean
    public void destroy() {
        super.destroy();
        IMqttAsyncClient currentClient = this.client;
        if (getClientManager() == null && currentClient != null) {
            try {
                currentClient.close();
            } catch (MqttException e) {
                this.logger.error(e, "Could not close client");
            }
        }
    }

    /**
     * Registers a topic and, when the client is connected, subscribes to it
     * immediately. If that subscription fails the topic registration is
     * rolled back.
     *
     * @param str topic to add
     * @param i   requested QoS
     * @throws MessagingException if subscribing fails
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void addTopic(String str, int i) {
        this.topicLock.lock();
        try {
            super.addTopic(str, i);
            IMqttAsyncClient currentClient = this.client;
            if (currentClient != null && currentClient.isConnected()) {
                currentClient.subscribe(str, i, this::messageArrived).waitForCompletion(getCompletionTimeout());
            }
        } catch (MqttException e) {
            super.removeTopic(str);
            throw new MessagingException("Failed to subscribe to topic " + str, e);
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Unsubscribes from the given topics when the client is connected and then
     * removes them from the registration. If unsubscribing fails the
     * registration is left unchanged.
     *
     * @param strArr topics to remove
     * @throws MessagingException if unsubscribing fails
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void removeTopic(String... strArr) {
        this.topicLock.lock();
        try {
            IMqttAsyncClient currentClient = this.client;
            if (currentClient != null && currentClient.isConnected()) {
                currentClient.unsubscribe(strArr).waitForCompletion(getCompletionTimeout());
            }
            super.removeTopic(strArr);
        } catch (MqttException e) {
            throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(strArr), e);
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Subscribes to all registered topics and, if the client is connected
     * afterwards, logs and publishes an {@link MqttSubscribedEvent}. The event
     * is published outside the topic lock.
     */
    private void subscribe() {
        String[] topics = subscribeToRegisteredTopics();
        if (this.client.isConnected()) {
            String description = "Connected and subscribed to " + Arrays.toString(topics);
            this.logger.debug(description);
            ApplicationEventPublisher publisher = getApplicationEventPublisher();
            if (publisher != null) {
                publisher.publishEvent((ApplicationEvent) new MqttSubscribedEvent(this, description));
            }
        }
    }

    /**
     * Subscribes to the registered topics while holding the topic lock, so the
     * topic and QoS arrays are read consistently. A subscription failure is
     * published as an {@link MqttConnectionFailedEvent} and logged, not
     * rethrown. The lock is released exactly once on every path.
     *
     * @return the topics that were read under the lock
     */
    private String[] subscribeToRegisteredTopics() {
        this.topicLock.lock();
        try {
            String[] topics = getTopic();
            try {
                if (topics.length > 0) {
                    int[] requestedQos = getQos();
                    IMqttMessageListener listener = this::messageArrived;
                    // One listener entry per topic, all pointing at this adapter.
                    IMqttMessageListener[] listeners = Stream.of(topics).map(topic -> listener).toArray(IMqttMessageListener[]::new);
                    IMqttToken token = this.client.subscribe(topics, requestedQos, listeners);
                    token.waitForCompletion(getCompletionTimeout());
                    int[] grantedQos = token.getGrantedQos();
                    if (grantedQos.length == 1 && grantedQos[0] == SUBSCRIBE_FAILURE_RETURN_CODE) {
                        throw new MqttException(SUBSCRIBE_FAILURE_RETURN_CODE);
                    }
                    warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
                }
            } catch (MqttException e) {
                publishConnectionFailed(e);
                this.logger.error(e, () -> "Error subscribing to " + Arrays.toString(topics));
            }
            return topics;
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Logs a single warning when the granted QoS values differ from the
     * requested ones. Arrays of different length count as a difference, so a
     * short granted array produces a warning rather than an
     * {@link ArrayIndexOutOfBoundsException}.
     */
    private void warnInvalidQosForSubscription(String[] strArr, int[] iArr, int[] iArr2) {
        if (!Arrays.equals(iArr, iArr2)) {
            this.logger.warn(() -> "Granted QOS different to Requested QOS; topics: " + Arrays.toString(strArr) + " requested: " + Arrays.toString(iArr) + " granted: " + Arrays.toString(iArr2));
        }
    }

    /** Publishes an {@link MqttConnectionFailedEvent} if an event publisher is available. */
    private void publishConnectionFailed(Throwable cause) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent((ApplicationEvent) new MqttConnectionFailedEvent(this, cause));
        }
    }

    /**
     * Handles loss of the connection. While running, the loss is logged and
     * published as an {@link MqttConnectionFailedEvent}; otherwise only the
     * pending-subscription flag is cleared.
     *
     * @param th cause of the connection loss
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public synchronized void connectionLost(Throwable th) {
        if (!isRunning()) {
            this.readyToSubscribeOnStart = false;
            return;
        }
        this.logger.error(() -> "Lost connection: " + th.getMessage());
        publishConnectionFailed(th);
    }

    /**
     * Converts an incoming MQTT message and sends it on. With manual acks
     * enabled, an acknowledgment callback bound to the current client is added
     * as a header. Runtime exceptions from sending are logged and rethrown.
     * Nothing is sent when conversion produced no builder.
     *
     * @param str         topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public void messageArrived(String str, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(str, mqttMessage);
        if (builder == null) {
            return;
        }
        if (isManualAcks()) {
            builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
        }
        Message<?> message = builder.build();
        try {
            sendMessage(message);
        } catch (RuntimeException e) {
            this.logger.error(e, () -> "Unhandled exception for " + message);
            throw e;
        }
    }

    /**
     * Converts the MQTT message with the configured converter. A conversion
     * exception or a {@code null} result is first offered to
     * {@code sendErrorMessageIfNecessary}; if that declines, a
     * {@link MessageConversionException} is thrown.
     *
     * @return the builder, or {@code null} when the failure was handed to the error flow
     */
    private AbstractIntegrationMessageBuilder<?> toMessageBuilder(String str, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> builder = null;
        Exception failure = null;
        try {
            builder = getConverter().toMessageBuilder(str, mqttMessage);
            if (builder == null) {
                failure = new IllegalStateException("'MqttMessageConverter' returned 'null'");
            }
        } catch (Exception e) {
            failure = e;
        }
        if (failure != null) {
            GenericMessage<MqttMessage> failedMessage = new GenericMessage<>(mqttMessage);
            if (!sendErrorMessageIfNecessary(failedMessage, failure)) {
                if (failure instanceof MessageConversionException) {
                    throw (MessageConversionException) failure;
                }
                throw new MessageConversionException(failedMessage, "Failed to convert from MQTT Message", failure);
            }
        }
        return builder;
    }

    /** No-op: this adapter only receives messages. */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    /**
     * Connect notification without a server URI; delegates to
     * {@link #connectComplete(boolean, String)} using this adapter's URL.
     *
     * @param z whether the connection is the result of a reconnect
     */
    @Override // org.springframework.integration.mqtt.core.ClientManager.ConnectCallback
    public void connectComplete(boolean z) {
        connectComplete(z, getUrl());
    }

    /**
     * Subscribes right away when the adapter is active; otherwise records
     * that the next start has to subscribe.
     *
     * @param z   whether the connection is the result of a reconnect (not used here)
     * @param str server URI the connection was made to (not used here)
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallbackExtended
    public void connectComplete(boolean z, String str) {
        if (isActive()) {
            subscribe();
        } else {
            this.readyToSubscribeOnStart = true;
        }
    }
}
