package com.fqm.framework.mq.config;

import com.fqm.framework.common.core.util.system.SystemUtil;
import com.fqm.framework.common.redis.listener.spring.KeyExpiredEventMessageListener;
import com.fqm.framework.mq.MqFactory;
import com.fqm.framework.mq.MqMode;
import com.fqm.framework.mq.annotation.MqListenerAnnotationBeanPostProcessor;
import com.fqm.framework.mq.listener.MqListenerParam;
import com.fqm.framework.mq.listener.MqRedisKeyExpiredEventHandle;
import com.fqm.framework.mq.listener.RedisMqListener;
import com.fqm.framework.mq.redis.StreamInfo;
import com.fqm.framework.mq.scripts.LuaScriptUtil;
import com.fqm.framework.mq.tasker.RedisMqDeadMessageTasker;
import com.fqm.framework.mq.template.RedisMqTemplate;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * Spring configuration that wires the Redis based MQ components: the {@link RedisMqTemplate},
 * the Redis stream listener container with one subscription per Redis-bound listener, the
 * {@link RedisMqDeadMessageTasker}, and the key-expiry listener/handler beans.
 *
 * <p>The configuration is only active when a {@link MqProperties} bean exists and is processed
 * after {@link MqAutoConfiguration}.
 */
@Configuration
@ConditionalOnBean({MqProperties.class})
@AutoConfigureAfter({MqAutoConfiguration.class})
/* loaded from: input_file:com/fqm/framework/mq/config/RedisMqAutoConfiguration.class */
public class RedisMqAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisMqAutoConfiguration.class);

    /**
     * Creates the Redis MQ template and registers it with the given factory.
     *
     * @param mqFactory           factory the new template is added to
     * @param stringRedisTemplate Redis template the MQ template is built on
     * @return the newly created template
     */
    @ConditionalOnMissingBean
    @Bean
    @Order(200)
    RedisMqTemplate redisMqTemplate(MqFactory mqFactory, StringRedisTemplate stringRedisTemplate) {
        RedisMqTemplate redisMqTemplate = new RedisMqTemplate(stringRedisTemplate);
        mqFactory.addMqTemplate(redisMqTemplate);
        return redisMqTemplate;
    }

    /**
     * Creates the stream listener container and registers every listener whose configuration
     * entry (looked up by listener name) uses the Redis binder.
     *
     * @param redisConnectionFactory                 connection factory used by the container
     * @param mqListenerAnnotationBeanPostProcessor  source of the collected listeners
     * @param stringRedisTemplate                    Redis template used for group management and by the listeners
     * @param mqProperties                           MQ configuration, keyed by listener name
     * @return the configured container, or {@code null} when no listeners were collected
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MqListenerAnnotationBeanPostProcessor mqListenerAnnotationBeanPostProcessor, StringRedisTemplate stringRedisTemplate, MqProperties mqProperties) {
        if (mqListenerAnnotationBeanPostProcessor.getListeners() == null || mqListenerAnnotationBeanPostProcessor.getListeners().isEmpty()) {
            // Nothing to subscribe: keep the original contract of returning no container.
            return null;
        }
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer.create(
                redisConnectionFactory,
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        // Route polling errors through the logger instead of stderr.
                        .errorHandler(error -> LOGGER.error("Redis stream listener error", error))
                        .pollTimeout(Duration.ZERO)
                        .serializer(new StringRedisSerializer())
                        .build());
        for (MqListenerParam mqListenerParam : mqListenerAnnotationBeanPostProcessor.getListeners()) {
            MqConfigurationProperties mqConfigurationProperties = (MqConfigurationProperties) mqProperties.getMqs().get(mqListenerParam.getName());
            if (mqConfigurationProperties != null && MqMode.REDIS.equalMode(mqConfigurationProperties.getBinder())) {
                buildListener(stringRedisTemplate, streamMessageListenerContainer, mqListenerParam, mqConfigurationProperties);
            }
        }
        return streamMessageListenerContainer;
    }

    /**
     * Ensures the consumer group exists for the configured topic and registers a
     * {@link RedisMqListener} for it on the container.
     *
     * <p>Group creation is best effort: a failure is logged and registration still proceeds.
     *
     * @param stringRedisTemplate            Redis template used for group lookup/creation and by the listener
     * @param streamMessageListenerContainer container the subscription is registered on
     * @param mqListenerParam                listener bean and method to invoke
     * @param mqConfigurationProperties      configuration providing topic and group
     * @throws IllegalArgumentException if the configured group or topic has no text
     */
    private void buildListener(StringRedisTemplate stringRedisTemplate, StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer, MqListenerParam mqListenerParam, MqConfigurationProperties mqConfigurationProperties) {
        String group = mqConfigurationProperties.getGroup();
        String topic = mqConfigurationProperties.getTopic();
        Assert.isTrue(StringUtils.hasText(group), "Please specific [group] under mq configuration.");
        Assert.isTrue(StringUtils.hasText(topic), "Please specific [topic] under mq configuration.");

        if (!groupExists(stringRedisTemplate, topic, group)) {
            try {
                LuaScriptUtil.createGroup(topic, ReadOffset.from("0"), group, stringRedisTemplate);
            } catch (Exception e) {
                // Deliberately not rethrown, matching the original best-effort behaviour.
                LOGGER.error("createGroup error,topic={},group={}", topic, group, e);
            }
        }

        streamMessageListenerContainer.register(
                StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(topic, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(group, buildConsumerName()))
                        .autoAcknowledge(false)
                        // Keep the subscription alive regardless of the error raised.
                        .cancelOnError(th -> false)
                        .build(),
                new RedisMqListener(mqListenerParam.getBean(), mqListenerParam.getMethod(), stringRedisTemplate, topic, group));
        LOGGER.info("Init RedisMqListener,bean={},method={},topic={},group={}", mqListenerParam.getBean().getClass(), mqListenerParam.getMethod().getName(), topic, group);
    }

    /**
     * Checks whether a consumer group with the given name is reported for the topic.
     *
     * <p>Any exception raised by the lookup is logged at debug level and treated as
     * "group not found", so the caller goes on to attempt group creation.
     *
     * @param stringRedisTemplate Redis template used for the lookup
     * @param topic               stream key to inspect
     * @param group               group name to look for
     * @return {@code true} only if the lookup succeeded and listed the group
     */
    private boolean groupExists(StringRedisTemplate stringRedisTemplate, String topic, String group) {
        try {
            StreamInfo.InfoGroups infoGroups = LuaScriptUtil.getInfoGroups(topic, stringRedisTemplate);
            if (infoGroups == null) {
                return false;
            }
            Iterator<?> groups = infoGroups.iterator();
            while (groups.hasNext()) {
                if (((StreamInfo.InfoGroup) groups.next()).groupName().equals(group)) {
                    return true;
                }
            }
        } catch (Exception e) {
            LOGGER.debug("createGroup error", e);
        }
        return false;
    }

    /**
     * Creates the dead-message tasker for a set of Redis topics.
     *
     * <p>For every collected listener that has no configuration entry under its own name, the
     * topic of the first configured entry using the Redis binder is added to the set. The scan
     * ends when the configured entries are exhausted, so a configuration without any Redis
     * binder yields an empty contribution instead of looping forever.
     *
     * @param mqListenerAnnotationBeanPostProcessor source of the collected listeners; a
     *                                              {@code null} listener collection is treated as empty
     * @param stringRedisTemplate                   Redis template handed to the tasker
     * @param mqProperties                          MQ configuration, keyed by listener name
     * @return the tasker built for the collected topics
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    RedisMqDeadMessageTasker redisMqDeadMessageTasker(MqListenerAnnotationBeanPostProcessor mqListenerAnnotationBeanPostProcessor, StringRedisTemplate stringRedisTemplate, MqProperties mqProperties) {
        HashSet<String> topics = new HashSet<>();
        if (mqListenerAnnotationBeanPostProcessor.getListeners() != null) {
            for (MqListenerParam mqListenerParam : mqListenerAnnotationBeanPostProcessor.getListeners()) {
                // NOTE(review): only listeners WITHOUT their own configuration entry contribute a
                // topic; streamMessageListenerContainer uses the opposite condition. Confirm intent.
                if (mqProperties.getMqs().get(mqListenerParam.getName()) != null) {
                    continue;
                }
                for (Object candidate : mqProperties.getMqs().values()) {
                    MqConfigurationProperties mqConfigurationProperties = (MqConfigurationProperties) candidate;
                    if (MqMode.REDIS.equalMode(mqConfigurationProperties.getBinder())) {
                        topics.add(mqConfigurationProperties.getTopic());
                        break;
                    }
                }
            }
        }
        // NOTE(review): the meaning of the trailing 1L / 60L arguments is defined by
        // RedisMqDeadMessageTasker and is not visible here.
        return new RedisMqDeadMessageTasker(stringRedisTemplate, topics, 1L, 60L);
    }

    /**
     * Builds the consumer name used when subscribing, formatted as {@code <host address>@<pid>}.
     *
     * @return the consumer name for this process
     */
    private static String buildConsumerName() {
        return String.format("%s@%d", SystemUtil.getHostAddress(), Long.valueOf(SystemUtil.getCurrentPid()));
    }

    /**
     * Provides a {@link RedisMessageListenerContainer} bound to the given connection factory
     * when the context does not already define one.
     *
     * @param redisConnectionFactory connection factory set on the container
     * @return the new listener container
     */
    @ConditionalOnMissingBean({RedisMessageListenerContainer.class})
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    /**
     * Provides a {@link KeyExpiredEventMessageListener} built on the given container when the
     * context does not already define one.
     *
     * @param redisMessageListenerContainer container passed to the listener
     * @return the new key-expired listener
     */
    @ConditionalOnMissingBean({KeyExpiredEventMessageListener.class})
    @Bean
    KeyExpiredEventMessageListener keyExpiredEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpiredEventMessageListener(redisMessageListenerContainer);
    }

    /**
     * Provides a {@link MqRedisKeyExpiredEventHandle} when the context does not already define one.
     *
     * @param stringRedisTemplate Redis template passed to the handler
     * @return the new key-expired event handler
     */
    @ConditionalOnMissingBean({MqRedisKeyExpiredEventHandle.class})
    @Bean
    MqRedisKeyExpiredEventHandle mqRedisKeyExpiredEventHandle(StringRedisTemplate stringRedisTemplate) {
        return new MqRedisKeyExpiredEventHandle(stringRedisTemplate);
    }
}
