package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.entity.MsgDeliver;
import io.github.aooohan.mq.serializer.RedisMqSerializer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.support.collections.DefaultRedisSet;
import org.springframework.data.redis.support.collections.RedisSet;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/core/SpringDataRedisMqOperation.class */
public class SpringDataRedisMqOperation extends AbstractRedisMqOperation implements RedisMqOperation {
    private static final String CUR_HANDLE_MSG_IDS = "@__redis-mq:cur-handler-msg-ids:";
    private final StringRedisTemplate redisTemplate;
    private final RedisSet<String> curHandlerMsgIds;

    public SpringDataRedisMqOperation(StringRedisTemplate stringRedisTemplate, RedisMqSerializer redisMqSerializer, RedisMqListener<?> redisMqListener) {
        super(redisMqSerializer, redisMqListener, new DefaultRedisMqPublisher(redisMqSerializer, stringRedisTemplate));
        Assert.notNull(stringRedisTemplate, "redisTemplate can not be null");
        Assert.notNull(redisMqSerializer, "messageSerializer can not be null");
        Assert.notNull(redisMqListener, "listener can not be null");
        this.redisTemplate = stringRedisTemplate;
        this.curHandlerMsgIds = new DefaultRedisSet(CUR_HANDLE_MSG_IDS + this.listenerMetadataExtractor.topicName(), stringRedisTemplate);
    }

    @Override // io.github.aooohan.mq.core.RedisMqAck
    public boolean ack(String str, String str2, String str3) {
        StreamOperations opsForStream = this.redisTemplate.opsForStream();
        Long acknowledge = opsForStream.acknowledge(str, str2, new String[]{str3});
        opsForStream.delete(str, new String[]{str3});
        Long l = 1L;
        return l.equals(acknowledge);
    }

    @Override // io.github.aooohan.mq.core.RedisMqOperation
    public void createGroupIfAbsent() {
        String str = this.listenerMetadataExtractor.topicName();
        String groupName = this.listenerMetadataExtractor.groupName();
        Boolean ifAbsent = this.redisTemplate.opsForValue().setIfAbsent("@__redis-mq:lock:" + str + ":" + groupName, groupName, 1L, TimeUnit.SECONDS);
        try {
            StreamOperations opsForStream = this.redisTemplate.opsForStream();
            if (Boolean.TRUE.equals(this.redisTemplate.hasKey(str))) {
                Stream map = opsForStream.groups(str).stream().map((v0) -> {
                    return v0.groupName();
                });
                groupName.getClass();
                map.filter((v1) -> {
                    return r1.equals(v1);
                }).findFirst().orElseGet(() -> {
                    this.logger.info("MQ: create group [" + groupName + "] for topic [" + str + "]");
                    return opsForStream.createGroup(str, groupName);
                });
            } else {
                opsForStream.createGroup(str, groupName);
                this.logger.info("MQ: create topic [" + str + "]");
            }
            if (Boolean.TRUE.equals(ifAbsent)) {
                this.redisTemplate.delete("@__redis-mq:lock:" + str);
            }
        } catch (Throwable th) {
            if (Boolean.TRUE.equals(ifAbsent)) {
                this.redisTemplate.delete("@__redis-mq:lock:" + str);
            }
            throw th;
        }
    }

    @Override // io.github.aooohan.mq.core.RedisMqOperation
    public void transferPendingMsg() {
        String groupName = this.listenerMetadataExtractor.groupName();
        String str = this.listenerMetadataExtractor.topicName();
        StreamOperations opsForStream = this.redisTemplate.opsForStream();
        Boolean ifAbsent = this.redisTemplate.opsForValue().setIfAbsent("@__redis-mq:pending:lock:" + str, str, 1L, TimeUnit.SECONDS);
        if (Boolean.FALSE.equals(ifAbsent)) {
            return;
        }
        try {
            opsForStream.pending(str, groupName, Range.unbounded(), 10L).forEach(pendingMessage -> {
                RecordId id = pendingMessage.getId();
                if (pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds() < 600) {
                    return;
                }
                if (this.curHandlerMsgIds.contains(id.getValue())) {
                    this.logger.warn("MQ: message [" + id.getValue() + "] has been handled, so ignore it");
                } else {
                    opsForStream.range(str, Range.closed(id.getValue(), id.getValue())).forEach(mapRecord -> {
                        MsgDeliver msgDeliver = new MsgDeliver(id.getValue(), (Map) mapRecord.getValue());
                        int retryTimesIfFailed = this.listener.retryTimesIfFailed();
                        Long deliveryCount = msgDeliver.getDeliveryCount();
                        if (deliveryCount.longValue() > retryTimesIfFailed) {
                            this.logger.warn("MQ: message [" + id.getValue() + "] has been re-delivery [" + deliveryCount + "] times, so discard it");
                            ack(str, groupName, id.getValue());
                            return;
                        }
                        ack(str, groupName, msgDeliver.getId());
                        if (!msgDeliver.isValid()) {
                            this.logger.warn("MQ: message [" + msgDeliver + "] has been re-delivery [" + deliveryCount + "] times, but still invalid, so discard it");
                        } else {
                            this.logger.warn("MQ: re-delivery message [" + msgDeliver + "] to topic[" + str + "]");
                            publish(str, msgDeliver);
                        }
                    });
                }
            });
            if (Boolean.TRUE.equals(ifAbsent)) {
                this.redisTemplate.delete("@__redis-mq:pending:lock:" + str);
            }
        } catch (Throwable th) {
            if (Boolean.TRUE.equals(ifAbsent)) {
                this.redisTemplate.delete("@__redis-mq:pending:lock:" + str);
            }
            throw th;
        }
    }

    @Override // io.github.aooohan.mq.core.RedisMqOperation
    public Set<String> getCurHandleMsgIds() {
        return this.curHandlerMsgIds;
    }
}
