package com.github.ddth.queue.impl;

import com.github.ddth.kafka.KafkaClient;
import com.github.ddth.kafka.KafkaMessage;
import com.github.ddth.queue.IPartitionSupport;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import java.util.Collection;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/KafkaQueue.class */
/**
 * Queue implementation backed by a Kafka topic.
 *
 * <p>Messages are serialized to {@code byte[]} by the concrete subclass (see
 * {@link #serialize(IQueueMessage)} / {@link #deserialize(byte[])}) and published to
 * {@link #getTopicName()} through a {@link KafkaClient}. The client is either supplied
 * via {@link #setKafkaClient(KafkaClient)} or built by {@link #init()}; only a client
 * built by this queue is destroyed by {@link #destroy()}.
 *
 * <p>This implementation has no ephemeral storage: {@link #finish(IQueueMessage)} is a
 * no-op, the size methods return {@code -1}, and the orphan/ephemeral operations throw
 * {@link QueueException.OperationNotSupported}.
 *
 * @param <ID> type of the queue message id
 * @param <DATA> type of the queue message payload
 */
public abstract class KafkaQueue<ID, DATA> extends AbstractQueue<ID, DATA> {
    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
    public static final String DEFAULT_TOPIC_NAME = "ddth-queue";
    public static final KafkaClient.ProducerType DEFAULT_PRODUCER_TYPE = KafkaClient.ProducerType.LEADER_ACK;
    public static final boolean DEFAULT_SEND_ASYNC = true;
    public static final String DEFAULT_CONSUMER_GROUP_ID = "ddth-queue";

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaQueue.class);

    private KafkaClient kafkaClient;
    /** {@code true} when {@link #kafkaClient} was built by this queue and must be destroyed by it. */
    private boolean myOwnKafkaClient = true;

    private Properties producerProps;
    private Properties consumerProps;
    private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
    private String topicName = DEFAULT_TOPIC_NAME;
    /** The default group id is suffixed with the creation timestamp of this instance. */
    private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID + System.currentTimeMillis();
    private KafkaClient.ProducerType producerType = DEFAULT_PRODUCER_TYPE;
    private boolean sendAsync = DEFAULT_SEND_ASYNC;

    /**
     * @return {@code true} if messages are sent via {@code KafkaClient.sendMessageRaw},
     *         {@code false} if they are sent via {@code KafkaClient.sendMessage}
     */
    public boolean isSendAsync() {
        return this.sendAsync;
    }

    /**
     * Selects which send call {@link #putToQueue(IQueueMessage)} uses.
     *
     * @param sendAsync see {@link #isSendAsync()}
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setSendAsync(boolean sendAsync) {
        this.sendAsync = sendAsync;
        return this;
    }

    /**
     * @return the producer type passed to the Kafka client on every send
     */
    public KafkaClient.ProducerType getProducerType() {
        return this.producerType;
    }

    /**
     * @param producerType the producer type passed to the Kafka client on every send
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setProducerType(KafkaClient.ProducerType producerType) {
        this.producerType = producerType;
        return this;
    }

    /**
     * @return the bootstrap server list used when this queue builds its own Kafka client
     */
    public String getKafkaBootstrapServers() {
        return this.bootstrapServers;
    }

    /**
     * @param bootstrapServers the bootstrap server list used by {@link #buildKafkaClient()};
     *        must not be blank by the time the client is built
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setKafkaBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        return this;
    }

    /**
     * @return the custom producer properties handed to a client built by this queue
     *         (may be {@code null})
     */
    public Properties getKafkaProducerProperties() {
        return this.producerProps;
    }

    /**
     * @param producerProps custom producer properties handed to a client built by this queue
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setKafkaProducerProperties(Properties producerProps) {
        this.producerProps = producerProps;
        return this;
    }

    /**
     * @return the custom consumer properties handed to a client built by this queue
     *         (may be {@code null})
     */
    public Properties getKafkaConsumerProperties() {
        return this.consumerProps;
    }

    /**
     * @param consumerProps custom consumer properties handed to a client built by this queue
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setKafkaConsumerProperties(Properties consumerProps) {
        this.consumerProps = consumerProps;
        return this;
    }

    /**
     * @return the Kafka topic messages are published to and consumed from
     */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * @param topicName the Kafka topic messages are published to and consumed from
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setTopicName(String topicName) {
        this.topicName = topicName;
        return this;
    }

    /**
     * @return the consumer group id used when consuming messages
     */
    public String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    /**
     * @param consumerGroupId the consumer group id used when consuming messages
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setConsumerGroupId(String consumerGroupId) {
        this.consumerGroupId = consumerGroupId;
        return this;
    }

    /**
     * @return the Kafka client currently in use, or {@code null} if none has been set or built
     */
    protected KafkaClient getKafkaClient() {
        return this.kafkaClient;
    }

    /**
     * Supplies an externally managed Kafka client. Such a client is never destroyed by
     * {@link #destroy()}.
     *
     * @param kafkaClient the client to use
     * @return this queue, for chaining
     */
    public KafkaQueue<ID, DATA> setKafkaClient(KafkaClient kafkaClient) {
        this.kafkaClient = kafkaClient;
        this.myOwnKafkaClient = false;
        return this;
    }

    /**
     * Builds and initializes a new Kafka client from the configured bootstrap servers and
     * the producer/consumer properties.
     *
     * @return the initialized client
     * @throws IllegalStateException if the bootstrap server list is blank
     * @throws Exception if the client fails to initialize
     */
    protected KafkaClient buildKafkaClient() throws Exception {
        if (StringUtils.isBlank(this.bootstrapServers)) {
            throw new IllegalStateException("Kafka bootstrap server list is not defined.");
        }
        KafkaClient client = new KafkaClient(this.bootstrapServers);
        // Producer and consumer each receive their own property set.
        client.setProducerProperties(this.producerProps).setConsumerProperties(this.consumerProps);
        client.init();
        return client;
    }

    /**
     * Initializes the queue, building a Kafka client first if none was supplied.
     *
     * @return this queue, for chaining
     * @throws IllegalStateException if no Kafka client is available after initialization
     * @throws Exception if building the client or the parent initialization fails
     */
    @Override
    public KafkaQueue<ID, DATA> init() throws Exception {
        if (this.kafkaClient == null) {
            this.kafkaClient = buildKafkaClient();
            // buildKafkaClient() is overridable and could return null; only claim ownership of a real client.
            this.myOwnKafkaClient = this.kafkaClient != null;
        }
        super.init();
        if (this.kafkaClient == null) {
            throw new IllegalStateException("Kafka client is null.");
        }
        return this;
    }

    /**
     * Destroys the queue. The Kafka client is destroyed only if it was built by this
     * queue; in every case the reference to it is dropped. Failures while destroying the
     * client are logged and not propagated.
     */
    @Override
    public void destroy() {
        try {
            super.destroy();
        } finally {
            // Runs even if super.destroy() throws, so an owned client is never leaked.
            final KafkaClient client = this.kafkaClient;
            this.kafkaClient = null;
            if (client != null && this.myOwnKafkaClient) {
                try {
                    client.destroy();
                } catch (Exception e) {
                    LOGGER.warn(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Converts a queue message to the bytes stored in Kafka.
     *
     * @param queueMessage the message to serialize
     * @return the serialized form
     * @throws QueueException if serialization fails
     */
    protected abstract byte[] serialize(IQueueMessage<ID, DATA> queueMessage) throws QueueException;

    /**
     * Converts bytes read from Kafka back to a queue message.
     *
     * @param data the serialized form
     * @return the deserialized message
     * @throws QueueException if deserialization fails
     */
    protected abstract IQueueMessage<ID, DATA> deserialize(byte[] data) throws QueueException;

    /**
     * Consumes one message from the topic and deserializes it.
     *
     * @return the next message, or {@code null} if the client returned no message
     */
    protected IQueueMessage<ID, DATA> takeFromQueue() {
        // NOTE(review): the boolean flag is presumably "consume from beginning" and 1000 ms the
        // maximum wait time — confirm against the KafkaClient API.
        KafkaMessage kafkaMessage = this.kafkaClient.consumeMessage(this.consumerGroupId, true, this.topicName, 1000L,
                TimeUnit.MILLISECONDS);
        return kafkaMessage != null ? deserialize(kafkaMessage.content()) : null;
    }

    /**
     * Serializes the message and sends it to the topic.
     *
     * <p>The Kafka message key is the partition key when the message implements
     * {@link IPartitionSupport} and that key is non-null, otherwise the message id. If
     * both are {@code null} the message is sent without a key.
     *
     * @param queueMessage the message to send
     * @return {@code true} if the client's send call returned a non-null result
     */
    protected boolean putToQueue(IQueueMessage<ID, DATA> queueMessage) {
        byte[] payload = serialize(queueMessage);

        // Only toString() is needed from the key, so keep it as Object and avoid an unchecked cast.
        Object key = queueMessage instanceof IPartitionSupport
                ? ((IPartitionSupport) queueMessage).qPartitionKey()
                : null;
        if (key == null) {
            key = queueMessage.qId();
        }

        KafkaMessage kafkaMessage = key != null
                ? new KafkaMessage(this.topicName, key.toString(), payload)
                : new KafkaMessage(this.topicName, payload);

        if (this.sendAsync) {
            return this.kafkaClient.sendMessageRaw(this.producerType, kafkaMessage) != null;
        }
        return this.kafkaClient.sendMessage(this.producerType, kafkaMessage) != null;
    }

    /**
     * Queues a copy of the message with its requeue count reset to 0 and both timestamps
     * set to the current time.
     *
     * @param queueMessage the message to queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage)}
     */
    @Override
    public boolean queue(IQueueMessage<ID, DATA> queueMessage) {
        IQueueMessage<ID, DATA> copy = queueMessage.mo1clone();
        Date now = new Date();
        copy.qNumRequeues2(0).qOriginalTimestamp2(now).qTimestamp2(now);
        return putToQueue(copy);
    }

    /**
     * Re-queues a copy of the message with its requeue count incremented and its
     * timestamp set to the current time.
     *
     * @param queueMessage the message to re-queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage)}
     */
    @Override
    public boolean requeue(IQueueMessage<ID, DATA> queueMessage) {
        IQueueMessage<ID, DATA> copy = queueMessage.mo1clone();
        copy.qIncNumRequeues2().qTimestamp2(new Date());
        return putToQueue(copy);
    }

    /**
     * Re-queues a copy of the message without touching its requeue count or timestamps.
     *
     * @param queueMessage the message to re-queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage)}
     */
    @Override
    public boolean requeueSilent(IQueueMessage<ID, DATA> queueMessage) {
        return putToQueue(queueMessage.mo1clone());
    }

    /**
     * No-op in this implementation.
     *
     * @param queueMessage ignored
     */
    @Override
    public void finish(IQueueMessage<ID, DATA> queueMessage) {
        // Intentionally empty.
    }

    /**
     * Takes the next message from the queue.
     *
     * @return the next message, or {@code null} if none was obtained
     */
    @Override
    public IQueueMessage<ID, DATA> take() {
        return takeFromQueue();
    }

    /**
     * Not supported by this queue.
     *
     * @param thresholdTimestampMs ignored
     * @return never returns normally
     * @throws QueueException.OperationNotSupported always
     */
    @Override
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long thresholdTimestampMs) {
        throw new QueueException.OperationNotSupported("This queue does not support retrieving orphan messages");
    }

    /**
     * Not supported by this queue.
     *
     * @param queueMessage ignored
     * @return never returns normally
     * @throws QueueException.OperationNotSupported always
     */
    @Override
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage<ID, DATA> queueMessage) {
        throw new QueueException.OperationNotSupported("This queue does not support ephemeral storage.");
    }

    /**
     * @return always {@code -1}; this implementation does not compute a queue size
     */
    @Override
    public int queueSize() {
        return -1;
    }

    /**
     * @return always {@code -1}; this implementation does not compute an ephemeral size
     */
    @Override
    public int ephemeralSize() {
        return -1;
    }
}
