package it.wldt.adapter.mqtt.physical;

import it.wldt.adapter.mqtt.physical.topic.incoming.DigitalTwinIncomingTopic;
import it.wldt.adapter.mqtt.physical.topic.outgoing.DigitalTwinOutgoingTopic;
import it.wldt.adapter.physical.ConfigurablePhysicalAdapter;
import it.wldt.adapter.physical.event.PhysicalAssetActionWldtEvent;
import it.wldt.adapter.physical.event.PhysicalAssetEventWldtEvent;
import it.wldt.adapter.physical.event.PhysicalAssetPropertyWldtEvent;
import it.wldt.exception.EventBusException;
import it.wldt.exception.PhysicalAdapterException;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/wldt/adapter/mqtt/physical/MqttPhysicalAdapter.class */
public class MqttPhysicalAdapter extends ConfigurablePhysicalAdapter<MqttPhysicalAdapterConfiguration> {
    private static final Logger logger = LoggerFactory.getLogger(MqttPhysicalAdapter.class);
    private final IMqttClient mqttClient;

    public MqttPhysicalAdapter(String str, MqttPhysicalAdapterConfiguration mqttPhysicalAdapterConfiguration) throws MqttException {
        super(str, mqttPhysicalAdapterConfiguration);
        this.mqttClient = new MqttClient(((MqttPhysicalAdapterConfiguration) getConfiguration()).getBrokerConnectionString(), ((MqttPhysicalAdapterConfiguration) getConfiguration()).getClientId(), ((MqttPhysicalAdapterConfiguration) getConfiguration()).getPersistence());
    }

    public void onIncomingPhysicalAction(PhysicalAssetActionWldtEvent<?> physicalAssetActionWldtEvent) {
        logger.info("MQTT Physical Adapter received action event: {}", physicalAssetActionWldtEvent);
        ((MqttPhysicalAdapterConfiguration) getConfiguration()).getOutgoingTopicByActionKey(physicalAssetActionWldtEvent.getActionKey()).ifPresent(digitalTwinOutgoingTopic -> {
            publishOnTopic(digitalTwinOutgoingTopic, digitalTwinOutgoingTopic.applyPublishFunction(physicalAssetActionWldtEvent));
        });
    }

    public void onAdapterStart() {
        try {
            connectToMqttBroker();
            ((MqttPhysicalAdapterConfiguration) getConfiguration()).getIncomingTopics().forEach(this::subscribeClientToDigitalTwinIncomingTopic);
            logger.info("MQTT Physical Adapter - MQTT client subscribed to incoming topics");
            notifyPhysicalAdapterBound(((MqttPhysicalAdapterConfiguration) getConfiguration()).getPhysicalAssetDescription());
        } catch (PhysicalAdapterException | EventBusException e) {
            e.printStackTrace();
        }
    }

    public void onAdapterStop() {
        try {
            this.mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void publishOnTopic(DigitalTwinOutgoingTopic digitalTwinOutgoingTopic, String str) {
        try {
            MqttMessage mqttMessage = new MqttMessage(str.getBytes());
            mqttMessage.setQos(digitalTwinOutgoingTopic.getQos().intValue());
            mqttMessage.setRetained(true);
            this.mqttClient.publish(digitalTwinOutgoingTopic.getTopic(), mqttMessage);
            logger.info("Physical Adapter - MQTT client published message: {} on topic: {}", str, digitalTwinOutgoingTopic.getTopic());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void subscribeClientToDigitalTwinIncomingTopic(DigitalTwinIncomingTopic digitalTwinIncomingTopic) {
        try {
            this.mqttClient.subscribe(digitalTwinIncomingTopic.getTopic(), digitalTwinIncomingTopic.getQos().intValue(), (str, mqttMessage) -> {
                digitalTwinIncomingTopic.applySubscribeFunction(new String(mqttMessage.getPayload())).forEach(wldtEvent -> {
                    try {
                        if (wldtEvent instanceof PhysicalAssetEventWldtEvent) {
                            publishPhysicalAssetEventWldtEvent((PhysicalAssetEventWldtEvent) wldtEvent);
                        } else if (wldtEvent instanceof PhysicalAssetPropertyWldtEvent) {
                            publishPhysicalAssetPropertyWldtEvent((PhysicalAssetPropertyWldtEvent) wldtEvent);
                        }
                    } catch (EventBusException e) {
                        e.printStackTrace();
                    }
                });
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void connectToMqttBroker() {
        try {
            this.mqttClient.connect(((MqttPhysicalAdapterConfiguration) getConfiguration()).getConnectOptions());
            logger.info("MQTT Physical Adapter - MQTT client connected to broker - clientId: {}", ((MqttPhysicalAdapterConfiguration) getConfiguration()).getClientId());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
