package com.kevin.flink.streaming.connectors.mqtt;

import com.kevin.flink.streaming.connectors.mqtt.internal.Retry;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kevin/flink/streaming/connectors/mqtt/MQTTStreamSink.class */
/**
 * Flink sink that publishes {@link MQTTMessage} records through an MQTT client obtained from
 * {@link CachedMQTTClient}.
 *
 * <p>Each record is written to a {@link LocalMessageStore} before the publish attempt and removed
 * from it only when the publish call reports success, so a record whose publish failed stays in the
 * store.
 *
 * <p>The store and its serializer are created in {@link #open(Configuration)} and are therefore
 * declared {@code transient}: they are runtime-only state and are never part of the serialized
 * function instance.
 */
public class MQTTStreamSink extends RichSinkFunction<MQTTMessage> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MQTTStreamSink.class);

    /** Connector settings handed to {@link CachedMQTTClient#getOrCreate}. */
    private final Map<String, String> config;

    /** Holds messages between {@code store} and {@code remove}; created in {@link #open(Configuration)}. */
    private transient LocalMessageStore<MQTTMessage> store;

    /** Serializer passed (twice) to the message store; created in {@link #open(Configuration)}. */
    private transient SimpleObjectSerializer<MQTTMessage> objectSerializer;

    /**
     * Creates a sink using the given connector settings.
     *
     * @param map settings forwarded unchanged to {@link CachedMQTTClient} when the client is looked up
     */
    public MQTTStreamSink(Map<String, String> map) {
        this.config = map;
    }

    /**
     * Stores the message locally, tries to publish it, and drops it from the local store once the
     * publish call succeeded.
     *
     * <p>The retry parameters are read from the system properties
     * {@code flink.mqtt.client.publish.attempts} (default {@code -1}) and
     * {@code flink.mqtt.client.publish.backoff} (default {@code 5000}) and handed to
     * {@link Retry#apply}; how they are interpreted is defined there.
     *
     * @param mQTTMessage the record to publish
     * @param context sink context supplied by Flink; not used
     * @throws Exception if a system property is not a valid number, or if the client lookup or the
     *     local store fails
     */
    public void invoke(MQTTMessage mQTTMessage, SinkFunction.Context context) throws Exception {
        Integer attempts = Integer.valueOf(System.getProperty("flink.mqtt.client.publish.attempts", "-1"));
        Long backoff = Long.valueOf(System.getProperty("flink.mqtt.client.publish.backoff", "5000"));

        Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>> cached =
                CachedMQTTClient.getInstance().getOrCreate(this.config);
        MqttAsyncClient client = cached.f0;
        Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer> settings = cached.f1;
        // f2 and f5 are passed to publish() as topic and QoS respectively.
        String topic = settings.f2;
        Integer qos = settings.f5;

        Long messageId = Long.valueOf(mQTTMessage.getMessageId());
        // Persist first so the message is still available if publishing fails.
        this.store.store(messageId, mQTTMessage);

        Object outcome = Retry.apply(attempts, backoff, new Class[]{MqttException.class}, messageId, id -> {
            try {
                MQTTMessage pending = this.store.retrieve(id);
                if (!client.isConnected()) {
                    return Optional.of(false);
                }
                // NOTE(review): the async client's publish() result is not awaited here, so "true"
                // means the publish call returned normally - confirm this is the intended guarantee.
                client.publish(topic, pending.getPayload(), qos.intValue(), false);
                return Optional.of(true);
            } catch (Exception e) {
                LOG.error("MQTT client publish error: ", e);
                return Optional.of(false);
            }
        }).orElse(false);

        if (Boolean.TRUE.equals(outcome)) {
            this.store.remove(messageId);
        } else {
            // Previously this path was silent; the message is intentionally left in the store.
            LOG.warn("MQTT message {} was not published and remains in the local message store", messageId);
        }
    }

    /**
     * Looks up the MQTT client for this sink's settings and builds the local message store on top
     * of the persistence object that comes with it.
     *
     * @param configuration Flink configuration, forwarded to the superclass
     * @throws Exception if the superclass fails to open or the client/store cannot be created; the
     *     latter is logged before being rethrown
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        try {
            Tuple2<MqttAsyncClient, Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer>> cached =
                    CachedMQTTClient.getInstance().getOrCreate(this.config);
            MqttClientPersistence persistence = cached.f1.f3;
            this.objectSerializer = new SimpleObjectSerializer<>();
            this.store = new LocalMessageStore<>(persistence, this.objectSerializer, this.objectSerializer);
        } catch (Exception e) {
            LOG.error("MQTT has not been properly initialized: ", e);
            throw e;
        }
    }

    /**
     * Does nothing: this sink releases no resources of its own.
     *
     * <p>NOTE(review): the client comes from {@link CachedMQTTClient}; presumably its lifecycle is
     * managed there - confirm that it is closed elsewhere.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
        // Intentionally empty.
    }
}
