package de.smartsquare.starter.mqtt;

import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import de.smartsquare.starter.mqtt.MqttSubscriberCollector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: MqttConnector.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��T\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018��2\u00020\u0001B%\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t¢\u0006\u0002\u0010\nJ\u000e\u0010\u0010\u001a\b\u0012\u0004\u0012\u00020\u00120\u0011H\u0002J\b\u0010\u0013\u001a\u00020\u0014H\u0016J\b\u0010\u0015\u001a\u00020\u0016H\u0016J\u0010\u0010\u0017\u001a\u00020\u00162\u0006\u0010\u0018\u001a\u00020\u0019H\u0016J\f\u0010\u001a\u001a\u0006\u0012\u0002\b\u00030\u0011H\u0002R\u0013\u0010\u0002\u001a\u00070\u000b¢\u0006\u0002\b\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\r\u001a\n \u000f*\u0004\u0018\u00010\u000e0\u000eX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001b"}, d2 = {"Lde/smartsquare/starter/mqtt/Mqtt5Connector;", "Lde/smartsquare/starter/mqtt/MqttConnector;", "client", "Lcom/hivemq/client/mqtt/mqtt5/Mqtt5Client;", "collector", "Lde/smartsquare/starter/mqtt/MqttSubscriberCollector;", "handler", "Lde/smartsquare/starter/mqtt/MqttHandler;", "config", "Lde/smartsquare/starter/mqtt/MqttProperties;", "(Lcom/hivemq/client/mqtt/mqtt5/Mqtt5Client;Lde/smartsquare/starter/mqtt/MqttSubscriberCollector;Lde/smartsquare/starter/mqtt/MqttHandler;Lde/smartsquare/starter/mqtt/MqttProperties;)V", "Lcom/hivemq/client/mqtt/mqtt5/Mqtt5AsyncClient;", "Lorg/jetbrains/annotations/NotNull;", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "connect", "Ljava/util/concurrent/CompletableFuture;", "Lcom/hivemq/client/mqtt/mqtt5/message/connect/connack/Mqtt5ConnAck;", "isRunning", "", "start", "", "stop", "callback", "Ljava/lang/Runnable;", "subscribe", "mqtt-starter"})
@SourceDebugExtension({"SMAP\nMqttConnector.kt\nKotlin\n*S Kotlin\n*F\n+ 1 MqttConnector.kt\nde/smartsquare/starter/mqtt/Mqtt5Connector\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 ArraysJVM.kt\nkotlin/collections/ArraysKt__ArraysJVMKt\n*L\n1#1,161:1\n1549#2:162\n1620#2,3:163\n37#3,2:166\n*S KotlinDebug\n*F\n+ 1 MqttConnector.kt\nde/smartsquare/starter/mqtt/Mqtt5Connector\n*L\n122#1:162\n122#1:163,3\n133#1:166,2\n*E\n"})
/* loaded from: input_file:de/smartsquare/starter/mqtt/Mqtt5Connector.class */
public final class Mqtt5Connector extends MqttConnector {

    @NotNull
    private final MqttSubscriberCollector collector;

    @NotNull
    private final MqttHandler handler;

    @NotNull
    private final MqttProperties config;
    private final Logger logger;

    @NotNull
    private final Mqtt5AsyncClient client;

    public Mqtt5Connector(@NotNull Mqtt5Client mqtt5Client, @NotNull MqttSubscriberCollector mqttSubscriberCollector, @NotNull MqttHandler mqttHandler, @NotNull MqttProperties mqttProperties) {
        Intrinsics.checkNotNullParameter(mqtt5Client, "client");
        Intrinsics.checkNotNullParameter(mqttSubscriberCollector, "collector");
        Intrinsics.checkNotNullParameter(mqttHandler, "handler");
        Intrinsics.checkNotNullParameter(mqttProperties, "config");
        this.collector = mqttSubscriberCollector;
        this.handler = mqttHandler;
        this.config = mqttProperties;
        this.logger = LoggerFactory.getLogger(getClass());
        Mqtt5AsyncClient async = mqtt5Client.toAsync();
        Intrinsics.checkNotNullExpressionValue(async, "toAsync(...)");
        this.client = async;
    }

