package com.kevin.flink.streaming.connectors.mqtt;

import com.kevin.flink.streaming.connectors.mqtt.internal.MQTTExceptionListener;
import com.kevin.flink.streaming.connectors.mqtt.internal.RunningChecker;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kevin/flink/streaming/connectors/mqtt/MQTTStreamSource.class */
/**
 * Flink source that receives messages from an MQTT broker through an Eclipse Paho
 * {@link MqttAsyncClient} and emits them as {@link MQTTMessage} records.
 *
 * <p>Data flow: the Paho callback stores every arriving message in a {@link LocalMessageStore}
 * keyed by its MQTT message id and pushes that id onto a bounded queue; {@link #run} takes ids
 * from the queue, loads the message from the store and emits it under the checkpoint lock.
 * Stored messages are removed again in {@link #acknowledgeIDs}.
 */
class MQTTStreamSource extends MessageAcknowledgingSourceBase<MQTTMessage, Long> implements ResultTypeQueryable<MQTTMessage> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MQTTStreamSource.class);

    /** Capacity of the hand-off queue between the MQTT callback and {@link #run}. */
    private static final int QUEUE_CAPACITY = 10000;

    private Map<String, String> config;
    private MqttAsyncClient client;
    private LocalMessageStore<MQTTMessage> store;
    private BlockingQueue<Long> queue;
    private RunningChecker runningChecker;
    // Never assigned in this class, so it keeps its default (false) unless set elsewhere.
    private boolean autoAck;
    private SimpleObjectSerializer<MQTTMessage> objectSerializer;
    private MQTTExceptionListener exceptionListener;
    private boolean logFailuresOnly;

    /**
     * Creates the source with a default serializer, running checker and exception listener.
     *
     * @param map connector configuration; it is parsed by {@code MQTTUtils.parseConfigParams}
     *            when {@link #open} is called
     */
    public MQTTStreamSource(Map<String, String> map) {
        super(Long.class);
        this.logFailuresOnly = false;
        this.config = map;
        this.objectSerializer = new SimpleObjectSerializer<>();
        this.runningChecker = new RunningChecker();
        this.exceptionListener = new MQTTExceptionListener(LOG, this.logFailuresOnly);
    }

    /**
     * Chooses whether failures in {@link #close} and {@link #acknowledgeIDs} are only logged
     * ({@code true}) or rethrown as {@link RuntimeException} ({@code false}).
     *
     * <p>NOTE(review): the default exception listener was built in the constructor with the
     * previous flag value and is not updated here - confirm whether that is intended.
     *
     * @param z new value of the flag
     */
    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    /**
     * Replaces the exception listener consulted at the start of every {@link #run} iteration.
     *
     * @param mQTTExceptionListener listener to use from now on
     */
    public void setExceptionListener(MQTTExceptionListener mQTTExceptionListener) {
        this.exceptionListener = mQTTExceptionListener;
    }

    /**
     * Replaces the checker that controls the {@link #run} loop.
     *
     * @param runningChecker checker to use from now on
     */
    public void setRunningChecker(RunningChecker runningChecker) {
        this.runningChecker = runningChecker;
    }

    /**
     * Parses the configuration, creates the message store, queue and MQTT client, registers the
     * callback, starts the connection attempt and marks the source as running.
     *
     * @param configuration Flink configuration, forwarded to the superclass
     * @throws Exception if the superclass, the configuration parsing or the MQTT client fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        Tuple9<String, String, String, MqttClientPersistence, MqttConnectOptions, Integer, Long, Long, Integer> params =
                MQTTUtils.parseConfigParams(this.config);
        String brokerUrl = params.f0;
        String clientId = params.f1;
        final String topic = params.f2;
        MqttClientPersistence persistence = params.f3;
        MqttConnectOptions connectOptions = params.f4;
        final Integer qos = params.f5;

        this.store = new LocalMessageStore<>(persistence, this.objectSerializer, this.objectSerializer);
        this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        this.client = new MqttAsyncClient(brokerUrl, clientId, persistence);
        this.client.setCallback(new MqttCallbackExtended() {
            /**
             * Copies the Paho message into an {@link MQTTMessage}, persists it in the store and
             * queues its id for {@link MQTTStreamSource#run}.
             */
            public void messageArrived(String arrivedTopic, MqttMessage mqttMessage) {
                synchronized (this) {
                    try {
                        MQTTMessage message = new MQTTMessage();
                        message.setMutable(false);
                        message.setTopic(arrivedTopic);
                        message.setMessageId(mqttMessage.getId());
                        message.setRetained(mqttMessage.isRetained());
                        message.setDuplicate(mqttMessage.isDuplicate());
                        message.setQos(mqttMessage.getQos());
                        message.setPayload(mqttMessage.getPayload());
                        // NOTE(review): the store key is the MQTT message id; confirm it is
                        // unique enough for the QoS levels in use.
                        Long messageId = Long.valueOf(message.getMessageId());
                        MQTTStreamSource.this.store.store(messageId, message);
                        // Blocks while the queue is full.
                        MQTTStreamSource.this.queue.put(messageId);
                        LOG.trace("Message arrived, {}", message);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the calling thread can still observe it.
                        Thread.currentThread().interrupt();
                        LOG.error("MQTT MQTTStreamSource put message into queue error: ", e);
                    }
                }
            }

            /** Nothing to do: this source does not publish. */
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }

            /** Only logs the loss of the connection. */
            public void connectionLost(Throwable cause) {
                LOG.warn("Connection to mqtt server lost.", cause);
            }

            /** Subscribes to the configured topic once a connection is established. */
            public void connectComplete(boolean reconnect, String serverUri) {
                LOG.info("Connect complete {}. Is it a reconnect?: {}", serverUri, reconnect);
                try {
                    MQTTStreamSource.this.client.subscribe(topic, qos.intValue());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        this.client.connect(connectOptions);
        this.runningChecker.setIsRunning(true);
    }

    /**
     * Disconnects the client, closes the message store and then closes the client.
     *
     * <p>All three steps are attempted even if an earlier one fails. Depending on
     * {@code logFailuresOnly}, failures are either logged or the first one is rethrown after
     * every step has run. Steps whose resource was never created (for example because
     * {@link #open} failed early) are skipped.
     *
     * @throws Exception if the superclass fails
     * @throws RuntimeException wrapping the first {@link MqttException} when
     *         {@code logFailuresOnly} is {@code false}
     */
    public void close() throws Exception {
        super.close();
        RuntimeException firstFailure = null;
        if (this.client != null) {
            try {
                this.client.disconnect();
            } catch (MqttException e) {
                if (this.logFailuresOnly) {
                    LOG.error("Failed to disconnect MQTT client", e);
                } else {
                    firstFailure = new RuntimeException("Failed to disconnect MQTT client", e);
                }
            }
        }
        if (this.store != null) {
            try {
                this.store.close();
            } catch (MqttException e) {
                if (this.logFailuresOnly) {
                    LOG.error("Failed to close MQTT store", e);
                } else if (firstFailure == null) {
                    firstFailure = new RuntimeException("Failed to close MQTT store", e);
                }
            }
        }
        if (this.client != null) {
            try {
                this.client.close();
            } catch (MqttException e) {
                if (this.logFailuresOnly) {
                    LOG.error("Failed to close MQTT client", e);
                } else if (firstFailure == null) {
                    firstFailure = new RuntimeException("Failed to close MQTT client", e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Emits queued messages until the running checker reports that the source has stopped.
     *
     * <p>Each iteration first lets the exception listener check for a recorded error, then
     * blocks on the queue for the next message id.
     *
     * @param sourceContext context used to emit records; its checkpoint lock is held while the
     *                      id is registered and the record is collected
     * @throws Exception if the exception listener, the queue or the store fails
     */
    public void run(SourceFunction.SourceContext<MQTTMessage> sourceContext) throws Exception {
        while (this.runningChecker.isRunning()) {
            this.exceptionListener.checkErroneous();
            Long messageId = this.queue.take();
            MQTTMessage message = this.store.retrieve(messageId);
            synchronized (sourceContext.getCheckpointLock()) {
                // The record is emitted in every case; the id is only registered for
                // acknowledgement when auto-ack is off.
                if (!this.autoAck) {
                    addId(messageId);
                }
                sourceContext.collect(message);
            }
        }
    }

    /** Asks the {@link #run} loop to stop by clearing the running flag. */
    public void cancel() {
        this.runningChecker.setIsRunning(false);
    }

    /**
     * Returns the type information of the emitted records.
     *
     * @return type information for {@link MQTTMessage}
     */
    public TypeInformation<MQTTMessage> getProducedType() {
        return TypeInformation.of(MQTTMessage.class);
    }

    /**
     * Removes the acknowledged messages from the local store.
     *
     * @param j checkpoint id (unused)
     * @param set ids of the messages to remove
     * @throws RuntimeException wrapping the underlying failure when {@code logFailuresOnly} is
     *         {@code false}; otherwise the failure is only logged
     */
    protected void acknowledgeIDs(long j, Set<Long> set) {
        try {
            for (Long messageId : set) {
                this.store.remove(messageId);
            }
        } catch (Exception e) {
            // Keep the cause attached so the root failure stays diagnosable.
            if (!this.logFailuresOnly) {
                throw new RuntimeException("Failed to acknowledge MQTT message", e);
            }
            LOG.error("Failed to acknowledge MQTT message", e);
        }
    }
}
