package io.github.aooohan.mq.core.wrapper;

import io.github.aooohan.mq.adapter.StreamErrorHandlerAdapter;
import io.github.aooohan.mq.adapter.StreamListenerAdapter;
import io.github.aooohan.mq.core.DefaultRedisMqOperation;
import io.github.aooohan.mq.core.listener.ConcurrentRedisMqListener;
import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.serializer.RedisMqSerializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/core/wrapper/StreamMessageListenerContainerWrapper.class */
public class StreamMessageListenerContainerWrapper implements Lifecycle {
    private static final Log logger = LogFactory.getLog(StreamMessageListenerContainerWrapper.class);
    private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> container;
    private final RedisMqListener<?> listener;
    private final Duration shutdownTimeout;
    private ExecutorService executor;
    private boolean poolNeedClose;
    private final List<Subscription> subscriptions;
    protected boolean running = false;

    /* loaded from: input_file:io/github/aooohan/mq/core/wrapper/StreamMessageListenerContainerWrapper$ContainerConfig.class */
    public static class ContainerConfig {
        private RedisMqListener<?> listener;
        private RedisTemplate<String, String> redisTemplate;
        private Duration pollTime;
        private Duration shutdownTimeout;
        private RedisMqSerializer redisMqSerializer;

        public RedisMqListener<?> getListener() {
            return this.listener;
        }

        public void setListener(RedisMqListener<?> redisMqListener) {
            this.listener = redisMqListener;
        }

        public RedisTemplate<String, String> getRedisTemplate() {
            return this.redisTemplate;
        }

        public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        public Duration getPollTime() {
            return this.pollTime;
        }

        public void setPollTime(Duration duration) {
            this.pollTime = duration;
        }

        public Duration getShutdownTimeout() {
            return this.shutdownTimeout;
        }

        public void setShutdownTimeout(Duration duration) {
            this.shutdownTimeout = duration;
        }

        public RedisMqSerializer getMessageSerializer() {
            return this.redisMqSerializer;
        }

        public void setMessageSerializer(RedisMqSerializer redisMqSerializer) {
            this.redisMqSerializer = redisMqSerializer;
        }
    }

    public StreamMessageListenerContainerWrapper(ContainerConfig containerConfig) {
        Assert.notNull(containerConfig, "config must not be null");
        this.listener = containerConfig.getListener();
        this.shutdownTimeout = containerConfig.getShutdownTimeout();
        RedisTemplate<String, String> redisTemplate = containerConfig.getRedisTemplate();
        DefaultRedisMqOperation defaultRedisMqOperation = new DefaultRedisMqOperation(redisTemplate, containerConfig.getMessageSerializer(), this.listener);
        StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder targetType = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().errorHandler(new StreamErrorHandlerAdapter(defaultRedisMqOperation)).pollTimeout(containerConfig.getPollTime()).keySerializer(RedisSerializer.string()).hashValueSerializer(RedisSerializer.string()).hashKeySerializer(RedisSerializer.string()).objectMapper(new ObjectHashMapper()).targetType(String.class);
        if (this.listener instanceof ConcurrentRedisMqListener) {
            this.executor = ((ConcurrentRedisMqListener) this.listener).executor();
            this.poolNeedClose = ((ConcurrentRedisMqListener) this.listener).poolNeedClose();
            targetType.executor(this.executor);
        }
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions build = targetType.build();
        defaultRedisMqOperation.createGroupIfAbsent();
        this.container = StreamMessageListenerContainer.create((RedisConnectionFactory) Objects.requireNonNull(redisTemplate.getConnectionFactory()), build);
        this.subscriptions = new ArrayList(this.listener.groupSize());
        for (int i = 0; i < this.listener.groupSize(); i++) {
            StreamListenerAdapter streamListenerAdapter = new StreamListenerAdapter(defaultRedisMqOperation, this.listener);
            String groupName = this.listener.groupName();
            String str = this.listener.topicName();
            String consumerName = getConsumerName();
            this.subscriptions.add(this.listener.autoAck() ? this.container.receiveAutoAck(Consumer.from(groupName, consumerName), StreamOffset.create(str, ReadOffset.lastConsumed()), streamListenerAdapter) : this.container.receive(Consumer.from(groupName, consumerName), StreamOffset.create(str, ReadOffset.lastConsumed()), streamListenerAdapter));
            logger.info("MQ: add listener: topic[" + str + "], group[" + groupName + "], consumer[" + consumerName + "]");
        }
    }

    private String getConsumerName() {
        return UUID.randomUUID().toString();
    }

    private Executor getExecutor() {
        return this.executor == null ? new SimpleAsyncTaskExecutor() : this.executor;
    }

    public void start() {
        if (this.running) {
            return;
        }
        this.container.start();
        this.running = true;
        logger.info("MQ: run listener, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
    }

    public void stop() {
        logger.info("MQ: destroy listener, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
        this.subscriptions.stream().map(subscription -> {
            return CompletableFuture.runAsync(() -> {
                subscription.cancel();
                while (subscription.isActive()) {
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, getExecutor());
        }).forEach(completableFuture -> {
            try {
                completableFuture.get(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e2) {
            }
        });
        this.container.stop();
        if (this.poolNeedClose) {
            this.executor.shutdown();
            logger.info("MQ: shutdown pool, topic[" + this.listener.topicName() + "], group[" + this.listener.groupName() + "]");
        }
    }

    public boolean isRunning() {
        return this.running;
    }
}
