package easier.framework.starter.mq;

import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.extra.spring.EnableSpringUtil;
import easier.framework.core.plugin.mq.MQ;
import easier.framework.core.plugin.mq.annotation.MQListener;
import easier.framework.core.util.SpringUtil;
import easier.framework.starter.cache.EnableEasierCache;
import easier.framework.starter.cache.redis.RedissonClients;
import easier.framework.starter.mq.builder.MQBuilderInvoker;
import easier.framework.starter.mq.builder.MQMethodDetail;
import easier.framework.starter.mq.kafka.KafkaConsumers;
import easier.framework.starter.mq.kafka.KafkaProducers;
import easier.framework.starter.mq.listener.KafkaTopicListener;
import easier.framework.starter.mq.listener.RedisQueueListener;
import easier.framework.starter.mq.listener.RedisTopicListener;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@EnableSpringUtil
@EnableConfigurationProperties({EasierMQProperties.class})
@Configuration(proxyBeanMethods = false)
@EnableEasierCache
@Import({MQBuilderInvoker.class})
/* loaded from: input_file:easier/framework/starter/mq/EasierMQAutoConfiguration.class */
public class EasierMQAutoConfiguration implements ApplicationListener<ApplicationReadyEvent> {
    private static final Logger log = LoggerFactory.getLogger(EasierMQAutoConfiguration.class);

    @Bean
    public KafkaProducers kafkaProducers(EasierMQProperties easierMQProperties) {
        KafkaProducers kafkaProducers = new KafkaProducers();
        kafkaProducers.init(easierMQProperties);
        return kafkaProducers;
    }

    @Bean
    public KafkaConsumers kafkaConsumers(EasierMQProperties easierMQProperties) {
        KafkaConsumers kafkaConsumers = new KafkaConsumers();
        kafkaConsumers.init(easierMQProperties);
        return kafkaConsumers;
    }

    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        SpringUtil.getMethodByAnnotation(MQListener.class).forEach(this::initMQListener);
    }

    private void initMQListener(Object obj, Map<Method, MQListener> map) {
        map.forEach((method, mQListener) -> {
            initMQListener(obj, method, mQListener);
        });
    }

    private void initMQListener(Object obj, Method method, MQListener mQListener) {
        MQMethodDetail mQMethodDetail = getMQMethodDetail(method);
        if (mQMethodDetail == null) {
            return;
        }
        if (mQMethodDetail.isRedis() && mQMethodDetail.isQueue()) {
            initRedisQueueListener(obj, method, mQListener, mQMethodDetail);
        }
        if (mQMethodDetail.isRedis() && mQMethodDetail.isTopic()) {
            initRedisTopicListener(obj, method, mQListener, mQMethodDetail);
        }
        if (mQMethodDetail.isKafka() && mQMethodDetail.isTopic()) {
            initKafkaTopicListener(obj, method, mQListener, mQMethodDetail);
        }
    }

    private void initRedisQueueListener(Object obj, Method method, MQListener mQListener, MQMethodDetail mQMethodDetail) {
        RQueue<Object> queue = ((RedissonClients) SpringUtil.getAndCache(RedissonClients.class)).getClient(mQMethodDetail.getQueue().source()).getQueue(mQMethodDetail.getQueue().name());
        ThreadPoolTaskScheduler scheduler = SpringUtil.getScheduler();
        for (int i = 0; i < mQListener.concurrency(); i++) {
            scheduler.scheduleWithFixedDelay(RedisQueueListener.builder().bean(obj).method(method).mqListener(mQListener).methodDetail(mQMethodDetail).queue(queue).build().init(), mQListener.timeUnit().toMillis(mQListener.delay()));
        }
    }

    private void initRedisTopicListener(Object obj, Method method, MQListener mQListener, MQMethodDetail mQMethodDetail) {
        String name = mQMethodDetail.getTopic().name();
        RedissonClient client = ((RedissonClients) SpringUtil.getAndCache(RedissonClients.class)).getClient(mQMethodDetail.getTopic().source());
        client.getTopic(name).addListener(Object.class, RedisTopicListener.builder().bean(obj).method(method).mqListener(mQListener).methodDetail(mQMethodDetail).build().init());
    }

    private void initKafkaTopicListener(Object obj, Method method, MQListener mQListener, MQMethodDetail mQMethodDetail) {
        ThreadPoolTaskScheduler scheduler = SpringUtil.getScheduler();
        for (int i = 0; i < mQListener.concurrency(); i++) {
            scheduler.scheduleWithFixedDelay(KafkaTopicListener.builder().bean(obj).method(method).mqListener(mQListener).methodDetail(mQMethodDetail).consumer(((KafkaConsumers) SpringUtil.getAndCache(KafkaConsumers.class)).create(mQMethodDetail.getTopic().source())).build().init(), mQListener.timeUnit().toMillis(mQListener.delay()));
        }
    }

    private MQMethodDetail getMQMethodDetail(Method method) {
        Class cls = (Class) Arrays.stream(method.getDeclaringClass().getInterfaces()).filter(cls2 -> {
            return ArrayUtil.contains(cls2.getInterfaces(), MQ.class);
        }).findAny().orElse(null);
        if (cls == null) {
            return null;
        }
        return new MQMethodDetail(ReflectUtil.getMethod(cls, method.getName(), method.getParameterTypes()));
    }
}
