package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.listener.AnnRedisMqListener;
import io.github.aooohan.mq.core.listener.RedisMqErrorHandleListener;
import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.core.wrapper.RedisMqListenerWrapper;
import io.github.aooohan.mq.serializer.RedisMqSerializer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.ResolvableType;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/core/DefaultRedisMqOperation.class */
/**
 * Default {@link RedisMqOperation} bound to a single {@link RedisMqListener}.
 *
 * <p>The topic and consumer-group name are read from the listener at construction time.
 * Publishing is delegated to a {@link DefaultRedisMqPublisher}, acknowledgement and group
 * management go through the stream operations of the supplied {@link RedisTemplate}, and
 * errors are forwarded either to the listener itself (when it is a
 * {@link RedisMqErrorHandleListener}) or to a {@link DefaultRedisMqErrorHandler}.
 */
public class DefaultRedisMqOperation implements RedisMqOperation {
    private static final Log logger = LogFactory.getLog(DefaultRedisMqOperation.class);

    /** Prefix of the short-lived Redis key written while the consumer group is being created. */
    private static final String LOCK_PREFIX = "redis-mq:lock:";

    private final RedisTemplate<String, String> redisTemplate;
    private final RedisMqSerializer redisMqSerializer;
    private final RedisMqErrorHandler errorHandler;
    private final RedisMqPublisher redisMqPublisher;
    private final RedisMqListener<?> listener;
    private final String topic;
    private final String groupName;

    /** Payload type the listener consumes; used as the target type in {@link #deserialize(String)}. */
    private final Class<?> paramClass;

    /**
     * Creates an operation object for the given listener.
     *
     * @param redisTemplate     template used for all Redis access; must not be {@code null}
     * @param redisMqSerializer serializer used for publishing and deserializing; must not be {@code null}
     * @param redisMqListener   listener supplying topic, group name and payload type; must not be {@code null}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public DefaultRedisMqOperation(RedisTemplate<String, String> redisTemplate, RedisMqSerializer redisMqSerializer, RedisMqListener<?> redisMqListener) {
        Assert.notNull(redisTemplate, "redisTemplate can not be null");
        Assert.notNull(redisMqSerializer, "messageSerializer can not be null");
        Assert.notNull(redisMqListener, "listener can not be null");
        this.redisTemplate = redisTemplate;
        this.listener = redisMqListener;
        this.redisMqSerializer = redisMqSerializer;
        this.redisMqPublisher = new DefaultRedisMqPublisher(redisMqSerializer, redisTemplate);
        this.topic = redisMqListener.topicName();
        this.groupName = redisMqListener.groupName();

        // A wrapper hides the real listener, so the payload type is resolved from its target.
        if (redisMqListener instanceof RedisMqListenerWrapper) {
            this.paramClass = getParamClass(((RedisMqListenerWrapper) redisMqListener).getTarget());
        } else {
            this.paramClass = getParamClass(redisMqListener);
        }

        // Listeners that handle their own errors take precedence over the default handler.
        if (redisMqListener instanceof RedisMqErrorHandleListener) {
            this.errorHandler = (RedisMqErrorHandler) redisMqListener;
        } else {
            this.errorHandler = new DefaultRedisMqErrorHandler(this.topic, this.groupName);
        }
    }

    /**
     * Determines the payload type of a listener.
     *
     * <p>An {@link AnnRedisMqListener} reports its type directly; for any other listener the
     * first generic argument of {@link RedisMqListener} is resolved via {@link ResolvableType}.
     *
     * @param redisMqListener listener to inspect
     * @return the payload class; NOTE(review): {@code resolveGeneric} is documented to return
     *         {@code null} when the generic cannot be resolved - confirm callers tolerate that
     */
    private Class<?> getParamClass(RedisMqListener<?> redisMqListener) {
        if (redisMqListener instanceof AnnRedisMqListener) {
            return ((AnnRedisMqListener) redisMqListener).getParameterizedType();
        }
        return ResolvableType.forInstance(redisMqListener).as(RedisMqListener.class).resolveGeneric(0);
    }

