package com.jeesuite.amqp.redis;

import com.jeesuite.amqp.MQConsumer;
import com.jeesuite.amqp.MQContext;
import com.jeesuite.amqp.MessageHandler;
import com.jeesuite.common.async.StandardThreadExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Validate;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:com/jeesuite/amqp/redis/RedisConsumerAdapter.class */
/**
 * {@link MQConsumer} implementation that receives messages through a Spring Data Redis
 * {@link RedisMessageListenerContainer}.
 *
 * <p>Each entry of the handler map is registered as a {@link PatternTopic} subscription whose
 * messages are routed to the entry's {@link MessageHandler} via a {@code MessageHandlerDelegate}.
 */
public class RedisConsumerAdapter implements MQConsumer {
    private final RedisConnectionFactory connectionFactory;
    private final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    /** Subscription executor handed to the container; created in {@link #start()}. */
    private ThreadPoolExecutor fetchExecutor;
    /** Task executor handed to the container; created in {@link #start()}. */
    private StandardThreadExecutor asyncProcessExecutor;
    /** Topic pattern mapped to the handler that consumes it; never {@code null}. */
    private final Map<String, MessageHandler> messageHandlers;

    /**
     * Creates an adapter bound to the connection factory of the given template.
     *
     * @param stringRedisTemplate template supplying the Redis connection factory; must not be
     *     {@code null}
     * @param map topic pattern to handler mapping; {@code null} is treated as "no handlers"
     * @throws NullPointerException if {@code stringRedisTemplate} is {@code null}
     */
    public RedisConsumerAdapter(StringRedisTemplate stringRedisTemplate, Map<String, MessageHandler> map) {
        Validate.notNull(stringRedisTemplate, "can't load bean [redisTemplate]", new Object[0]);
        this.connectionFactory = stringRedisTemplate.getConnectionFactory();
        // Keep the caller's map instance when one is supplied; fall back to an empty map so
        // start() does not fail with a NullPointerException when no handlers were configured.
        this.messageHandlers = map != null ? map : new HashMap<String, MessageHandler>();
    }

    /**
     * Creates the executors, registers one listener per configured topic pattern and starts
     * the listener container.
     *
     * @throws Exception if a listener adapter cannot be initialised
     */
    @Override // com.jeesuite.amqp.MQConsumer
    public void start() throws Exception {
        int maxProcessThreads = MQContext.getMaxProcessThreads();
        // Single-threaded pool used by the container as its subscription executor.
        this.fetchExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory) new StandardThreadExecutor.StandardThreadFactory("messageFetcher"));
        // Pool used by the container as its task executor, sized from MQContext.
        // NOTE(review): 1000 is presumably the queue capacity - confirm against StandardThreadExecutor.
        this.asyncProcessExecutor = new StandardThreadExecutor(1, maxProcessThreads, 60L, TimeUnit.SECONDS, 1000, new StandardThreadExecutor.StandardThreadFactory("messageAsyncProcessor"));
        this.container.setConnectionFactory(this.connectionFactory);
        this.container.setSubscriptionExecutor(this.fetchExecutor);
        this.container.setTaskExecutor(this.asyncProcessExecutor);
        for (Map.Entry<String, MessageHandler> entry : this.messageHandlers.entrySet()) {
            String topic = entry.getKey();
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageHandlerDelegate(topic, entry.getValue()), "onMessage");
            messageListenerAdapter.afterPropertiesSet();
            this.container.addMessageListener(messageListenerAdapter, new PatternTopic(topic));
        }
        this.container.afterPropertiesSet();
        this.container.start();
    }

    /**
     * Stops and destroys the listener container, then shuts down the executors.
     *
     * <p>The container is stopped before the executors are shut down so that it does not try
     * to hand work to an executor that has already been shut down. Safe to call even when
     * {@link #start()} was never invoked.
     */
    @Override // com.jeesuite.amqp.MQConsumer
    public void shutdown() {
        try {
            this.container.stop();
            try {
                this.container.destroy();
            } catch (Exception ignored) {
                // Best-effort teardown: a failure while destroying the container must not
                // prevent the executors below from being released.
            }
        } finally {
            // The executors only exist after start(); guard against shutdown() without start().
            if (this.fetchExecutor != null) {
                this.fetchExecutor.shutdown();
            }
            if (this.asyncProcessExecutor != null) {
                this.asyncProcessExecutor.shutdown();
            }
        }
    }
}
