package io.github.guoshiqiufeng.loki.support.kafka.impl;

import io.github.guoshiqiufeng.loki.MessageContent;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerConfig;
import io.github.guoshiqiufeng.loki.support.core.exception.LokiException;
import io.github.guoshiqiufeng.loki.support.core.pipeline.PipelineUtils;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerRecord;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerResult;
import io.github.guoshiqiufeng.loki.support.core.util.StringUtils;
import io.github.guoshiqiufeng.loki.support.core.util.ThreadPoolUtils;
import io.github.guoshiqiufeng.loki.support.kafka.KafkaClient;
import io.github.guoshiqiufeng.loki.support.kafka.utils.KafkaConsumeUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/guoshiqiufeng/loki/support/kafka/impl/BaseKafkaClient.class */
/**
 * Skeleton {@link KafkaClient} that implements synchronous sending, asynchronous sending and
 * consumer start-up on top of a single abstract low-level {@code send} operation.
 *
 * <p>Subclasses supply the actual producer call through
 * {@link #send(String, org.apache.kafka.clients.producer.ProducerRecord)}. The consumer instance
 * is obtained through {@code getConsumer(...)}, which is not defined in this class.
 */
public abstract class BaseKafkaClient implements KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(BaseKafkaClient.class);

    /** Name of the Kafka record header that carries the Loki tag of a message. */
    private static final String TAG_HEADER_KEY = "kafka_tag_id";

    /** Tag passed to the consume utilities when the consumer configuration declares no tag. */
    private static final String DEFAULT_TAG = "*";

    /**
     * Sends a record and blocks until the asynchronous send has completed.
     *
     * @param str            identifier forwarded unchanged to the low-level send
     *                       (presumably the producer name; verify against the implementations)
     * @param producerRecord the Loki record to send; must not be {@code null}
     * @return the result describing where the record was written
     * @throws LokiException    if {@code producerRecord} is {@code null} or the send pipeline
     *                          yields no record
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting for the result
     */
    public ProducerResult send(String str, ProducerRecord producerRecord) {
        try {
            return sendAsync(str, producerRecord).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts the record to its Kafka representation on the calling thread and then sends it
     * asynchronously.
     *
     * @param str            identifier forwarded unchanged to the low-level send
     * @param producerRecord the Loki record to send; must not be {@code null}
     * @return a future completed with the topic and a message id of the form
     *         {@code <partition>_<offset>}; it completes exceptionally with a
     *         {@link LokiException} if the underlying send fails or is interrupted
     * @throws LokiException if {@code producerRecord} is {@code null} or the send pipeline
     *                       yields no record
     */
    public CompletableFuture<ProducerResult> sendAsync(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        org.apache.kafka.clients.producer.ProducerRecord<String, String> kafkaRecord = covertKafkaRecord(producerRecord);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return send(str, kafkaRecord).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before translating the failure.
                Thread.currentThread().interrupt();
                log.error("sendAsync interrupted: {}", e.getMessage(), e);
                throw new LokiException(e.getMessage(), new Object[0]);
            } catch (ExecutionException e) {
                // The LokiException below only carries the message, so log the full cause here
                // to keep the original stack trace available for diagnosis.
                log.error("sendAsync fail: {}", e.getMessage(), e);
                throw new LokiException(e.getMessage(), new Object[0]);
            }
        }).thenApplyAsync(BaseKafkaClient::toProducerResult);
    }

    /**
     * Starts consuming messages in the background and hands each one to {@code function}.
     *
     * <p>When the configuration has a non-empty topic pattern the consumer is driven through
     * {@code KafkaConsumeUtils.consumeMessageForPattern}; otherwise the plain topic is consumed
     * through {@code KafkaConsumeUtils.consumeMessage}. If neither a topic nor a topic pattern is
     * configured, an error is logged and nothing is started. An empty tag is replaced by
     * {@code "*"}. The consume call runs on the executor returned by
     * {@code ThreadPoolUtils.getSingleThreadPool()}; failures inside it are logged, not rethrown.
     *
     * @param consumerConfig topic, topic pattern, tag, consumer group and index of the consumer
     * @param function       callback invoked with the content of every consumed message
     * @throws RuntimeException wrapping any exception raised while setting the consumer up
     */
    public void consumer(ConsumerConfig consumerConfig, Function<MessageContent<String>, Void> function) {
        String topic = consumerConfig.getTopic();
        String topicPattern = consumerConfig.getTopicPattern();
        String configuredTag = consumerConfig.getTag();
        if (StringUtils.isEmpty(topic) && StringUtils.isEmpty(topicPattern)) {
            log.error("BaseKafkaClient# consumer error: topic and topicPattern is both null");
            return;
        }
        try {
            String tag = StringUtils.isEmpty(configuredTag) ? DEFAULT_TAG : configuredTag;
            ExecutorService executor = ThreadPoolUtils.getSingleThreadPool();
            KafkaConsumer<String, String> consumer = getConsumer(consumerConfig.getConsumerGroup(), consumerConfig.getIndex());
            CompletableFuture.runAsync(() -> {
                if (StringUtils.isEmpty(topicPattern)) {
                    KafkaConsumeUtils.consumeMessage(consumer, topic, tag, consumed ->
                            function.apply(new MessageContent<String>()
                                    .setMessageId(consumed.getMessageId())
                                    .setTopic(consumed.getTopic())
                                    .setTag(consumed.getTag())
                                    .setKeys(consumed.getKeys())
                                    .setBody(consumed.getBodyMessage())
                                    .setBodyMessage(consumed.getBodyMessage())));
                } else {
                    KafkaConsumeUtils.consumeMessageForPattern(consumer, topicPattern, tag, consumed ->
                            function.apply(new MessageContent<String>()
                                    .setMessageId(consumed.getMessageId())
                                    .setTopic(consumed.getTopic())
                                    .setTag(consumed.getTag())
                                    .setKeys(consumed.getKeys())
                                    .setBody(consumed.getBodyMessage())
                                    .setBodyMessage(consumed.getBodyMessage())));
                }
            }, executor).exceptionally(th -> {
                // Pass the throwable as the last argument so the stack trace is logged as well.
                log.error("Exception occurred in CompletableFuture: {}", th.getMessage(), th);
                return null;
            });
        } catch (Exception e) {
            log.error("BaseKafkaClient# consumer error:{}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps the metadata returned by Kafka to a Loki {@link ProducerResult}.
     *
     * @param recordMetadata metadata of the record that was written
     * @return a result holding the topic and a {@code <partition>_<offset>} message id
     */
    private static ProducerResult toProducerResult(RecordMetadata recordMetadata) {
        ProducerResult producerResult = new ProducerResult();
        producerResult.setTopic(recordMetadata.topic());
        producerResult.setMsgId(recordMetadata.partition() + "_" + recordMetadata.offset());
        return producerResult;
    }

    /**
     * Runs the record through the send pipeline and converts the outcome into a Kafka record.
     *
     * <p>The tag, when present, is written as a UTF-8 encoded {@code kafka_tag_id} header. The
     * first entry of the key list, when present, becomes the Kafka record key. No partition is
     * specified.
     *
     * @param producerRecord the Loki record to convert
     * @return the equivalent Kafka producer record
     * @throws LokiException if the send pipeline returns {@code null}
     */
    private org.apache.kafka.clients.producer.ProducerRecord<String, String> covertKafkaRecord(ProducerRecord producerRecord) {
        ProducerRecord processed = PipelineUtils.processSend(producerRecord);
        if (processed == null) {
            throw new LokiException("record is null!", new Object[0]);
        }

        ArrayList<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
        String tag = processed.getTag();
        if (StringUtils.isNotEmpty(tag)) {
            headers.add(new RecordHeader(TAG_HEADER_KEY, tag.getBytes(StandardCharsets.UTF_8)));
        }

        // NOTE(review): the configured value is added to the current time, i.e. it is treated as
        // a delay in milliseconds rather than an absolute timestamp - confirm this matches the
        // contract of ProducerRecord#getDeliveryTimestamp.
        Long delay = processed.getDeliveryTimestamp();
        Long timestamp = null;
        if (delay != null && delay.longValue() != 0L) {
            timestamp = System.currentTimeMillis() + delay.longValue();
        }

        String key = null;
        if (processed.getKeys() != null && !processed.getKeys().isEmpty()) {
            key = (String) processed.getKeys().get(0);
        }

        return new org.apache.kafka.clients.producer.ProducerRecord<>(processed.getTopic(), (Integer) null, timestamp, key, processed.getMessage(), headers);
    }

    /**
     * Hands an already converted record to the underlying Kafka producer.
     *
     * @param str            identifier forwarded unchanged from the public send methods
     * @param producerRecord the Kafka record to send
     * @return a future yielding the metadata of the written record
     */
    abstract Future<RecordMetadata> send(String str, org.apache.kafka.clients.producer.ProducerRecord<String, String> producerRecord);
}
