package io.github.guoshiqiufeng.loki.support.rocketmq.remoting.impl;

import com.google.common.base.Throwables;
import io.github.guoshiqiufeng.loki.MessageContent;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerConfig;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerRecord;
import io.github.guoshiqiufeng.loki.support.core.exception.LokiException;
import io.github.guoshiqiufeng.loki.support.core.pipeline.PipelineUtils;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerRecord;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerResult;
import io.github.guoshiqiufeng.loki.support.core.util.StringUtils;
import io.github.guoshiqiufeng.loki.support.core.util.ThreadPoolUtils;
import io.github.guoshiqiufeng.loki.support.rocketmq.remoting.RocketRemotingClient;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/guoshiqiufeng/loki/support/rocketmq/remoting/impl/BaseRocketRemotingClient.class */
public abstract class BaseRocketRemotingClient implements RocketRemotingClient {
    private static final Logger log = LoggerFactory.getLogger(BaseRocketRemotingClient.class);

    public ProducerResult send(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        Message covertMessage = covertMessage(producerRecord);
        SendResult send = send(str, covertMessage);
        ProducerResult producerResult = new ProducerResult();
        producerResult.setTopic(covertMessage.getTopic());
        producerResult.setMsgId(send.getMsgId());
        return producerResult;
    }

    public CompletableFuture<ProducerResult> sendAsync(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        Message covertMessage = covertMessage(producerRecord);
        return CompletableFuture.supplyAsync(() -> {
            return send(str, covertMessage);
        }).thenApplyAsync(sendResult -> {
            ProducerResult producerResult = new ProducerResult();
            producerResult.setTopic(covertMessage.getTopic());
            producerResult.setMsgId(sendResult.getMsgId());
            return producerResult;
        });
    }

    public void consumer(ConsumerConfig consumerConfig, Function<MessageContent<String>, Void> function) {
        String topic = consumerConfig.getTopic();
        String topicPattern = consumerConfig.getTopicPattern();
        String tag = consumerConfig.getTag();
        if (StringUtils.isEmpty(topic) && StringUtils.isEmpty(topicPattern)) {
            if (log.isErrorEnabled()) {
                log.error("RocketMqHandler# pushMessageListener error: topic and topicPattern is both null");
                return;
            }
            return;
        }
        try {
            if (StringUtils.isEmpty(tag)) {
                tag = "*";
            }
            ExecutorService singleThreadPool = ThreadPoolUtils.getSingleThreadPool();
            String str = tag;
            DefaultMQPushConsumer consumer = getConsumer(consumerConfig.getConsumerGroup(), consumerConfig.getIndex());
            CompletableFuture.runAsync(() -> {
                if (StringUtils.isEmpty(topicPattern)) {
                    try {
                        consumer.subscribe(topic, str);
                        consumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
                            try {
                                Iterator it = list.iterator();
                                while (it.hasNext()) {
                                    MessageExt messageExt = (MessageExt) it.next();
                                    if (log.isDebugEnabled()) {
                                        log.debug("msgExt:{}", messageExt);
                                    }
                                    ConsumerRecord processListener = PipelineUtils.processListener(covertConsumerRecord(messageExt));
                                    if (processListener == null) {
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                    function.apply(new MessageContent().setMessageId(processListener.getMessageId()).setTopic(processListener.getTopic()).setTag(processListener.getTag()).setKeys(processListener.getKeys()).setBody(processListener.getBodyMessage()).setBodyMessage(processListener.getBodyMessage()));
                                }
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            } catch (Exception e) {
                                if (log.isErrorEnabled()) {
                                    log.error("RocketMqHandler# pushMessageListener error:{}", Throwables.getStackTraceAsString(e));
                                }
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            }
                        });
                        consumer.start();
                    } catch (Exception e) {
                        if (log.isErrorEnabled()) {
                            log.error("Exception consumer in registerMessageListener: {}", e.getMessage());
                        }
                    }
                }
            }, singleThreadPool).exceptionally(th -> {
                if (!log.isErrorEnabled()) {
                    return null;
                }
                log.error("Exception occurred in CompletableFuture: {}", th.getMessage());
                return null;
            });
        } catch (Exception e) {
            if (log.isErrorEnabled()) {
                log.error("RocketMqHandler# pushMessageListener error:{}", e.getMessage());
            }
            throw new RuntimeException(e);
        }
    }

    private ConsumerRecord covertConsumerRecord(MessageExt messageExt) {
        return new ConsumerRecord(messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId(), (String) null, Arrays.asList(messageExt.getKeys().split(",")), new String(messageExt.getBody()));
    }

    private Message covertMessage(ProducerRecord producerRecord) {
        ProducerRecord processSend = PipelineUtils.processSend(producerRecord);
        if (processSend == null) {
            throw new LokiException("record is null!", new Object[0]);
        }
        Message message = new Message(processSend.getTopic(), processSend.getTag(), processSend.getMessage().getBytes());
        Long deliveryTimestamp = processSend.getDeliveryTimestamp();
        if (deliveryTimestamp != null && deliveryTimestamp.longValue() != 0) {
            message.setDeliverTimeMs(System.currentTimeMillis() + deliveryTimestamp.longValue());
        }
        List keys = processSend.getKeys();
        if (keys != null && !keys.isEmpty()) {
            message.setKeys(keys);
        }
        return message;
    }

    abstract SendResult send(String str, Message message);
}
