package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.core.wrapper.StreamMessageListenerContainerWrapper;
import io.github.aooohan.mq.factory.RedisMessageListenerFactory;
import io.github.aooohan.mq.serializer.RedisMqSerializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:io/github/aooohan/mq/core/RedisMqListenerContainer.class */
public class RedisMqListenerContainer implements SmartLifecycle, InitializingBean, ApplicationContextAware, BeanPostProcessor {
    private final RedisConnectionFactory connectionFactory;
    private ApplicationContext applicationContext;
    private Duration pollTimeout = Duration.ofSeconds(1);
    private Duration shutdownTimeout = Duration.ofMillis(1000);
    private RedisMqSerializer redisMqSerializer = RedisMqSerializer.DEFAULT;
    private final List<RedisMqListener<?>> redisMqListeners = new ArrayList();
    private final List<Lifecycle> listenerLifecycle = new ArrayList();

    public RedisMqListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        this.connectionFactory = redisConnectionFactory;
    }

    public void setMessageSerializer(RedisMqSerializer redisMqSerializer) {
        this.redisMqSerializer = redisMqSerializer;
    }

    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    public void setShutdownTimeout(Duration duration) {
        this.shutdownTimeout = duration;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void start() {
        if (CollectionUtils.isEmpty(this.redisMqListeners)) {
            return;
        }
        RedisTemplate<String, String> createRedisTemplate = createRedisTemplate(this.connectionFactory);
        for (RedisMqListener<?> redisMqListener : this.redisMqListeners) {
            StreamMessageListenerContainerWrapper.ContainerConfig containerConfig = new StreamMessageListenerContainerWrapper.ContainerConfig();
            containerConfig.setListener(redisMqListener);
            containerConfig.setMessageSerializer(this.redisMqSerializer);
            containerConfig.setPollTime(this.pollTimeout);
            containerConfig.setShutdownTimeout(this.shutdownTimeout);
            containerConfig.setRedisTemplate(createRedisTemplate);
            StreamMessageListenerContainerWrapper streamMessageListenerContainerWrapper = new StreamMessageListenerContainerWrapper(containerConfig);
            streamMessageListenerContainerWrapper.start();
            this.listenerLifecycle.add(streamMessageListenerContainerWrapper);
        }
    }

    private RedisTemplate<String, String> createRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    public void stop() {
        if (CollectionUtils.isEmpty(this.listenerLifecycle)) {
            return;
        }
        this.listenerLifecycle.forEach((v0) -> {
            v0.stop();
        });
    }

    public boolean isRunning() {
        if (CollectionUtils.isEmpty(this.listenerLifecycle)) {
            return false;
        }
        Iterator<Lifecycle> it = this.listenerLifecycle.iterator();
        while (it.hasNext()) {
            if (it.next().isRunning()) {
                return true;
            }
        }
        return false;
    }

    public Object postProcessBeforeInitialization(Object obj, String str) throws BeansException {
        List<RedisMqListener<?>> parseAnnotation = RedisMessageListenerFactory.of(this.applicationContext).parseAnnotation(obj);
        if (!parseAnnotation.isEmpty()) {
            Iterator<RedisMqListener<?>> it = parseAnnotation.iterator();
            while (it.hasNext()) {
                this.redisMqListeners.add(it.next());
            }
        }
        return obj;
    }

    public void afterPropertiesSet() throws Exception {
        for (String str : this.applicationContext.getBeanNamesForType(RedisMqListener.class)) {
            this.redisMqListeners.add((RedisMqListener) this.applicationContext.getBean(str, RedisMqListener.class));
        }
    }
}
