package com.github.liaomengge.base_common.mq.rabbitmq.receiver;

import com.github.liaomengge.base_common.mq.consts.MQConst;
import com.github.liaomengge.base_common.mq.rabbitmq.AbstractMQReceiver;
import com.github.liaomengge.base_common.utils.thread.LyThreadPoolExecutorUtil;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.aopalliance.aop.Advice;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/github/liaomengge/base_common/mq/rabbitmq/receiver/BaseMQReceiver.class */
/**
 * Base class for RabbitMQ receivers backed by a {@link SimpleMessageListenerContainer}.
 *
 * <p>The constructor builds the container and applies the connection factory, manual
 * acknowledge mode, consumer counts, prefetch count, receive timeout and (optionally) an
 * advice chain. The message listener, queue names and task executor are wired later in
 * {@link #afterPropertiesSet()}, using the values supplied by the subclass through
 * {@link #buildMessageListener()} and {@link #buildQueueNames()}.
 */
public abstract class BaseMQReceiver extends AbstractMQReceiver implements InitializingBean {
    private final CachingConnectionFactory connectionFactory;
    private final int concurrentConsumers;
    private final int maxConcurrentConsumers;
    private final int prefetchCount;
    private final long receiveTimeout;
    private final Advice[] adviceChain;
    private final SimpleMessageListenerContainer simpleMessageListenerContainer;

    /**
     * Creates a receiver that uses {@code MQConst.DEFAULT_RECEIVE_TIMEOUT} as the receive
     * timeout and no advice chain.
     *
     * @param connectionFactory      connection factory handed to the listener container
     * @param concurrentConsumers    value passed to {@code setConcurrentConsumers}
     * @param maxConcurrentConsumers value passed to {@code setMaxConcurrentConsumers}
     * @param prefetchCount          value passed to {@code setPrefetchCount}
     */
    public BaseMQReceiver(CachingConnectionFactory connectionFactory, int concurrentConsumers,
                          int maxConcurrentConsumers, int prefetchCount) {
        this(connectionFactory, concurrentConsumers, maxConcurrentConsumers, prefetchCount,
                MQConst.DEFAULT_RECEIVE_TIMEOUT);
    }

    /**
     * Creates a receiver with an explicit receive timeout and no advice chain.
     *
     * @param connectionFactory      connection factory handed to the listener container
     * @param concurrentConsumers    value passed to {@code setConcurrentConsumers}
     * @param maxConcurrentConsumers value passed to {@code setMaxConcurrentConsumers}
     * @param prefetchCount          value passed to {@code setPrefetchCount}
     * @param receiveTimeout         value passed to {@code setReceiveTimeout}
     */
    public BaseMQReceiver(CachingConnectionFactory connectionFactory, int concurrentConsumers,
                          int maxConcurrentConsumers, int prefetchCount, long receiveTimeout) {
        this(connectionFactory, concurrentConsumers, maxConcurrentConsumers, prefetchCount,
                receiveTimeout, null);
    }

    /**
     * Creates a receiver and fully configures the underlying listener container, except for
     * the listener, queue names and task executor (see {@link #afterPropertiesSet()}).
     *
     * @param connectionFactory      connection factory handed to the listener container
     * @param concurrentConsumers    value passed to {@code setConcurrentConsumers}
     * @param maxConcurrentConsumers value passed to {@code setMaxConcurrentConsumers}
     * @param prefetchCount          value passed to {@code setPrefetchCount}
     * @param receiveTimeout         value passed to {@code setReceiveTimeout}
     * @param adviceChain            advices applied to the container; ignored when
     *                               {@code null} or empty
     */
    public BaseMQReceiver(CachingConnectionFactory connectionFactory, int concurrentConsumers,
                          int maxConcurrentConsumers, int prefetchCount, long receiveTimeout,
                          Advice[] adviceChain) {
        this.connectionFactory = connectionFactory;
        this.concurrentConsumers = concurrentConsumers;
        this.maxConcurrentConsumers = maxConcurrentConsumers;
        this.prefetchCount = prefetchCount;
        this.receiveTimeout = receiveTimeout;
        this.adviceChain = adviceChain;

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Acknowledge mode is MANUAL: the container is not asked to ack on the listener's behalf.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setConcurrentConsumers(concurrentConsumers);
        container.setMaxConcurrentConsumers(maxConcurrentConsumers);
        container.setPrefetchCount(prefetchCount);
        container.setReceiveTimeout(receiveTimeout);
        if (ArrayUtils.isNotEmpty(adviceChain)) {
            container.setAdviceChain(adviceChain);
        }
        this.simpleMessageListenerContainer = container;
    }

    /**
     * Supplies the listener that {@link #afterPropertiesSet()} installs on the container.
     *
     * @return the listener to register
     */
    protected abstract ChannelAwareMessageListener buildMessageListener();

    /**
     * Supplies the queue names that {@link #afterPropertiesSet()} sets on the container.
     *
     * @return the queue names to listen on
     */
    protected abstract String[] buildQueueNames();

    /** Initializes the listener container and then starts it. */
    @Override // com.github.liaomengge.base_common.mq.rabbitmq.AbstractMQReceiver
    public void start() {
        this.simpleMessageListenerContainer.initialize();
        this.simpleMessageListenerContainer.start();
    }

    /**
     * Stops the listener container if it is currently running, logging the queue names from
     * the callback passed to {@code stop}. Does nothing when the container is not running.
     */
    @Override // com.github.liaomengge.base_common.mq.rabbitmq.AbstractMQReceiver
    public void stop() {
        if (this.simpleMessageListenerContainer.isRunning()) {
            this.simpleMessageListenerContainer.stop(() -> {
                // Log message (Chinese): "listener for queue[{}] has stopped..."
                log.info("队列[{}]监听器已经停止...", Arrays.toString(this.simpleMessageListenerContainer.getQueueNames()));
            });
        }
    }

    /**
     * Destroys the underlying listener container.
     *
     * <p>NOTE(review): the fallback executor created in {@link #afterPropertiesSet()} is not
     * shut down here; confirm whether the superclass or the shutdown hook takes care of it.
     */
    public void destroy() {
        this.simpleMessageListenerContainer.destroy();
    }

    /**
     * Completes container wiring: installs the subclass-provided listener and queue names,
     * sets the task executor, and registers the container with the superclass shutdown hook.
     *
     * <p>If {@code consumerExecutor} has not been set, a fallback pool is built via
     * {@code LyThreadPoolExecutorUtil.buildThreadPool} and stored in {@code consumerExecutor}.
     *
     * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.simpleMessageListenerContainer.setMessageListener(buildMessageListener());
        this.simpleMessageListenerContainer.setQueueNames(buildQueueNames());
        if (this.consumerExecutor == null) {
            // Arguments are presumably core size, max size, thread name prefix, keep-alive and
            // work queue - TODO confirm against LyThreadPoolExecutorUtil.buildThreadPool.
            this.consumerExecutor = LyThreadPoolExecutorUtil.buildThreadPool(
                    this.maxConcurrentConsumers,
                    this.concurrentConsumers + this.maxConcurrentConsumers,
                    "rabbitmq-consumer",
                    30L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(16));
        }
        this.simpleMessageListenerContainer.setTaskExecutor(this.consumerExecutor);
        super.registerShutdownHook(this.simpleMessageListenerContainer);
    }
}
