package io.github.guoshiqiufeng.loki.support.rocketmq.impl;

import io.github.guoshiqiufeng.loki.MessageContent;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerConfig;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerRecord;
import io.github.guoshiqiufeng.loki.support.core.exception.LokiException;
import io.github.guoshiqiufeng.loki.support.core.pipeline.PipelineUtils;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerRecord;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerResult;
import io.github.guoshiqiufeng.loki.support.core.util.StringUtils;
import io.github.guoshiqiufeng.loki.support.rocketmq.RocketClient;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.shaded.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/guoshiqiufeng/loki/support/rocketmq/impl/BaseRocketClient.class */
/**
 * Shared RocketMQ client logic: converts Loki producer records into RocketMQ
 * {@link Message}s, delegates the actual transport to the subclass-provided
 * {@code send}/{@code sendAsync} hooks, and wires a push consumer that maps
 * incoming {@link MessageView}s to {@link MessageContent} for a caller-supplied handler.
 *
 * <p>Message bodies are encoded and decoded as UTF-8 on both the producing and
 * the consuming side.
 */
public abstract class BaseRocketClient implements RocketClient {
    private static final Logger log = LoggerFactory.getLogger(BaseRocketClient.class);

    /**
     * Sends a record synchronously.
     *
     * @param str            identifier forwarded to the transport-level {@code send} hook; also used
     *                       as the message group when the record carries no delivery delay
     * @param producerRecord record to send; must not be {@code null}
     * @return result carrying the topic of the built message and the id from the send receipt
     * @throws LokiException if {@code producerRecord} is {@code null} or the send pipeline
     *                       yields no record
     */
    public ProducerResult send(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("send fail : record is null!", new Object[0]);
        }
        Message message = covertMessage(str, producerRecord);
        SendReceipt receipt = send(str, message);
        ProducerResult producerResult = new ProducerResult();
        producerResult.setTopic(message.getTopic());
        producerResult.setMsgId(receipt.getMessageId().toString());
        return producerResult;
    }

    /**
     * Sends a record asynchronously.
     *
     * @param str            identifier forwarded to the transport-level {@code sendAsync} hook; also
     *                       used as the message group when the record carries no delivery delay
     * @param producerRecord record to send; must not be {@code null}
     * @return future completing with the topic of the built message and the id from the send receipt
     * @throws LokiException if {@code producerRecord} is {@code null} or the send pipeline
     *                       yields no record (thrown synchronously, before any future is created)
     */
    public CompletableFuture<ProducerResult> sendAsync(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        Message message = covertMessage(str, producerRecord);
        return sendAsync(str, message).thenApply(receipt -> {
            ProducerResult producerResult = new ProducerResult();
            producerResult.setTopic(message.getTopic());
            producerResult.setMsgId(receipt.getMessageId().toString());
            return producerResult;
        });
    }

    /**
     * Builds a push consumer for the configured topic and routes every received
     * message to {@code function}.
     *
     * <p>If the configured topic is empty, an error is logged and nothing is subscribed.
     * An empty tag subscribes to all tags ({@code "*"}).
     *
     * @param consumerConfig subscription settings (topic, tag, group, thread and cache limits)
     * @param function       handler invoked with each message that passes the listener pipeline
     * @throws RuntimeException wrapping any exception raised while configuring or building the consumer
     */
    public void consumer(ConsumerConfig consumerConfig, Function<MessageContent<String>, Void> function) {
        String topic = consumerConfig.getTopic();
        if (StringUtils.isEmpty(topic)) {
            log.error("RocketMqHandler# pushMessageListener error: topic is null");
            return;
        }
        String configuredTag = consumerConfig.getTag();
        // An empty tag means "no filtering": subscribe to every tag of the topic.
        String tagExpression = StringUtils.isEmpty(configuredTag) ? "*" : configuredTag;
        try {
            PushConsumerBuilder builder = getConsumer(consumerConfig.getConsumerGroup(), consumerConfig.getIndex());
            builder.setConsumerGroup(consumerConfig.getConsumerGroup())
                    .setSubscriptionExpressions(Collections.singletonMap(
                            topic, new FilterExpression(tagExpression, FilterExpressionType.TAG)))
                    .setConsumptionThreadCount(consumerConfig.getConsumptionThreadCount().intValue())
                    .setMaxCacheMessageCount(consumerConfig.getMaxCacheMessageCount().intValue())
                    .setMessageListener(messageView -> handleMessage(messageView, function))
                    // The built consumer is intentionally not retained by this method.
                    .build();
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is not lost in the log.
            log.error("RocketMqHandler# pushMessageListener error:{}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Processes one received message: runs it through the listener pipeline and,
     * unless the pipeline drops it, hands it to the handler.
     *
     * @return {@code SUCCESS} when the pipeline drops the message or the handler returns normally;
     *         {@code FAILURE} when the handler throws
     */
    private ConsumeResult handleMessage(MessageView messageView, Function<MessageContent<String>, Void> function) {
        if (log.isDebugEnabled()) {
            log.debug("Consume message={}", messageView);
        }
        ConsumerRecord record = PipelineUtils.processListener(covertConsumerRecord(messageView));
        if (record == null) {
            // Pipeline returned nothing: acknowledge without invoking the handler.
            return ConsumeResult.SUCCESS;
        }
        try {
            function.apply(new MessageContent<String>()
                    .setMessageId(record.getMessageId())
                    .setMessageGroup(record.getMessageGroup())
                    .setTopic(record.getTopic())
                    .setTag(record.getTag())
                    .setKeys(record.getKeys())
                    .setBody(record.getBodyMessage())
                    .setBodyMessage(record.getBodyMessage()));
            return ConsumeResult.SUCCESS;
        } catch (Exception e) {
            // Guard kept because the stack-trace string is built eagerly.
            if (log.isErrorEnabled()) {
                log.error("RocketMqHandler# pushMessageListener error:{}", Throwables.getStackTraceAsString(e));
            }
            return ConsumeResult.FAILURE;
        }
    }

    /**
     * Maps a RocketMQ {@link MessageView} to a {@link ConsumerRecord}. A missing message
     * group or tag becomes the empty string; the body is decoded as UTF-8.
     */
    private ConsumerRecord covertConsumerRecord(MessageView messageView) {
        MessageId messageId = messageView.getMessageId();
        String messageGroup = messageView.getMessageGroup().orElse("");
        String tag = messageView.getTag().orElse("");
        String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
        return new ConsumerRecord(messageView.getTopic(), tag, messageId.toString(), messageGroup,
                messageView.getKeys(), body);
    }

    /**
     * Runs the record through the send pipeline and builds the RocketMQ message.
     *
     * <p>When the record has a non-zero delivery timestamp it is treated as an offset in
     * milliseconds from now and set as the delivery time; otherwise {@code str} is set as
     * the message group.
     *
     * @throws LokiException if the send pipeline yields no record
     */
    private Message covertMessage(String str, ProducerRecord producerRecord) {
        ProducerRecord processed = PipelineUtils.processSend(producerRecord);
        if (processed == null) {
            throw new LokiException("record is null!", new Object[0]);
        }
        MessageBuilder builder = new MessageBuilderImpl().setTopic(processed.getTopic());
        if (StringUtils.isNotEmpty(processed.getTag())) {
            builder.setTag(processed.getTag());
        }
        Long deliveryTimestamp = processed.getDeliveryTimestamp();
        if (deliveryTimestamp == null || deliveryTimestamp.longValue() == 0) {
            builder.setMessageGroup(str);
        } else {
            builder.setDeliveryTimestamp(System.currentTimeMillis() + deliveryTimestamp.longValue());
        }
        List<String> keys = processed.getKeys();
        if (keys != null && !keys.isEmpty()) {
            builder.setKeys(keys.toArray(new String[0]));
        }
        // Encode explicitly as UTF-8 so the bytes match the UTF-8 decoding on the consumer side,
        // regardless of the JVM's default charset.
        return builder.setBody(processed.getMessage().getBytes(StandardCharsets.UTF_8)).build();
    }

    /** Transport hook: synchronously sends an already-built message. */
    abstract SendReceipt send(String str, Message message);

    /** Transport hook: asynchronously sends an already-built message. */
    abstract CompletableFuture<SendReceipt> sendAsync(String str, Message message);
}
