package com.kevin.flink.streaming.connectors.mqtt;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.kevin.flink.streaming.connectors.mqtt.internal.Retry;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple9;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kevin/flink/streaming/connectors/mqtt/CachedMQTTClient.class */
public class CachedMQTTClient {
    private static CachedMQTTClient instance;
    private static final Logger LOG = LoggerFactory.getLogger(CachedMQTTClient.class);
    private static LoadingCache<List<Tuple2<String, String>>, Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>>> cache = null;

    public static CachedMQTTClient getInstance() throws Exception {
        if (instance == null) {
            synchronized (CachedMQTTClient.class) {
                if (instance == null) {
                    instance = new CachedMQTTClient();
                }
            }
        }
        return instance;
    }

    private CachedMQTTClient() {
        try {
            cache = CacheBuilder.newBuilder().expireAfterAccess(Long.valueOf(Long.parseLong(System.getProperty("flink.mqtt.connection.cache.timeout", "600000"))).longValue(), TimeUnit.MILLISECONDS).removalListener(new RemovalListener<List<Tuple2<String, String>>, Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>>>() { // from class: com.kevin.flink.streaming.connectors.mqtt.CachedMQTTClient.2
                public void onRemoval(RemovalNotification<List<Tuple2<String, String>>, Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>>> removalNotification) {
                    List list = (List) removalNotification.getKey();
                    MqttAsyncClient mqttAsyncClient = (MqttAsyncClient) ((Tuple2) removalNotification.getValue()).f0;
                    Tuple9 tuple9 = (Tuple9) ((Tuple2) removalNotification.getValue()).f1;
                    CachedMQTTClient.LOG.debug(String.format("Evicting MQTT client %s params: %s, due to %s", mqttAsyncClient, list, removalNotification.getCause()));
                    CachedMQTTClient.this.closeMqttClient(mqttAsyncClient, (MqttClientPersistence) tuple9.f3);
                }
            }).build(new CacheLoader<List<Tuple2<String, String>>, Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>>>() { // from class: com.kevin.flink.streaming.connectors.mqtt.CachedMQTTClient.1
                public Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>> load(List<Tuple2<String, String>> list) throws Exception {
                    CachedMQTTClient.LOG.debug(String.format("Creating new MQTT client with params: %s", list));
                    HashMap hashMap = new HashMap();
                    for (Tuple2<String, String> tuple2 : list) {
                        hashMap.put((String) tuple2.f0, (String) tuple2.f1);
                    }
                    return CachedMQTTClient.this.createMqttClient(hashMap);
                }
            });
        } catch (Exception e) {
            LOG.error("MQTT CachedMQTTClient Initializes error: ", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>> createMqttClient(Map<String, String> map) throws Exception {
        Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer> parseConfigParams = MQTTUtils.parseConfigParams(map);
        String str = (String) parseConfigParams.f0;
        String str2 = (String) parseConfigParams.f1;
        MqttClientPersistence mqttClientPersistence = (MqttClientPersistence) parseConfigParams.f3;
        MqttConnectOptions mqttConnectOptions = (MqttConnectOptions) parseConfigParams.f4;
        MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(str, str2, mqttClientPersistence);
        mqttAsyncClient.setCallback(new MqttCallbackExtended() { // from class: com.kevin.flink.streaming.connectors.mqtt.CachedMQTTClient.3
            public void messageArrived(String str3, MqttMessage mqttMessage) {
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }

            public void connectionLost(Throwable th) {
                CachedMQTTClient.LOG.warn("Connection to mqtt server lost.", th);
            }

            public void connectComplete(boolean z, String str3) {
                CachedMQTTClient.LOG.info(String.format("Connect complete %s. Is it a reconnect?: %s", str3, Boolean.valueOf(z)));
            }
        });
        Retry.apply(Integer.valueOf(System.getProperty("flink.mqtt.client.publish.attempts", "-1")), Long.valueOf(System.getProperty("flink.mqtt.client.publish.backoff", "5000")), new Class[]{MqttException.class}, Optional.empty(), optional -> {
            try {
                mqttAsyncClient.connect(mqttConnectOptions);
            } catch (MqttException e) {
                LOG.error("MQTT MQTTStreamSource connect MqttException: ", e);
            } catch (MqttSecurityException e2) {
                LOG.error("MQTT MQTTStreamSource connect MqttSecurityException: ", e2);
            }
            return Optional.of(true);
        });
        return new Tuple2<>(mqttAsyncClient, parseConfigParams);
    }

    public void closeMqttClient(MqttAsyncClient mqttAsyncClient, MqttClientPersistence mqttClientPersistence) {
        try {
            if (mqttAsyncClient.isConnected()) {
                mqttAsyncClient.disconnect();
            }
            try {
                mqttClientPersistence.close();
            } catch (Throwable th) {
                LOG.warn(String.format("Error while closing MQTT persistent store %s", th.getMessage()), th);
            }
            mqttAsyncClient.close();
        } catch (Throwable th2) {
            LOG.warn(String.format("Error while closing MQTT client %s", th2.getMessage()), th2);
        }
    }

    public Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>> getOrCreate(Map<String, String> map) throws Exception {
        return (Tuple2) cache.get(mapToSeq(map));
    }

    public void close(Map<String, String> map) {
        cache.invalidate(mapToSeq(map));
    }

    public void clear() {
        LOG.debug("Cleaning MQTT client cache");
        cache.invalidateAll();
    }

    public List<Tuple2<String, String>> mapToSeq(Map<String, String> map) {
        return (List) map.entrySet().stream().sorted(Comparator.comparing(entry -> {
            return (String) entry.getKey();
        })).map(entry2 -> {
            return new Tuple2((String) entry2.getKey(), (String) entry2.getValue());
        }).collect(Collectors.toList());
    }
}
