package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.listener.AnnRedisMessageListener;
import io.github.aooohan.mq.core.listener.RedisMessageErrorHandleListener;
import io.github.aooohan.mq.core.listener.RedisMessageListener;
import io.github.aooohan.mq.core.wrapper.RedisMessageListenerWrapper;
import io.github.aooohan.mq.serializer.MessageSerializer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.ResolvableType;
import org.springframework.data.redis.core.RedisTemplate;

/* loaded from: input_file:io/github/aooohan/mq/core/DefaultMessageOperation.class */
/**
 * {@link MessageOperation} implementation bound to a single {@link RedisMessageListener}.
 *
 * <p>The topic and consumer-group name are read from the listener once, at construction time.
 * Publishing is delegated to a {@link DefaultMessagePublisher}; error handling is delegated to
 * the listener itself when it is a {@link RedisMessageErrorHandleListener}, otherwise to a
 * {@link DefaultMessageErrorHandler}.
 */
public class DefaultMessageOperation implements MessageOperation {
    private static final Log logger = LogFactory.getLog(DefaultMessageOperation.class);

    /** Prefix of the short-lived Redis key written while a consumer group is being created. */
    private static final String LOCK_PREFIX = "redis-mq:lock:";

    private final RedisTemplate<String, String> redisTemplate;
    private final MessageSerializer messageSerializer;
    private final MessageErrorHandler errorHandler;
    private final MessagePublisher messagePublisher;
    private final RedisMessageListener<?> listener;
    private final String topic;
    private final String groupName;

    /** Payload type handed to the serializer by {@link #deserialize(String)}. */
    private final Class<?> paramClass;

    /**
     * Creates the operation facade for one listener.
     *
     * @param redisTemplate        template used for every Redis call made by this class
     * @param messageSerializer    serializer used for publishing and for {@link #deserialize(String)}
     * @param redisMessageListener listener that supplies the topic name, the group name and the
     *                             payload type; when it is a {@link RedisMessageListenerWrapper}
     *                             the payload type is resolved from the wrapped target instead
     */
    public DefaultMessageOperation(RedisTemplate<String, String> redisTemplate, MessageSerializer messageSerializer, RedisMessageListener<?> redisMessageListener) {
        this.redisTemplate = redisTemplate;
        this.listener = redisMessageListener;
        this.messageSerializer = messageSerializer;
        this.messagePublisher = new DefaultMessagePublisher(messageSerializer, redisTemplate);
        this.topic = redisMessageListener.topicName();
        this.groupName = redisMessageListener.groupName();
        if (redisMessageListener instanceof RedisMessageListenerWrapper) {
            // Resolve the payload type from the wrapped listener rather than from the wrapper.
            this.paramClass = getParamClass(((RedisMessageListenerWrapper) redisMessageListener).getTarget());
        } else {
            this.paramClass = getParamClass(redisMessageListener);
        }
        if (redisMessageListener instanceof RedisMessageErrorHandleListener) {
            this.errorHandler = (MessageErrorHandler) redisMessageListener;
        } else {
            this.errorHandler = new DefaultMessageErrorHandler(this.topic, this.groupName);
        }
    }

    /**
     * Determines the payload class of a listener.
     *
     * @param redisMessageListener listener to inspect
     * @return the type reported by {@link AnnRedisMessageListener#getParameterizedType()} for
     *         annotation-based listeners, otherwise the first generic argument of
     *         {@link RedisMessageListener} as resolved by {@link ResolvableType}
     */
    private Class<?> getParamClass(RedisMessageListener<?> redisMessageListener) {
        if (redisMessageListener instanceof AnnRedisMessageListener) {
            return ((AnnRedisMessageListener) redisMessageListener).getParameterizedType();
        }
        return ResolvableType.forInstance(redisMessageListener)
                .as(RedisMessageListener.class)
                .resolveGeneric(0);
    }

