package com.kevin.flink.streaming.connectors.mqtt;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttPersistable;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: MessageStore.java */
/* loaded from: input_file:com/kevin/flink/streaming/connectors/mqtt/LocalMessageStore.class */
/**
 * {@link MessageStore} implementation that keeps messages in an
 * {@link MqttClientPersistence}.
 *
 * <p>Each message is serialized with the supplied {@link SerializationSchema},
 * wrapped in a {@code MqttPersistableData} and stored under the decimal string
 * form of its offset ({@code Long.toString()}). Reading a message back takes the
 * entry's header bytes and passes them to the supplied
 * {@link DeserializationSchema}.
 *
 * @param <T> type of the messages handled by this store
 */
public class LocalMessageStore<T> implements MessageStore<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LocalMessageStore.class);
    private final MqttClientPersistence persistentStore;
    private final SerializationSchema<T> serializationSchema;
    private final DeserializationSchema<T> deserializationSchema;

    /**
     * Creates a store on top of the given persistence.
     *
     * @param mqttClientPersistence persistence that holds the serialized messages
     * @param serializationSchema schema used to turn a message into bytes in {@link #store}
     * @param deserializationSchema schema used to turn bytes back into a message in {@link #retrieve}
     */
    public LocalMessageStore(MqttClientPersistence mqttClientPersistence, SerializationSchema<T> serializationSchema, DeserializationSchema<T> deserializationSchema) {
        this.persistentStore = mqttClientPersistence;
        this.serializationSchema = serializationSchema;
        this.deserializationSchema = deserializationSchema;
    }

    /**
     * Reads the raw bytes stored for the given offset.
     *
     * @param l offset whose entry should be read
     * @return the header bytes of the stored entry, or {@code null} when the
     *         persistence has no entry for the offset or reading it failed
     *         (the failure is logged)
     */
    private byte[] get(Long l) {
        try {
            MqttPersistable stored = this.persistentStore.get(l.toString());
            // A persistence may answer an unknown key with null instead of throwing;
            // treat that like any other "nothing to read" outcome rather than hitting an NPE.
            if (stored == null) {
                return null;
            }
            // NOTE(review): store() wraps the serialized bytes in MqttPersistableData, which
            // presumably exposes them as header bytes - confirm against that class.
            return stored.getHeaderBytes();
        } catch (MqttPersistenceException e) {
            LOG.error("MQTT LocalMessageStore get error: ", e);
            return null;
        }
    }

    /**
     * Returns the largest offset currently present in the persistence.
     *
     * <p>All keys are parsed as decimal {@code long} values. A failure to list
     * the keys is logged and then treated like an empty store.
     *
     * @return the greatest key found
     * @throws NoSuchElementException if the persistence holds no keys or its keys could not be listed
     * @throws NumberFormatException if a key is not a decimal {@code long}
     */
    @Override // com.kevin.flink.streaming.connectors.mqtt.MessageStore
    public Long maxProcessedOffset() {
        boolean found = false;
        long highest = Long.MIN_VALUE;
        try {
            Enumeration<?> keys = this.persistentStore.keys();
            while (keys.hasMoreElements()) {
                long offset = Long.parseLong((String) keys.nextElement());
                if (!found || offset > highest) {
                    highest = offset;
                    found = true;
                }
            }
        } catch (MqttPersistenceException e) {
            LOG.error("MQTT LocalMessageStore maxProcessedOffset error: ", e);
        }
        if (!found) {
            // Same exception type Collections.max raised for an empty collection, so callers
            // that fall back on failure keep working, but with a message that explains why.
            throw new NoSuchElementException("MQTT LocalMessageStore holds no offsets");
        }
        return highest;
    }

    /**
     * Serializes the message and writes it to the persistence under the given offset.
     *
     * @param l offset used as the key of the entry
     * @param t message to store
     * @return {@code true} when the entry was written, {@code false} when the
     *         persistence rejected it (the failure is logged)
     */
    @Override // com.kevin.flink.streaming.connectors.mqtt.MessageStore
    public Boolean store(Long l, T t) {
        try {
            byte[] payload = this.serializationSchema.serialize(t);
            this.persistentStore.put(l.toString(), new MqttPersistableData(payload));
            return true;
        } catch (MqttPersistenceException e) {
            LOG.warn("Failed to store message Id: {}", l, e);
            return false;
        }
    }

    /**
     * Reads the entry stored under the given offset and deserializes it.
     *
     * <p>When no bytes are available (missing entry or read failure) {@code null}
     * is handed to the deserialization schema; how that is handled depends on
     * the schema.
     *
     * @param l offset of the message to read
     * @return the value produced by the deserialization schema
     * @throws Exception whatever the deserialization schema throws
     */
    @Override // com.kevin.flink.streaming.connectors.mqtt.MessageStore
    public T retrieve(Long l) throws Exception {
        return this.deserializationSchema.deserialize(get(l));
    }

    /**
     * Deletes the entry stored under the given offset. A persistence failure is
     * logged and otherwise ignored.
     *
     * @param l offset of the entry to delete
     */
    @Override // com.kevin.flink.streaming.connectors.mqtt.MessageStore
    public void remove(Long l) {
        try {
            this.persistentStore.remove(l.toString());
        } catch (MqttPersistenceException e) {
            LOG.error("MQTT LocalMessageStore remove error: ", e);
        }
    }

    /**
     * Closes the underlying persistence.
     *
     * @throws MqttPersistenceException if the persistence fails to close
     */
    @Override // com.kevin.flink.streaming.connectors.mqtt.MessageStore
    public void close() throws MqttPersistenceException {
        this.persistentStore.close();
    }
}
