package org.openremote.agent.protocol.mqtt;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientSslConfig;
import com.hivemq.client.mqtt.MqttClientSslConfigBuilder;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.exceptions.ConnectionClosedException;
import com.hivemq.client.mqtt.exceptions.ConnectionFailedException;
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException;
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3DisconnectException;
import com.hivemq.client.mqtt.mqtt3.lifecycle.Mqtt3ClientDisconnectedContext;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.openremote.agent.protocol.io.AbstractNettyIOClient;
import org.openremote.agent.protocol.io.IOClient;
import org.openremote.container.Container;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.auth.UsernamePassword;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.UniqueIdentifierGenerator;
import org.openremote.model.util.ValueUtil;

/* loaded from: input_file:org/openremote/agent/protocol/mqtt/AbstractMQTT_IOClient.class */
public abstract class AbstractMQTT_IOClient<S> implements IOClient<MQTTMessage<S>> {
    public static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.PROTOCOL, AbstractMQTT_IOClient.class);
    protected String clientId;
    protected String host;
    protected int port;
    protected boolean secure;
    protected boolean cleanSession;
    protected UsernamePassword usernamePassword;
    protected URI websocketURI;
    protected Mqtt3AsyncClient client;
    protected final Set<Consumer<ConnectionStatus>> connectionStatusConsumers;
    protected final ConcurrentMap<String, Set<Consumer<MQTTMessage<S>>>> topicConsumerMap;
    protected ScheduledExecutorService executorService;
    protected boolean disconnected;
    protected final AtomicBoolean connected;
    protected Consumer<String> topicSubscribeFailureConsumer;
    protected ConnectionStatus currentStatus;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMQTT_IOClient(String str, int i, boolean z, boolean z2, UsernamePassword usernamePassword, URI uri, MQTTLastWill mQTTLastWill, KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory) {
        this(UniqueIdentifierGenerator.generateId(), str, i, z, z2, usernamePassword, uri, mQTTLastWill, keyManagerFactory, trustManagerFactory);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMQTT_IOClient(String str, String str2, int i, boolean z, boolean z2, UsernamePassword usernamePassword, URI uri, MQTTLastWill mQTTLastWill, KeyManagerFactory keyManagerFactory, TrustManagerFactory trustManagerFactory) {
        this.connectionStatusConsumers = new CopyOnWriteArraySet();
        this.topicConsumerMap = new ConcurrentHashMap();
        this.disconnected = true;
        this.connected = new AtomicBoolean(false);
        this.clientId = str;
        this.host = str2;
        this.port = i;
        this.secure = z;
        this.cleanSession = z2;
        this.usernamePassword = usernamePassword;
        this.websocketURI = uri;
        this.executorService = Container.SCHEDULED_EXECUTOR;
        Mqtt3ClientBuilder mqtt3ClientBuilder = (Mqtt3ClientBuilder) MqttClient.builder().useMqttVersion3().identifier(str).addConnectedListener(mqttClientConnectedContext -> {
            onConnectionStatusChanged(null);
        }).addDisconnectedListener(mqttClientDisconnectedContext -> {
            boolean z3 = mqttClientDisconnectedContext.getSource() == MqttDisconnectSource.USER;
            if (this.disconnected) {
                mqttClientDisconnectedContext.getReconnector().reconnect(false);
            } else if (this.usernamePassword != null) {
                ((Mqtt3ConnectBuilder.Nested) ((Mqtt3ClientDisconnectedContext) mqttClientDisconnectedContext).getReconnector().connectWith().simpleAuth().username(usernamePassword.getUsername()).password(usernamePassword.getPassword().getBytes()).applySimpleAuth()).applyConnect();
            }
            if (mqttClientDisconnectedContext.getCause() instanceof Mqtt3DisconnectException) {
                LOG.info("Client disconnect '" + getClientUri() + "': initiator=" + String.valueOf(mqttClientDisconnectedContext.getSource()));
            } else if (mqttClientDisconnectedContext.getCause() instanceof Mqtt3ConnAckException) {
                LOG.info("Connection rejected by the broker '" + getClientUri() + "': reasonCode=" + String.valueOf(mqttClientDisconnectedContext.getCause().getMqttMessage().getReturnCode()) + ", initiator=" + String.valueOf(mqttClientDisconnectedContext.getSource()));
            } else if (mqttClientDisconnectedContext.getCause() instanceof ConnectionClosedException) {
                LOG.info("Connection closed by " + String.valueOf(mqttClientDisconnectedContext.getSource()) + " '" + getClientUri() + "': initiator=" + String.valueOf(mqttClientDisconnectedContext.getSource()));
            } else if (mqttClientDisconnectedContext.getCause() instanceof ConnectionFailedException) {
                LOG.log(Level.INFO, "Connection failed '" + getClientUri() + "': initiator=" + String.valueOf(mqttClientDisconnectedContext.getSource()), mqttClientDisconnectedContext.getCause());
            }
            onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
        }).automaticReconnect().initialDelay(AbstractNettyIOClient.RECONNECT_DELAY_INITIAL_MILLIS, TimeUnit.MILLISECONDS).maxDelay(AbstractNettyIOClient.RECONNECT_DELAY_MAX_MILLIS, TimeUnit.MILLISECONDS).applyAutomaticReconnect();
        if (z) {
            MqttClientSslConfigBuilder builder = MqttClientSslConfig.builder();
            builder = keyManagerFactory != null ? (MqttClientSslConfigBuilder) builder.keyManagerFactory(keyManagerFactory) : builder;
            mqtt3ClientBuilder = (Mqtt3ClientBuilder) mqtt3ClientBuilder.sslConfig((trustManagerFactory != null ? (MqttClientSslConfigBuilder) builder.trustManagerFactory(trustManagerFactory) : builder).build());
        }
        Mqtt3ClientBuilder serverPort = uri != null ? (Mqtt3ClientBuilder) mqtt3ClientBuilder.serverHost(uri.getHost()).serverPort(uri.getPort()).webSocketConfig().serverPath(uri.getPath()).queryString(uri.getQuery()).applyWebSocketConfig() : mqtt3ClientBuilder.serverHost(str2).serverPort(i);
        if (mQTTLastWill != null) {
            serverPort.willPublish().topic(mQTTLastWill.getTopic()).payload(((String) ValueUtil.getStringCoerced(mQTTLastWill.getPayload()).orElse("")).getBytes()).retain(mQTTLastWill.isRetain()).applyWillPublish();
        }
        try {
            this.client = serverPort.buildAsync();
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Invalid MQTT client config for client '" + getClientUri() + "'", (Throwable) e);
            this.client = null;
        }
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void sendMessage(MQTTMessage<S> mQTTMessage) {
        if (this.client == null) {
            LOG.info("Cannot send message as client is invalid: " + getClientUri());
        } else if (getConnectionStatus() != ConnectionStatus.CONNECTED) {
            LOG.info("Cannot send message as client is not connected: " + getClientUri());
        } else {
            ((CompletableFuture) this.client.publishWith().topic(mQTTMessage.topic).payload(messageToBytes(mQTTMessage.payload)).send()).whenComplete((mqtt3Publish, th) -> {
                if (th != null) {
                    LOG.log(Level.INFO, "Failed to publish to MQTT broker '" + getClientUri() + "'", th);
                } else {
                    LOG.finest("Published message to MQTT broker '" + getClientUri() + "'");
                }
            });
        }
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void addMessageConsumer(Consumer<MQTTMessage<S>> consumer) {
        addMessageConsumer("#", consumer);
    }

    public boolean addMessageConsumer(String str, Consumer<MQTTMessage<S>> consumer) {
        if (this.client == null) {
            return false;
        }
        Set<Consumer<MQTTMessage<S>>> computeIfAbsent = this.topicConsumerMap.computeIfAbsent(str, str2 -> {
            return new HashSet();
        });
        if (!computeIfAbsent.isEmpty()) {
            computeIfAbsent.add(consumer);
            return true;
        }
        if (doClientSubscription(str, computeIfAbsent)) {
            computeIfAbsent.add(consumer);
            return true;
        }
        this.topicConsumerMap.remove(str);
        return false;
    }

    public void setTopicSubscribeFailureConsumer(Consumer<String> consumer) {
        this.topicSubscribeFailureConsumer = consumer;
    }

    protected void onSubscribeFailed(String str) {
        if (this.topicSubscribeFailureConsumer != null) {
            this.topicSubscribeFailureConsumer.accept(str);
        }
    }

    protected boolean doClientSubscription(String str, Set<Consumer<MQTTMessage<S>>> set) {
        synchronized (this.connected) {
            if (!this.connected.get()) {
                return true;
            }
            Consumer consumer = mQTTMessage -> {
                if (this.topicConsumerMap.containsKey(str)) {
                    set.forEach(consumer2 -> {
                        try {
                            consumer2.accept(mQTTMessage);
                        } catch (Exception e) {
                            LOG.log(Level.WARNING, "Message consumer threw an exception", (Throwable) e);
                        }
                    });
                }
            };
            try {
                LOG.fine("Subscribed to topic '" + str + "' on client '" + getClientUri() + "'");
                return true;
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Failed to subscribe to topic '" + str + "' on client '" + getClientUri() + "': " + e.getMessage());
                this.executorService.execute(() -> {
                    onSubscribeFailed(str);
                });
                return false;
            }
        }
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeMessageConsumer(Consumer<MQTTMessage<S>> consumer) {
        removeMessageConsumer("#", consumer);
    }

    public void removeMessageConsumer(String str, Consumer<MQTTMessage<S>> consumer) {
        this.topicConsumerMap.computeIfPresent(str, (str2, set) -> {
            if (!set.remove(consumer) || !set.isEmpty()) {
                return set;
            }
            removeSubscription(str);
            return null;
        });
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeAllMessageConsumers() {
        HashSet hashSet = new HashSet(this.topicConsumerMap.keySet());
        this.topicConsumerMap.clear();
        hashSet.forEach(this::removeSubscription);
    }

    protected void removeSubscription(String str) {
        if (this.client != null) {
            ((CompletableFuture) this.client.unsubscribeWith().topicFilter(str).send()).whenComplete((r7, th) -> {
                if (th != null) {
                    LOG.log(Level.WARNING, "Failed to unsubscribe to topic '" + str + "' on client '" + getClientUri() + "'", th);
                } else {
                    LOG.fine("Unsubscribed from topic '" + str + "' on client '" + getClientUri() + "'");
                }
            });
        }
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void addConnectionStatusConsumer(Consumer<ConnectionStatus> consumer) {
        this.connectionStatusConsumers.add(consumer);
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeConnectionStatusConsumer(Consumer<ConnectionStatus> consumer) {
        this.connectionStatusConsumers.remove(consumer);
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeAllConnectionStatusConsumers() {
        this.connectionStatusConsumers.clear();
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public ConnectionStatus getConnectionStatus() {
        if (this.client == null) {
            return ConnectionStatus.ERROR;
        }
        MqttClientState state = this.client.getState();
        return state == MqttClientState.CONNECTED ? ConnectionStatus.CONNECTED : state == MqttClientState.DISCONNECTED ? ConnectionStatus.DISCONNECTED : ConnectionStatus.WAITING;
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void connect() {
        synchronized (this) {
            if (this.client == null) {
                return;
            }
            if (!this.disconnected) {
                LOG.finest("Must be disconnected before calling connect: " + getClientUri());
                return;
            }
            LOG.fine("Connecting MQTT Client: " + getClientUri());
            this.disconnected = false;
            LOG.info("Establishing connection: " + getClientUri());
            Mqtt3ConnectBuilder.Send keepAlive = this.client.connectWith().cleanSession(true).keepAlive(5);
            if (this.usernamePassword != null) {
                keepAlive = (Mqtt3ConnectBuilder.Send) keepAlive.simpleAuth().username(this.usernamePassword.getUsername()).password(this.usernamePassword.getPassword().getBytes()).applySimpleAuth();
            }
            ((CompletableFuture) keepAlive.send()).whenComplete((mqtt3ConnAck, th) -> {
                if (th != null) {
                    LOG.log(Level.INFO, "Connection failed:" + getClientUri(), th.getMessage());
                    return;
                }
                synchronized (this.connected) {
                    this.connected.set(true);
                    if (!this.cleanSession && !mqtt3ConnAck.isSessionPresent()) {
                        HashMap hashMap = new HashMap(this.topicConsumerMap);
                        this.executorService.execute(() -> {
                            hashMap.forEach((str, set) -> {
                                if (doClientSubscription(str, set)) {
                                    return;
                                }
                                this.topicConsumerMap.remove(str);
                            });
                        });
                    }
                }
            });
        }
    }

    protected void onConnectionStatusChanged(ConnectionStatus connectionStatus) {
        ConnectionStatus connectionStatus2 = connectionStatus != null ? connectionStatus : getConnectionStatus();
        if (this.currentStatus == connectionStatus2) {
            return;
        }
        this.currentStatus = connectionStatus2;
        LOG.info("Client '" + getClientUri() + "' connection status changed: " + String.valueOf(connectionStatus2));
        this.connectionStatusConsumers.forEach(consumer -> {
            try {
                consumer.accept(connectionStatus2);
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Connection status change handler threw an exception: " + getClientUri(), (Throwable) e);
            }
        });
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public void disconnect() {
        if (this.client == null) {
            return;
        }
        synchronized (this) {
            if (this.disconnected) {
                LOG.finest("Already disconnected: " + getClientUri());
                return;
            }
            LOG.finest("Disconnecting IO client: " + getClientUri());
            this.disconnected = true;
            this.client.disconnect().whenComplete((r6, th) -> {
                this.connected.set(false);
                if (th != null) {
                    LOG.log(Level.INFO, "Disconnect error '" + getClientUri() + "':" + th.getMessage());
                }
                if (this.cleanSession) {
                    removeAllMessageConsumers();
                }
                onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
            });
        }
    }

    @Override // org.openremote.agent.protocol.io.IOClient
    public String getClientUri() {
        if (this.websocketURI != null) {
            return "mqtt_" + String.valueOf(this.websocketURI) + "?clientId=" + this.clientId;
        }
        return "mqtt" + (this.secure ? "s://" : "://") + this.host + ":" + this.port + "/?clientId=" + this.clientId;
    }

    public abstract byte[] messageToBytes(S s);

    public abstract S messageFromBytes(byte[] bArr);
}
