package io.github.aooohan.mq.core.container;

import io.github.aooohan.mq.adapter.StreamErrorHandlerAdapter;
import io.github.aooohan.mq.adapter.StreamListenerAdapter;
import io.github.aooohan.mq.core.RedisMqOperation;
import io.github.aooohan.mq.core.invoker.DefaultListenerInvoker;
import io.github.aooohan.mq.core.listener.ConcurrentRedisMqListener;
import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.core.metadata.ListenerMetadataExtractor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/core/container/SpringDataRedisListenerEnvContainer.class */
/**
 * {@link ListenerEnvContainer} backed by a Spring Data Redis {@link StreamMessageListenerContainer}.
 *
 * <p>On construction one subscription per consumer ({@code consumerSize()} in total) is registered
 * for the listener's topic and group; consumers are named {@code <consumerName>-<index>}.
 * {@link #start()} and {@link #stop()} drive the underlying container.
 *
 * <p>NOTE(review): {@code start()}/{@code stop()} are not synchronized and {@code running} is a plain
 * field, so this class presumably expects lifecycle calls from a single thread — confirm with callers.
 */
public class SpringDataRedisListenerEnvContainer implements ListenerEnvContainer {
    private static final Log logger = LogFactory.getLog(SpringDataRedisListenerEnvContainer.class);

    /** Pause, in milliseconds, between checks of whether a cancelled subscription is still active. */
    private static final long CANCEL_POLL_INTERVAL_MILLIS = 50L;

    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMqListener<?> listener;
    private final ListenerMetadataExtractor listenerMetadataExtractor;
    private final RedisMqOperation redisMqOperation;
    /** Upper bound on how long {@link #stop()} waits for all subscriptions to become inactive. */
    private final Duration shutdownTimeout;
    /** Executor supplied by a {@link ConcurrentRedisMqListener}; {@code null} otherwise. */
    private ExecutorService executor;
    /** Whether {@link #stop()} should shut {@link #executor} down. */
    private boolean poolNeedClose;
    private final List<Subscription> subscriptions;
    protected boolean running = false;

    /**
     * Builds the underlying stream container and registers the listener's subscriptions.
     *
     * @param redisMqOperation operation facade; also supplies the listener metadata extractor
     * @param redisConnectionFactory connection factory handed to the stream container
     * @param redisMqListener listener providing batch size, consumer count and, for
     *     {@link ConcurrentRedisMqListener}, the executor to poll on
     * @param duration shutdown timeout used by {@link #stop()}
     * @param duration2 poll timeout passed to the stream container options
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public SpringDataRedisListenerEnvContainer(RedisMqOperation redisMqOperation, RedisConnectionFactory redisConnectionFactory, RedisMqListener<?> redisMqListener, Duration duration, Duration duration2) {
        Assert.notNull(redisMqOperation, "operation can not be null");
        Assert.notNull(redisConnectionFactory, "connectionFactory can not be null");
        Assert.notNull(redisMqListener, "listener can not be null");
        Assert.notNull(duration, "shutdownTimeout can not be null");
        Assert.notNull(duration2, "pollTime can not be null");
        this.listener = redisMqListener;
        this.shutdownTimeout = duration;
        this.redisMqOperation = redisMqOperation;
        this.listenerMetadataExtractor = redisMqOperation.getListenerMetadataExtractor();
        StreamListenerAdapter streamListenerAdapter = new StreamListenerAdapter(new DefaultListenerInvoker(redisMqOperation));
        // The builder is intentionally left raw: keySerializer(...) re-parameterises the builder's
        // record type, which would not line up with the declared type of the container field.
        StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder optionsBuilder = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().errorHandler(new StreamErrorHandlerAdapter(redisMqOperation)).batchSize(redisMqListener.batchSize()).pollTimeout(duration2).keySerializer(RedisSerializer.string());
        if (redisMqListener instanceof ConcurrentRedisMqListener) {
            ConcurrentRedisMqListener concurrentListener = (ConcurrentRedisMqListener) redisMqListener;
            this.executor = concurrentListener.executor();
            this.poolNeedClose = concurrentListener.poolNeedClose();
            if (this.executor != null) {
                optionsBuilder.executor(this.executor);
            }
        }
        this.container = StreamMessageListenerContainer.create(redisConnectionFactory, optionsBuilder.build());
        int consumerSize = redisMqListener.consumerSize();
        this.subscriptions = new ArrayList<>(consumerSize);
        String consumerName = this.listenerMetadataExtractor.getConsumerName();
        String groupName = this.listenerMetadataExtractor.groupName();
        String topicName = this.listenerMetadataExtractor.topicName();
        for (int i = 0; i < consumerSize; i++) {
            String indexedConsumerName = consumerName + "-" + i;
            this.subscriptions.add(this.container.receive(Consumer.from(groupName, indexedConsumerName), StreamOffset.create(topicName, ReadOffset.lastConsumed()), streamListenerAdapter));
            logger.info("MQ: add listener: topic[" + topicName + "], group[" + groupName + "], consumer[" + indexedConsumerName + "]");
        }
    }

    /**
     * Returns the executor used to run subscription-cancellation tasks: the listener-supplied
     * executor when there is one, otherwise a fresh {@link SimpleAsyncTaskExecutor}.
     */
    private Executor getExecutor() {
        return this.executor == null ? new SimpleAsyncTaskExecutor() : this.executor;
    }