    /**
     * Acknowledges the given record ids on a stream for a consumer group.
     *
     * @param str    stream key
     * @param str2   consumer-group name
     * @param strArr record ids to acknowledge
     * @return {@code true} only if the number of records reported as acknowledged equals the
     *         number of ids passed in; {@code false} otherwise, including when the template
     *         returns {@code null}
     */
    @Override // io.github.aooohan.mq.core.RedisMqAck
    public boolean ack(String str, String str2, String... strArr) {
        Long acknowledged = this.redisTemplate.opsForStream().acknowledge(str, str2, strArr);
        // Compare against the number of ids instead of a fixed 1, otherwise a successful
        // multi-id acknowledgement would be reported as a failure.
        return acknowledged != null && acknowledged.longValue() == strArr.length;
    }

    /**
     * Publishes a message by delegating to the internal {@link RedisMqPublisher}.
     *
     * @param str topic to publish to
     * @param obj message payload
     * @return whatever the delegate publisher returns
     */
    @Override // io.github.aooohan.mq.core.RedisMqPublisher
    public String publish(String str, Object obj) {
        return this.redisMqPublisher.publish(str, obj);
    }

    /**
     * Ensures the listener's consumer group exists on its topic.
     *
     * <p>If the topic key already exists, the group is created only when no existing group has
     * the same name; otherwise the group is created directly. A marker key
     * ({@code redis-mq:lock:<topic>}) with a one-second expiry is written beforehand and
     * removed afterwards if this call was the one that set it, even when group creation fails.
     */
    @Override // io.github.aooohan.mq.core.RedisMqOperation
    public void createGroupIfAbsent() {
        final String lockKey = LOCK_PREFIX + this.topic;
        // NOTE(review): the result only decides whether this call deletes the key afterwards;
        // it does not skip or delay group creation when another caller already holds the key.
        // Confirm whether real mutual exclusion was intended.
        Boolean lockAcquired = this.redisTemplate.opsForValue().setIfAbsent(lockKey, this.groupName, 1L, TimeUnit.SECONDS);
        try {
            if (Boolean.TRUE.equals(this.redisTemplate.hasKey(this.topic))) {
                boolean groupExists = this.redisTemplate.opsForStream().groups(this.topic).stream()
                        .map(group -> group.groupName())
                        .anyMatch(this.groupName::equals);
                if (!groupExists) {
                    logger.info("MQ: create group [" + this.groupName + "] for topic [" + this.topic + "]");
                    this.redisTemplate.opsForStream().createGroup(this.topic, this.groupName);
                }
            } else {
                this.redisTemplate.opsForStream().createGroup(this.topic, this.groupName);
                logger.info("MQ: create topic [" + this.topic + "]");
            }
        } finally {
            // Only remove the marker key if this invocation created it.
            if (Boolean.TRUE.equals(lockAcquired)) {
                this.redisTemplate.delete(lockKey);
            }
        }
    }

    /**
     * Deserializes a raw message into the listener's payload type.
     *
     * @param str serialized message
     * @param <T> type expected by the caller; not checked against the listener's payload class
     * @return the deserialized payload
     * @throws Exception if the serializer fails
     */
    @Override // io.github.aooohan.mq.core.RedisMqOperation
    @SuppressWarnings("unchecked") // target type comes from paramClass; the caller chooses T
    public <T> T deserialize(String str) throws Exception {
        return (T) this.redisMqSerializer.deserialize(str, this.paramClass);
    }

    /**
     * Forwards an error to the handler selected at construction time.
     *
     * @param str first argument passed through unchanged to the handler
     * @param th  the error that occurred
     */
    @Override // io.github.aooohan.mq.core.RedisMqErrorHandler
    public void onError(String str, Throwable th) {
        this.errorHandler.onError(str, th);
    }
}
