package it.wldt.adapter.mqtt.digital;

import it.wldt.adapter.digital.DigitalAdapter;
import it.wldt.adapter.mqtt.digital.topic.incoming.DigitalTwinIncomingTopic;
import it.wldt.adapter.mqtt.digital.topic.outgoing.DigitalTwinOutgoingTopic;
import it.wldt.adapter.mqtt.digital.topic.outgoing.EventNotificationOutgoingTopic;
import it.wldt.adapter.mqtt.digital.topic.outgoing.PropertyOutgoingTopic;
import it.wldt.core.state.DigitalTwinState;
import it.wldt.core.state.DigitalTwinStateChange;
import it.wldt.core.state.DigitalTwinStateEventNotification;
import it.wldt.core.state.DigitalTwinStateProperty;
import it.wldt.core.state.DigitalTwinStateResource;
import it.wldt.exception.EventBusException;
import it.wldt.exception.WldtDigitalTwinStateEventException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/wldt/adapter/mqtt/digital/MqttDigitalAdapter.class */
public class MqttDigitalAdapter extends DigitalAdapter<MqttDigitalAdapterConfiguration> {
    private static final Logger logger = LoggerFactory.getLogger(MqttDigitalAdapter.class);
    private final MqttClient mqttClient;

    public MqttDigitalAdapter(String str, MqttDigitalAdapterConfiguration mqttDigitalAdapterConfiguration) throws MqttException {
        super(str, mqttDigitalAdapterConfiguration);
        this.mqttClient = new MqttClient(((MqttDigitalAdapterConfiguration) getConfiguration()).getBrokerConnectionString(), ((MqttDigitalAdapterConfiguration) getConfiguration()).getClientId(), ((MqttDigitalAdapterConfiguration) getConfiguration()).getPersistence());
    }

    protected void onStateUpdate(DigitalTwinState digitalTwinState, DigitalTwinState digitalTwinState2, ArrayList<DigitalTwinStateChange> arrayList) {
        if (arrayList == null || arrayList.isEmpty()) {
            logger.info("No relevant DT's state changes detected !");
            return;
        }
        Iterator<DigitalTwinStateChange> it2 = arrayList.iterator();
        while (it2.hasNext()) {
            DigitalTwinStateChange next = it2.next();
            DigitalTwinStateChange.Operation operation = next.getOperation();
            DigitalTwinStateChange.ResourceType resourceType = next.getResourceType();
            DigitalTwinStateResource resource = next.getResource();
            if ((resourceType.equals(DigitalTwinStateChange.ResourceType.PROPERTY) && operation.equals(DigitalTwinStateChange.Operation.OPERATION_UPDATE)) || (resourceType.equals(DigitalTwinStateChange.ResourceType.PROPERTY_VALUE) && operation.equals(DigitalTwinStateChange.Operation.OPERATION_UPDATE_VALUE))) {
                if (resource instanceof DigitalTwinStateProperty) {
                    DigitalTwinStateProperty<?> digitalTwinStateProperty = (DigitalTwinStateProperty) resource;
                    if (((MqttDigitalAdapterConfiguration) getConfiguration()).getPropertyUpdateTopics().containsKey(digitalTwinStateProperty.getKey())) {
                        PropertyOutgoingTopic<?> propertyOutgoingTopic = ((MqttDigitalAdapterConfiguration) getConfiguration()).getPropertyUpdateTopics().get(digitalTwinStateProperty.getKey());
                        publishOnDigitalTwinOutgoingTopic(propertyOutgoingTopic, propertyOutgoingTopic.applyPublishFunction(digitalTwinStateProperty));
                    }
                }
            }
        }
    }

    protected void onEventNotificationReceived(DigitalTwinStateEventNotification<?> digitalTwinStateEventNotification) {
        logger.info("MQTT Digital Adapter({}) - received event: {}", getId(), digitalTwinStateEventNotification.getDigitalEventKey());
        if (((MqttDigitalAdapterConfiguration) getConfiguration()).getEventNotificationTopics().containsKey(digitalTwinStateEventNotification.getDigitalEventKey())) {
            EventNotificationOutgoingTopic<?> eventNotificationOutgoingTopic = ((MqttDigitalAdapterConfiguration) getConfiguration()).getEventNotificationTopics().get(digitalTwinStateEventNotification.getDigitalEventKey());
            publishOnDigitalTwinOutgoingTopic(eventNotificationOutgoingTopic, eventNotificationOutgoingTopic.applyPublishFunction(digitalTwinStateEventNotification));
        }
    }

    public void onAdapterStart() {
        connectToMqttBroker();
        ((MqttDigitalAdapterConfiguration) getConfiguration()).getActionIncomingTopics().values().forEach((v1) -> {
            subscribeClientToDigitalTwinIncomingTopic(v1);
        });
        notifyDigitalAdapterBound();
    }

    public void onAdapterStop() {
        try {
            this.mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void onDigitalTwinSync(DigitalTwinState digitalTwinState) {
        try {
            digitalTwinState.getEventList().map(list -> {
                return (List) list.stream().map((v0) -> {
                    return v0.getKey();
                }).filter(str -> {
                    return ((MqttDigitalAdapterConfiguration) getConfiguration()).getEventNotificationTopics().containsKey(str);
                }).collect(Collectors.toList());
            }).ifPresent(list2 -> {
                try {
                    observeDigitalTwinEventsNotifications(list2);
                } catch (EventBusException e) {
                    e.printStackTrace();
                }
            });
        } catch (WldtDigitalTwinStateEventException e) {
            e.printStackTrace();
        }
    }

    public void onDigitalTwinUnSync(DigitalTwinState digitalTwinState) {
    }

    public void onDigitalTwinCreate() {
    }

    public void onDigitalTwinStart() {
    }

    public void onDigitalTwinStop() {
    }

    public void onDigitalTwinDestroy() {
    }

    private void publishOnDigitalTwinOutgoingTopic(DigitalTwinOutgoingTopic<?> digitalTwinOutgoingTopic, String str) {
        try {
            MqttMessage mqttMessage = new MqttMessage(str.getBytes());
            mqttMessage.setQos(digitalTwinOutgoingTopic.getQos().intValue());
            mqttMessage.setRetained(true);
            this.mqttClient.publish(digitalTwinOutgoingTopic.getTopic(), mqttMessage);
            logger.info("MQTT Digital Adapter - MQTT client published message: {} on topic: {}", str, digitalTwinOutgoingTopic.getTopic());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void subscribeClientToDigitalTwinIncomingTopic(DigitalTwinIncomingTopic digitalTwinIncomingTopic) {
        try {
            this.mqttClient.subscribe(digitalTwinIncomingTopic.getTopic(), digitalTwinIncomingTopic.getQos().intValue(), (str, mqttMessage) -> {
                logger.info("MQTT Digital Adapter -receive message on topic: {}", str);
                new Thread(() -> {
                    try {
                        publishDigitalActionWldtEvent(digitalTwinIncomingTopic.applySubscribeFunction(new String(mqttMessage.getPayload())));
                    } catch (EventBusException e) {
                        e.printStackTrace();
                    }
                }).start();
            });
            logger.info("MQTT Digital Adapter - MQTT client subscribed to topic: {}", digitalTwinIncomingTopic.getTopic());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void connectToMqttBroker() {
        try {
            this.mqttClient.connect(((MqttDigitalAdapterConfiguration) getConfiguration()).getConnectOptions());
            logger.info("MQTT Digital Adapter - MQTT client connected to broker - clientId: {}", ((MqttDigitalAdapterConfiguration) getConfiguration()).getClientId());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