    /**
     * Creates the consumer group if absent and starts the underlying container.
     * Does nothing when the container is already marked as running.
     */
    public void start() {
        if (this.running) {
            return;
        }
        this.redisMqOperation.createGroupIfAbsent();
        this.container.start();
        this.running = true;
        logger.info("MQ: run listener [" + this.listenerMetadataExtractor.getConsumerName() + "], info:[" + this.listener.topicName() + "-" + this.listener.groupName() + "]");
    }

    /**
     * Cancels every subscription, waits up to the shutdown timeout for all of them to become
     * inactive, stops the underlying container and, if requested by the listener, shuts down its
     * executor.
     *
     * <p>All cancellations are launched before waiting, so the shutdown timeout bounds the wait for
     * the whole group rather than for each subscription in turn. The container is stopped and the
     * running flag cleared even if cancelling or waiting fails.
     */
    public void stop() {
        logger.info("MQ: destroy listener [" + this.listenerMetadataExtractor.getConsumerName() + "], info:[" + this.listener.topicName() + "-" + this.listener.groupName() + "]");
        try {
            // Resolve the executor once so a fallback executor is not re-created per subscription.
            Executor cancelExecutor = getExecutor();
            List<CompletableFuture<Void>> cancellations = new ArrayList<>(this.subscriptions.size());
            for (Subscription subscription : this.subscriptions) {
                cancellations.add(CompletableFuture.runAsync(() -> cancelAndAwaitInactive(subscription), cancelExecutor));
            }
            awaitCancellations(cancellations);
        } finally {
            this.container.stop();
            // Reset the flag so isRunning() is accurate and start() can run again.
            this.running = false;
        }
        if (this.poolNeedClose && this.executor != null) {
            this.executor.shutdown();
            logger.info("MQ: shutdown pool, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
        }
    }

    /** Cancels the subscription and polls until it reports inactive or the thread is interrupted. */
    private static void cancelAndAwaitInactive(Subscription subscription) {
        subscription.cancel();
        while (subscription.isActive()) {
            try {
                Thread.sleep(CANCEL_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and give up: looping on would make sleep() throw immediately
                // on every iteration and turn this into a busy spin.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Waits for all cancellation tasks, bounded by the shutdown timeout; failures are logged, not thrown. */
    private void awaitCancellations(List<CompletableFuture<Void>> cancellations) {
        CompletableFuture<Void> allCancelled = CompletableFuture.allOf(cancellations.toArray(new CompletableFuture<?>[0]));
        try {
            allCancelled.get(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("MQ: interrupted while waiting for subscriptions to stop, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
        } catch (ExecutionException e) {
            logger.warn("MQ: failed to cancel subscription, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]", e.getCause());
        } catch (TimeoutException e) {
            logger.warn("MQ: subscriptions did not stop within " + this.shutdownTimeout + ", topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
        }
    }

    /** Returns whether {@link #start()} has been called without a subsequent {@link #stop()}. */
    public boolean isRunning() {
        return this.running;
    }

    /** Delegates to {@link RedisMqOperation#transferPendingMsg()}. */
    @Override
    public void transferPendingMsg() {
        this.redisMqOperation.transferPendingMsg();
    }
}
