package com.github.cm.heclouds.adapter.mqttadapter;

import com.github.cm.heclouds.adapter.config.Config;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.MqttFixedHeaders;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.MqttSubscribeFuture;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.MqttSubscription;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.PromiseCanceller;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.promise.MqttConnect;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.promise.MqttConnectPromise;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.promise.MqttConnectResult;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.promise.MqttSubscribePromise;
import com.github.cm.heclouds.adapter.mqttadapter.mqtt.promise.MqttUnsubscribePromise;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.List;
import java.util.concurrent.ExecutionException;

/* loaded from: input_file:com/github/cm/heclouds/adapter/mqttadapter/MqttClient.class */
public final class MqttClient {
    private Channel channel;

    public MqttClient(Config config) {
        this.channel = MqttClientFactory.initializeNettyClient(config);
    }

    public MqttConnectResult connect(String str, String str2, String str3) throws ExecutionException, InterruptedException {
        MqttConnect mqttConnect = new MqttConnect();
        mqttConnect.setClientId(str.getBytes());
        mqttConnect.setUsername(str2);
        mqttConnect.setPassword(str3);
        return (MqttConnectResult) writeAndFlush(new MqttConnectPromise(mqttConnect, this.channel.eventLoop())).get();
    }

    public MqttSubscribeFuture subscribe(List<MqttSubscription> list) {
        return writeAndFlush(new MqttSubscribePromise(this.channel.eventLoop(), list));
    }

    public Future<MqttUnsubAckMessage> unsubscribe(List<String> list) {
        return writeAndFlush(new MqttUnsubscribePromise(this.channel.eventLoop(), list));
    }

    public Future<Void> disconnect() throws InterruptedException {
        ChannelFuture writeAndFlush = this.channel.writeAndFlush(new MqttMessage(MqttFixedHeaders.DISCONNECT_HEADER));
        this.channel.closeFuture().sync();
        this.channel.eventLoop().shutdownGracefully();
        return writeAndFlush;
    }

    private synchronized <P extends Promise<V>, V> P writeAndFlush(P p) {
        this.channel.writeAndFlush(p).addListener(new PromiseCanceller(p));
        return p;
    }

    public Channel getChannel() {
        return this.channel;
    }
}
