package org.openmuc.framework.datalogger.mqtt;

import com.hivemq.client.mqtt.MqttClientSslConfig;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.openmuc.framework.data.Record;
import org.openmuc.framework.datalogger.spi.DataLoggerService;
import org.openmuc.framework.datalogger.spi.LogChannel;
import org.openmuc.framework.datalogger.spi.LogRecordContainer;
import org.openmuc.framework.lib.ssl.SslManager;
import org.openmuc.framework.parser.spi.ParserService;
import org.openmuc.framework.parser.spi.SerializationException;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data logger that serializes log records with a registered {@link ParserService} and publishes the resulting
 * bytes to an MQTT broker.
 *
 * <p>Configuration is read from system properties prefixed with the lower-cased package name of this class
 * ({@code .host}, {@code .port}, {@code .ssl}, {@code .username}, {@code .password}, {@code .topic},
 * {@code .parser}, {@code .multiple}).
 *
 * <p>Messages that cannot be sent (not connected yet, or the send failed) are kept in an in-memory buffer and
 * re-published when the client reports a connection. The buffer is unbounded.
 *
 * <p>The channel set, parser registry and message buffer are static and therefore shared by all instances. They
 * are accessed from the logging thread, OSGi service-event threads and MQTT client callbacks, so thread-safe
 * collections are used.
 */
@Component
public class MqttLogger implements DataLoggerService {
    private static final Logger logger = LoggerFactory.getLogger(MqttLogger.class);

    /** IDs of the channels whose records are forwarded; compound updates synchronize on the set itself. */
    private static final Set<String> LOGGED_CHANNELS = Collections.synchronizedSet(new HashSet<>());

    /** Known parsers keyed by their {@code parserID} service property. */
    private static final Map<String, ParserService> PARSERS = Collections.synchronizedMap(new HashMap<>());

    /** Serialized messages waiting to be (re-)published. */
    private static final Queue<byte[]> MESSAGE_BUFFER = new ConcurrentLinkedQueue<>();

    private String topic;
    private String parser;
    private boolean logMultiple;
    private Mqtt3AsyncClient client;

    /** Written from MQTT callback threads and read from logging threads, hence volatile. */
    private volatile boolean connected = false;

    /**
     * Connects to the broker and starts tracking parser services.
     *
     * @param componentContext component context providing the bundle context used for the service listener
     */
    @Activate
    public void activate(ComponentContext componentContext) {
        logger.info("Activating MQTT logger");
        connect();
        listenForParser(componentContext.getBundleContext());
    }

    /**
     * Disconnects the MQTT client if one was created.
     *
     * @param componentContext unused
     */
    @Deactivate
    public void deactivate(ComponentContext componentContext) {
        logger.info("Deactivating MQTT logger");
        // The client is absent when building it failed (e.g. broken SSL setup) during activation.
        if (this.client != null) {
            this.client.disconnect();
        }
    }

    /**
     * @return the fixed identifier of this logger, {@code "mqttlogger"}
     */
    public String getId() {
        return "mqttlogger";
    }

    /**
     * Replaces the set of channels whose records are published.
     *
     * @param list channels to log; only their IDs are retained
     */
    public void setChannelsToLog(List<LogChannel> list) {
        // Clear and refill atomically so a concurrent log() never observes a half-populated set.
        synchronized (LOGGED_CHANNELS) {
            LOGGED_CHANNELS.clear();
            for (LogChannel channel : list) {
                LOGGED_CHANNELS.add(channel.getId());
            }
        }
    }

