package com.github.ddth.queue.impl;

import com.github.ddth.kafka.KafkaClient;
import com.github.ddth.kafka.KafkaMessage;
import com.github.ddth.queue.IQueue;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import java.io.Closeable;
import java.util.Collection;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/KafkaQueue.class */
public abstract class KafkaQueue implements IQueue, Closeable, AutoCloseable {
    private KafkaClient kafkaClient;
    private Properties producerProps;
    private Properties consumerProps;
    private final Logger LOGGER = LoggerFactory.getLogger(KafkaQueue.class);
    private boolean myOwnKafkaClient = true;
    private String bootstrapServers = "localhost:9092";
    private String topicName = "ddth-queue";
    private String consumerGroupId = "kafkaqueue-" + System.currentTimeMillis();
    private KafkaClient.ProducerType producerType = KafkaClient.ProducerType.SYNC_LEADER_ACK;

    public KafkaClient.ProducerType getProducerType() {
        return this.producerType;
    }

    public KafkaQueue setProducerType(KafkaClient.ProducerType producerType) {
        this.producerType = producerType;
        return this;
    }

    public String getKafkaBootstrapServers() {
        return this.bootstrapServers;
    }

    public KafkaQueue setKafkaBootstrapServers(String str) {
        this.bootstrapServers = str;
        return this;
    }

    public Properties getKafkaProducerProperties() {
        return this.producerProps;
    }

    public void setKafkaProducerProperties(Properties properties) {
        this.producerProps = properties;
    }

    public Properties getKafkaConsumerProperties() {
        return this.consumerProps;
    }

    public void setKafkaConsumerProperties(Properties properties) {
        this.consumerProps = properties;
    }

    public String getTopicName() {
        return this.topicName;
    }

    public KafkaQueue setTopicName(String str) {
        this.topicName = str;
        return this;
    }

    public String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    public KafkaQueue setConsumerGroupId(String str) {
        this.consumerGroupId = str;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaClient getKafkaClient() {
        return this.kafkaClient;
    }

    public KafkaQueue setKafkaClient(KafkaClient kafkaClient) {
        this.kafkaClient = kafkaClient;
        this.myOwnKafkaClient = false;
        return this;
    }

    public KafkaQueue init() throws Exception {
        if (this.kafkaClient == null) {
            this.kafkaClient = new KafkaClient(this.bootstrapServers);
            this.kafkaClient.setProducerProperties(this.consumerProps).setConsumerProperties(this.consumerProps);
            this.kafkaClient.init();
            this.myOwnKafkaClient = true;
        }
        return this;
    }

    public void destroy() {
        if (this.kafkaClient == null || !this.myOwnKafkaClient) {
            return;
        }
        try {
            this.kafkaClient.destroy();
        } catch (Exception e) {
            this.LOGGER.warn(e.getMessage(), e);
        } finally {
            this.kafkaClient = null;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        destroy();
    }

    protected abstract byte[] serialize(IQueueMessage iQueueMessage) throws QueueException;

    protected abstract IQueueMessage deserialize(byte[] bArr) throws QueueException;

    protected IQueueMessage takeFromQueue() {
        KafkaMessage consumeMessage = this.kafkaClient.consumeMessage(this.consumerGroupId, true, this.topicName, 1000L, TimeUnit.MILLISECONDS);
        if (consumeMessage != null) {
            return deserialize(consumeMessage.content());
        }
        return null;
    }

    protected boolean putToQueue(IQueueMessage iQueueMessage) {
        byte[] serialize = serialize(iQueueMessage);
        Object qId = iQueueMessage.qId();
        this.kafkaClient.sendMessage(this.producerType, qId != null ? new KafkaMessage(this.topicName, qId.toString(), serialize) : new KafkaMessage(this.topicName, serialize));
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage iQueueMessage) {
        IQueueMessage mo2clone = iQueueMessage.mo2clone();
        Date date = new Date();
        mo2clone.qNumRequeues(0).qOriginalTimestamp(date).qTimestamp(date);
        return putToQueue(mo2clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage iQueueMessage) {
        IQueueMessage mo2clone = iQueueMessage.mo2clone();
        mo2clone.qIncNumRequeues().qTimestamp(new Date());
        return putToQueue(mo2clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage);
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage iQueueMessage) {
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage take() {
        return takeFromQueue();
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        return null;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        throw new UnsupportedOperationException("Method [moveFromEphemeralToQueueStorage] is not supported!");
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return -1;
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        return -1;
    }
}
