package com.github.wz2coo.localqueue.spring.core;

import com.github.wz2coo.localqueue.spring.annotation.LocalQueueMessageListener;
import com.github.wz2coo.localqueue.spring.autoconfigure.LocalQueueProperties;
import com.github.wz2cool.localqueue.impl.SimpleConsumer;
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
import com.github.wz2cool.localqueue.model.message.QueueMessage;
import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;

/* loaded from: input_file:com/github/wz2coo/localqueue/spring/core/LocalQueueMessageListenerContainer.class */
public class LocalQueueMessageListenerContainer {
    private final ListenerRegistry registry;
    private final LocalQueueProperties properties;
    private final ConfigurableApplicationContext context;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Map<String, ExecutorService> customerIdExecutors = new ConcurrentHashMap();
    private final Map<String, SimpleConsumer> consumerMap = new ConcurrentHashMap();

    public LocalQueueMessageListenerContainer(ListenerRegistry listenerRegistry, LocalQueueProperties localQueueProperties, ConfigurableApplicationContext configurableApplicationContext) {
        this.registry = listenerRegistry;
        this.properties = localQueueProperties;
        this.context = configurableApplicationContext;
    }

    public void start() {
        this.logger.info("[local-queue] start local queue listener container");
        for (String str : this.registry.getCustomerIds()) {
            LocalQueueListener customerHandler = this.registry.getCustomerHandler(str);
            LocalQueueMessageListener customerAnnotation = this.registry.getCustomerAnnotation(str);
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            SimpleConsumer consumer = getConsumer(customerAnnotation);
            this.customerIdExecutors.put(str, newSingleThreadExecutor);
            this.consumerMap.put(str, consumer);
            newSingleThreadExecutor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<QueueMessage> batchTake = consumer.batchTake(customerAnnotation.maxBatchSize());
                        customerHandler.onMessages(batchTake);
                        consumer.ack(batchTake);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e2) {
                        this.logger.error("[local-queue] consumer error", e2);
                    }
                }
            });
            this.logger.info("[local-queue] start listener container for customerId: {}, selectorTag: {}", str, customerAnnotation.selectorTag());
        }
    }

    public void stop() {
        this.logger.info("[local-queue] stop local queue listener container");
        Iterator<Map.Entry<String, SimpleConsumer>> it = this.consumerMap.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().close();
        }
        Iterator<Map.Entry<String, ExecutorService>> it2 = this.customerIdExecutors.entrySet().iterator();
        while (it2.hasNext()) {
            it2.next().getValue().shutdownNow();
        }
    }

    private SimpleConsumer getConsumer(LocalQueueMessageListener localQueueMessageListener) {
        return new SimpleConsumer(new SimpleConsumerConfig.Builder().setConsumerId(localQueueMessageListener.customerId()).setDataDir(new File(this.properties.getConsumer().getDataDir())).setSelectTag(localQueueMessageListener.selectorTag()).setPullInterval(localQueueMessageListener.pullInterval()).build());
    }
}