    /**
     * Acknowledges stream records through {@code opsForStream().acknowledge}.
     *
     * @param str    stream key passed to {@code acknowledge}
     * @param str2   consumer group passed to {@code acknowledge}
     * @param strArr record ids passed to {@code acknowledge}
     * @return {@code true} only when the value returned by Redis equals {@code 1}
     */
    @Override // io.github.aooohan.mq.core.MessageAck
    public boolean ack(String str, String str2, String... strArr) {
        // NOTE(review): the comparison is against exactly 1, so a call carrying several record
        // ids reports false even if all of them were acknowledged - confirm this is intended.
        Long acknowledged = this.redisTemplate.opsForStream().acknowledge(str, str2, strArr);
        return Long.valueOf(1L).equals(acknowledged);
    }

    /**
     * Delegates to the internal {@link MessagePublisher}.
     *
     * @param str first argument forwarded to the publisher (presumably the topic - confirm
     *            against {@link MessagePublisher})
     * @param obj second argument forwarded to the publisher (presumably the payload)
     * @return whatever the delegate publisher returns
     */
    @Override // io.github.aooohan.mq.core.MessagePublisher
    public String publish(String str, Object obj) {
        return this.messagePublisher.publish(str, obj);
    }

    /**
     * Creates this listener's consumer group on its topic unless the group is already listed.
     *
     * <p>A key {@code redis-mq:lock:<topic>} with a one second expiry is written with
     * {@code setIfAbsent} before the check and is deleted afterwards, but only by the caller
     * whose {@code setIfAbsent} succeeded.
     */
    @Override // io.github.aooohan.mq.core.MessageOperation
    public void createGroupIfAbsent() {
        String lockKey = LOCK_PREFIX + this.topic;
        // NOTE(review): the outcome of setIfAbsent only decides who removes the key afterwards;
        // it does not stop a second caller from running the creation logic below concurrently.
        Boolean lockAcquired = this.redisTemplate.opsForValue().setIfAbsent(lockKey, this.groupName, 1L, TimeUnit.SECONDS);
        try {
            if (Boolean.TRUE.equals(this.redisTemplate.hasKey(this.topic))) {
                // The bound method reference also fails fast with an NPE when groupName is null.
                boolean groupExists = this.redisTemplate.opsForStream().groups(this.topic).stream()
                        .map(group -> group.groupName())
                        .anyMatch(this.groupName::equals);
                if (!groupExists) {
                    logger.info("MQ: create group [" + this.groupName + "] for topic [" + this.topic + "]");
                    this.redisTemplate.opsForStream().createGroup(this.topic, this.groupName);
                }
            } else {
                this.redisTemplate.opsForStream().createGroup(this.topic, this.groupName);
                logger.info("MQ: create topic [" + this.topic + "]");
            }
        } finally {
            // Release the key on success and on failure alike, but never a key set by someone else.
            if (Boolean.TRUE.equals(lockAcquired)) {
                this.redisTemplate.delete(lockKey);
            }
        }
    }

    /**
     * Deserializes a raw message into the listener's payload type.
     *
     * @param str serialized message
     * @param <T> type expected by the caller; it is not checked against the payload class here
     * @return the object produced by the {@link MessageSerializer} for the payload class
     * @throws Exception propagated from the serializer
     */
    @Override // io.github.aooohan.mq.core.MessageOperation
    @SuppressWarnings("unchecked") // the caller chooses T; the serializer is driven by paramClass
    public <T> T deserialize(String str) throws Exception {
        return (T) this.messageSerializer.deserialize(str, this.paramClass);
    }

    /**
     * Forwards a failure to the error handler selected in the constructor.
     *
     * @param str first argument forwarded unchanged to the handler
     * @param th  the failure forwarded unchanged to the handler
     */
    @Override // io.github.aooohan.mq.core.MessageErrorHandler
    public void onError(String str, Throwable th) {
        this.errorHandler.onError(str, th);
    }
}