    /**
     * Serializes and publishes the records of all configured channels.
     *
     * <p>When the {@code .multiple} property is enabled, records with a non-null value are collected and
     * serialized together in one message; all other records are serialized one message per record.
     *
     * @param list containers to log; containers of channels that are not configured are skipped
     * @param j    timestamp passed by the caller; not used by this logger
     */
    public void log(List<LogRecordContainer> list, long j) {
        List<LogRecordContainer> batch = new ArrayList<>();
        for (LogRecordContainer container : list) {
            if (!LOGGED_CHANNELS.contains(container.getChannelId())) {
                continue;
            }
            if (this.logMultiple && container.getRecord().getValue() != null) {
                batch.add(container);
            } else {
                parse(Collections.singletonList(container));
            }
        }
        if (!batch.isEmpty()) {
            parse(batch);
        }
    }

    /**
     * Serializes the given containers with the configured parser and hands the bytes to {@link #publish(byte[])}.
     * Logs an error and does nothing else when the configured parser is not registered.
     */
    private void parse(List<LogRecordContainer> list) {
        // Single lookup: the parser may be unregistered concurrently between a containsKey() and a get().
        ParserService parserService = PARSERS.get(this.parser);
        if (parserService == null) {
            logger.error("No parser available!");
            return;
        }
        try {
            byte[] payload = parserService.serialize(list);
            publish(payload);
            if (logger.isTraceEnabled()) {
                logger.trace(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (SerializationException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Handles event-driven logging exactly like {@link #log(List, long)}.
     *
     * @param list containers to log
     * @param j    timestamp passed by the caller; not used by this logger
     */
    public void logEvent(List<LogRecordContainer> list, long j) {
        log(list, j);
    }

    /**
     * Reading records back is not supported by this logger.
     *
     * @throws UnsupportedOperationException always
     */
    public List<Record> getRecords(String str, long j, long j2) throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads the configuration from system properties, builds the MQTT client and starts an asynchronous
     * connection attempt. If the client cannot be built with SSL enabled, the error is logged and no connection
     * is attempted.
     *
     * @throws NumberFormatException if the {@code .port} property is missing or not an integer
     */
    void connect() {
        String prefix = MqttLogger.class.getPackage().getName().toLowerCase();
        String host = System.getProperty(prefix + ".host");
        int port = Integer.parseInt(System.getProperty(prefix + ".port"));
        boolean sslEnabled = Boolean.parseBoolean(System.getProperty(prefix + ".ssl"));
        String username = System.getProperty(prefix + ".username");
        String password = System.getProperty(prefix + ".password");
        this.topic = System.getProperty(prefix + ".topic");
        this.parser = System.getProperty(prefix + ".parser");
        this.logMultiple = Boolean.parseBoolean(System.getProperty(prefix + ".multiple"));

        Mqtt3ClientBuilder builder = Mqtt3Client.builder()
                .identifier(UUID.randomUUID().toString())
                .automaticReconnectWithDefaultConfig()
                .addConnectedListener(context -> {
                    // Mark as connected before flushing; otherwise publish() would just re-buffer the messages
                    // if this listener runs before the connect future's completion callback.
                    this.connected = true;
                    flushBuffer();
                })
                .addDisconnectedListener(context -> {
                    // NOTE(review): presumably this stops automatic reconnects when the connection attempt
                    // itself failed (client still in CONNECTING state) - confirm against HiveMQ client docs.
                    if (context.getClientConfig().getState() == MqttClientState.CONNECTING) {
                        context.getReconnector().reconnect(false);
                    }
                })
                .serverHost(host)
                .serverPort(port);

        Mqtt3AsyncClient builtClient;
        if (sslEnabled) {
            try {
                builtClient = builder.sslConfig(MqttClientSslConfig.builder()
                        .keyManagerFactory(SslManager.getInstance().getKeyManagerFactory())
                        .trustManagerFactory(SslManager.getInstance().getTrustManagerFactory())
                        .handshakeTimeout(10L, TimeUnit.SECONDS)
                        .build()).buildAsync();
            } catch (Exception e) {
                logger.error("Couldn't connect with SSL enabled: {}", e.getMessage(), e);
                // Without a client there is nothing to connect; messages stay buffered.
                return;
            }
        } else {
            builtClient = builder.buildAsync();
        }
        this.client = builtClient;

        CompletableFuture<?> connectFuture;
        if (username == null || password == null) {
            connectFuture = builtClient.connect();
        } else {
            connectFuture = builtClient.connectWith()
                    .simpleAuth()
                    .username(username)
                    .password(password.getBytes(StandardCharsets.UTF_8))
                    .applySimpleAuth()
                    .send();
        }
        connectFuture.whenComplete((ack, failure) -> {
            if (failure != null) {
                logger.error("Something went wrong while connecting: {}", failure.getMessage());
            } else {
                logger.info("Connected to MQTT broker {}", host);
                this.connected = true;
            }
        });
    }

    /**
     * Registers a service listener that keeps {@link #PARSERS} in sync with the {@link ParserService}
     * registrations, and then picks up parsers that were already registered before the listener was added.
     */
    private void listenForParser(BundleContext bundleContext) {
        String parserClassName = ParserService.class.getName();
        try {
            bundleContext.addServiceListener(event -> {
                ServiceReference<?> reference = event.getServiceReference();
                if (event.getType() == ServiceEvent.UNREGISTERING) {
                    String parserId = (String) reference.getProperty("parserID");
                    ParserService removed = PARSERS.remove(parserId);
                    if (removed != null) {
                        logger.info("{} unregistering, removing Parser", removed.getClass().getName());
                    }
                } else {
                    registerParser(bundleContext, reference);
                }
            }, "(objectClass=" + parserClassName + ')');

            // The listener only reports future events; look up parsers that are already registered.
            ServiceReference<?>[] existing = bundleContext.getServiceReferences(parserClassName, null);
            if (existing != null) {
                for (ServiceReference<?> reference : existing) {
                    registerParser(bundleContext, reference);
                }
            }
        } catch (InvalidSyntaxException e) {
            logger.error("Service listener can't be added to framework", e);
        }
    }

    /** Stores the parser behind the given reference under its {@code parserID} property, if still available. */
    private static void registerParser(BundleContext bundleContext, ServiceReference<?> reference) {
        String parserId = (String) reference.getProperty("parserID");
        ParserService parserService = (ParserService) bundleContext.getService(reference);
        if (parserService == null) {
            // getService() returns null when the service has been unregistered in the meantime.
            logger.warn("Parser service {} is no longer available", parserId);
            return;
        }
        logger.info("{} changed, updating Parser", parserService.getClass().getName());
        PARSERS.put(parserId, parserService);
    }

    /**
     * Re-publishes the messages that are buffered at the time of the call. Only a snapshot of the current
     * buffer size is drained: {@link #publish(byte[])} may put a message straight back into the buffer, and
     * looping until the buffer is empty could then never terminate.
     */
    private void flushBuffer() {
        int pending = MESSAGE_BUFFER.size();
        for (int i = 0; i < pending; i++) {
            byte[] message = MESSAGE_BUFFER.poll();
            if (message == null) {
                break;
            }
            publish(message);
        }
    }

    /**
     * Publishes the payload to the configured topic, or buffers it when no connection is available or the
     * asynchronous send fails.
     *
     * @param bArr serialized message to publish
     */
    void publish(byte[] bArr) {
        if (logger.isInfoEnabled()) {
            logger.info(new String(bArr, StandardCharsets.UTF_8));
        }
        Mqtt3AsyncClient mqttClient = this.client;
        if (!this.connected || mqttClient == null) {
            logger.debug("Not connected to broker yet. Adding message to buffer");
            MESSAGE_BUFFER.add(bArr);
            return;
        }
        mqttClient.publishWith().topic(this.topic).payload(bArr).send().whenComplete((published, failure) -> {
            if (failure != null) {
                logger.debug("A message could not be sent. Adding message to buffer");
                MESSAGE_BUFFER.add(bArr);
            }
        });
    }
}