    public void start() {
        try {
            CompletableFuture.allOf(subscribe(), connect()).get(this.config.getConnectTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new MqttBrokerConnectException("Timeout while connecting to broker", e);
        }
    }

    private final CompletableFuture<?> subscribe() {
        List<MqttSubscriberCollector.ResolvedMqttSubscriber> subscribers = this.collector.getSubscribers();
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(subscribers, 10));
        for (MqttSubscriberCollector.ResolvedMqttSubscriber resolvedMqttSubscriber : subscribers) {
            arrayList.add(this.client.subscribeWith().topicFilter(resolvedMqttSubscriber.getTopic()).qos(resolvedMqttSubscriber.getQos()).callback((v1) -> {
                subscribe$lambda$2$lambda$0(r1, v1);
            }).send().exceptionallyCompose(Mqtt5Connector::subscribe$lambda$2$lambda$1));
        }
        CompletableFuture[] completableFutureArr = (CompletableFuture[]) arrayList.toArray(new CompletableFuture[0]);
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) Arrays.copyOf(completableFutureArr, completableFutureArr.length));
        Intrinsics.checkNotNullExpressionValue(allOf, "allOf(...)");
        return allOf;
    }

    private final CompletableFuture<Mqtt5ConnAck> connect() {
        String serverHost = this.client.getConfig().getServerHost();
        Intrinsics.checkNotNullExpressionValue(serverHost, "getServerHost(...)");
        int serverPort = this.client.getConfig().getServerPort();
        Optional simpleAuth = this.client.getConfig().getSimpleAuth();
        Mqtt5Connector$connect$username$1 mqtt5Connector$connect$username$1 = new Function1<Mqtt5SimpleAuth, Optional<? extends MqttUtf8String>>() { // from class: de.smartsquare.starter.mqtt.Mqtt5Connector$connect$username$1
            public final Optional<? extends MqttUtf8String> invoke(Mqtt5SimpleAuth mqtt5SimpleAuth) {
                return mqtt5SimpleAuth.getUsername();
            }
        };
        MqttUtf8String mqttUtf8String = (MqttUtf8String) simpleAuth.flatMap((v1) -> {
            return connect$lambda$3(r1, v1);
        }).orElseGet(Mqtt5Connector::connect$lambda$4);
        String obj = mqttUtf8String != null ? mqttUtf8String.toString() : null;
        Mqtt5Connect build = Mqtt5Connect.builder().cleanStart(this.config.getClean()).build();
        Intrinsics.checkNotNullExpressionValue(build, "build(...)");
        this.logger.info("Connecting to " + (obj != null ? obj + "@" : "") + serverHost + ":" + serverPort + " using mqtt 3...");
        CompletableFuture<Mqtt5ConnAck> exceptionallyCompose = this.client.connect(build).exceptionallyCompose((v2) -> {
            return connect$lambda$5(r1, r2, v2);
        });
        Intrinsics.checkNotNullExpressionValue(exceptionallyCompose, "exceptionallyCompose(...)");
        return exceptionallyCompose;
    }

    @Override // de.smartsquare.starter.mqtt.MqttConnector
    public void stop(@NotNull Runnable runnable) {
        Intrinsics.checkNotNullParameter(runnable, "callback");
        this.client.disconnect().thenRun(runnable);
    }

    public boolean isRunning() {
        return this.client.getState() != MqttClientState.DISCONNECTED;
    }

    private static final void subscribe$lambda$2$lambda$0(Mqtt5Connector mqtt5Connector, Mqtt5Publish mqtt5Publish) {
        Intrinsics.checkNotNullParameter(mqtt5Connector, "this$0");
        MqttHandler mqttHandler = mqtt5Connector.handler;
        Intrinsics.checkNotNull(mqtt5Publish);
        mqttHandler.handle(Mqtt5PublishContainer.m16boximpl(Mqtt5PublishContainer.m15constructorimpl(mqtt5Publish)));
    }

    private static final CompletionStage subscribe$lambda$2$lambda$1(Throwable th) {
        return CompletableFuture.failedFuture(new MqttBrokerConnectException("Failed to subscribe", th));
    }

    private static final Optional connect$lambda$3(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Optional) function1.invoke(obj);
    }

    private static final MqttUtf8String connect$lambda$4() {
        return null;
    }

    private static final CompletionStage connect$lambda$5(String str, int i, Throwable th) {
        Intrinsics.checkNotNullParameter(str, "$host");
        return CompletableFuture.failedFuture(new MqttBrokerConnectException("Failed to connect to broker " + str + ":" + i, th));
    }
}
