package io.github.guoshiqiufeng.loki.support.redis.impl;

import io.github.guoshiqiufeng.loki.MessageContent;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerConfig;
import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerRecord;
import io.github.guoshiqiufeng.loki.support.core.exception.LokiException;
import io.github.guoshiqiufeng.loki.support.core.pipeline.PipelineUtils;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerRecord;
import io.github.guoshiqiufeng.loki.support.core.producer.ProducerResult;
import io.github.guoshiqiufeng.loki.support.core.util.StringUtils;
import io.github.guoshiqiufeng.loki.support.core.util.ThreadPoolUtils;
import io.github.guoshiqiufeng.loki.support.redis.RedisClient;
import io.github.guoshiqiufeng.loki.support.redis.consumer.DefaultJedisPubSub;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:io/github/guoshiqiufeng/loki/support/redis/impl/BaseRedisClient.class */
public abstract class BaseRedisClient implements RedisClient {
    private static final Logger log = LoggerFactory.getLogger(BaseRedisClient.class);

    public ProducerResult send(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        ProducerRecord processSend = PipelineUtils.processSend(producerRecord);
        if (processSend == null) {
            throw new LokiException("record is null!", new Object[0]);
        }
        publish(processSend.getTopic(), processSend.getMessage());
        ProducerResult producerResult = new ProducerResult();
        producerResult.setTopic(processSend.getTopic());
        return producerResult;
    }

    public CompletableFuture<ProducerResult> sendAsync(String str, ProducerRecord producerRecord) {
        if (producerRecord == null) {
            throw new LokiException("sendAsync fail : record is null!", new Object[0]);
        }
        ProducerRecord processSend = PipelineUtils.processSend(producerRecord);
        if (processSend == null) {
            throw new LokiException("record is null!", new Object[0]);
        }
        return CompletableFuture.supplyAsync(() -> {
            return Long.valueOf(publish(processSend.getTopic(), processSend.getMessage()));
        }).thenApplyAsync(l -> {
            ProducerResult producerResult = new ProducerResult();
            producerResult.setTopic(processSend.getTopic());
            return producerResult;
        });
    }

    public void consumer(ConsumerConfig consumerConfig, Function<MessageContent<String>, Void> function) {
        String topic = consumerConfig.getTopic();
        String topicPattern = consumerConfig.getTopicPattern();
        String tag = consumerConfig.getTag();
        if (StringUtils.isEmpty(topic) && StringUtils.isEmpty(topicPattern)) {
            if (log.isErrorEnabled()) {
                log.error("RocketMqHandler# pushMessageListener error: topic and topicPattern is both null");
                return;
            }
            return;
        }
        try {
            if (StringUtils.isEmpty(tag)) {
            }
            CompletableFuture.runAsync(() -> {
                if (StringUtils.isEmpty(topicPattern)) {
                    subscribe(consumerRecord -> {
                        return (Void) function.apply(new MessageContent().setTopic(consumerRecord.getTopic()).setBody(consumerRecord.getBodyMessage()).setBodyMessage(consumerRecord.getBodyMessage()));
                    }, topic);
                } else {
                    psubscribe(consumerRecord2 -> {
                        return (Void) function.apply(new MessageContent().setTopic(consumerRecord2.getTopic()).setBody(consumerRecord2.getBodyMessage()).setBodyMessage(consumerRecord2.getBodyMessage()));
                    }, topicPattern);
                }
            }, ThreadPoolUtils.getSingleThreadPool()).exceptionally(th -> {
                if (!log.isErrorEnabled()) {
                    return null;
                }
                log.error("Exception occurred in CompletableFuture: {}", th.getMessage());
                return null;
            });
        } catch (Exception e) {
            if (log.isErrorEnabled()) {
                log.error("RocketMqHandler# pushMessageListener error:{}", e.getMessage());
            }
            throw new RuntimeException(e);
        }
    }

    abstract long publish(String str, String str2);

    public abstract void subscribe(JedisPubSub jedisPubSub, String... strArr);

    public abstract void psubscribe(JedisPubSub jedisPubSub, String... strArr);

    @Override // io.github.guoshiqiufeng.loki.support.redis.RedisClient
    public void subscribe(Function<ConsumerRecord, Void> function, String... strArr) {
        subscribe(new DefaultJedisPubSub(function), strArr);
    }

    @Override // io.github.guoshiqiufeng.loki.support.redis.RedisClient
    public void psubscribe(Function<ConsumerRecord, Void> function, String... strArr) {
        psubscribe(new DefaultJedisPubSub(function), strArr);
    }
}
