package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.container.ListenerEnvContainer;
import io.github.aooohan.mq.core.container.SpringDataRedisListenerEnvContainer;
import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.factory.RedisMessageListenerFactory;
import io.github.aooohan.mq.serializer.RedisMqSerializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:io/github/aooohan/mq/core/RedisMqListenerContainer.class */
public class RedisMqListenerContainer implements SmartLifecycle, InitializingBean, ApplicationContextAware, BeanPostProcessor {
    private final RedisConnectionFactory connectionFactory;
    private ApplicationContext applicationContext;
    private RedisMessageListenerFactory redisMessageListenerFactory;
    private ScheduledFuture<?> transferPendingFuture;
    private Duration pollTimeout = Duration.ofSeconds(1);
    private Duration shutdownTimeout = Duration.ofMillis(1000);
    private RedisMqSerializer redisMqSerializer = RedisMqSerializer.DEFAULT;
    private final List<RedisMqListener<?>> redisMqListeners = new ArrayList();
    private final List<ListenerEnvContainer> listenerEnvContainer = new ArrayList();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, runnable -> {
        return new Thread(runnable, "mq-pending-t");
    });

    public RedisMqListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        this.connectionFactory = redisConnectionFactory;
    }

    public void setMessageSerializer(RedisMqSerializer redisMqSerializer) {
        this.redisMqSerializer = redisMqSerializer;
    }

    public void setPollTimeout(Duration duration) {
        Assert.isTrue(duration.toMillis() > 0, "pollTimeout must be positive");
        this.pollTimeout = duration;
    }

    public void setShutdownTimeout(Duration duration) {
        this.shutdownTimeout = duration;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        this.redisMessageListenerFactory = RedisMessageListenerFactory.of(applicationContext);
    }

    public void start() {
        if (CollectionUtils.isEmpty(this.redisMqListeners)) {
            return;
        }
        StringRedisTemplate createRedisTemplate = createRedisTemplate(this.connectionFactory);
        for (RedisMqListener<?> redisMqListener : this.redisMqListeners) {
            SpringDataRedisListenerEnvContainer springDataRedisListenerEnvContainer = new SpringDataRedisListenerEnvContainer(new SpringDataRedisMqOperation(createRedisTemplate, this.redisMqSerializer, redisMqListener), this.connectionFactory, redisMqListener, this.shutdownTimeout, this.pollTimeout);
            springDataRedisListenerEnvContainer.start();
            this.listenerEnvContainer.add(springDataRedisListenerEnvContainer);
        }
        this.transferPendingFuture = this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            Iterator<ListenerEnvContainer> it = this.listenerEnvContainer.iterator();
            while (it.hasNext()) {
                it.next().transferPendingMsg();
            }
        }, 0L, 3L, TimeUnit.SECONDS);
    }

    private StringRedisTemplate createRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        return new StringRedisTemplate(redisConnectionFactory);
    }

    public void stop() {
        if (CollectionUtils.isEmpty(this.listenerEnvContainer)) {
            return;
        }
        if (this.transferPendingFuture != null && !this.transferPendingFuture.isCancelled()) {
            this.transferPendingFuture.cancel(true);
            this.transferPendingFuture = null;
        }
        this.scheduledExecutorService.shutdown();
        this.listenerEnvContainer.forEach((v0) -> {
            v0.stop();
        });
    }

    public boolean isRunning() {
        if (CollectionUtils.isEmpty(this.listenerEnvContainer)) {
            return false;
        }
        Iterator<ListenerEnvContainer> it = this.listenerEnvContainer.iterator();
        while (it.hasNext()) {
            if (it.next().isRunning()) {
                return true;
            }
        }
        return false;
    }

    public Object postProcessBeforeInitialization(Object obj, String str) throws BeansException {
        List<RedisMqListener<?>> parseAnnotation = this.redisMessageListenerFactory.parseAnnotation(obj);
        if (!parseAnnotation.isEmpty()) {
            this.redisMqListeners.addAll(parseAnnotation);
        }
        return obj;
    }

    public void afterPropertiesSet() throws Exception {
        for (String str : this.applicationContext.getBeanNamesForType(RedisMqListener.class)) {
            this.redisMqListeners.add((RedisMqListener) this.applicationContext.getBean(str, RedisMqListener.class));
        }
    }
}
