package io.github.aooohan.mq.core;

import io.github.aooohan.mq.core.listener.RedisMqListener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

/* loaded from: input_file:io/github/aooohan/mq/core/AbstractRedisMqListener.class */
public abstract class AbstractRedisMqListener<T> implements RedisMqListener<T>, SmartLifecycle {
    private RedisConnectionFactory redisConnectionFactory;
    protected RedisTemplate<String, Object> redisTemplate;
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> container;
    private List<Subscription> subscriptions;
    protected final Log logger = LogFactory.getLog(getClass());
    protected boolean running = false;

    protected ExecutorService pool() {
        return Executors.newFixedThreadPool(consumerSize(), runnable -> {
            return new Thread(runnable, topicName() + "-listener");
        });
    }

    @Override // io.github.aooohan.mq.core.listener.RedisMqListener
    public int consumerSize() {
        return 1;
    }

    protected boolean ack(ObjectRecord<String, T> objectRecord) {
        Long l = 1L;
        return l.equals(this.redisTemplate.opsForStream().acknowledge(Objects.requireNonNull(objectRecord.getStream()), getClass().getName(), new RecordId[]{objectRecord.getId()}));
    }

    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    public void start() {
        String name = getClass().getName();
        if (Boolean.TRUE.equals(this.redisTemplate.hasKey(topicName()))) {
            StreamOperations opsForStream = this.redisTemplate.opsForStream();
            if (opsForStream.groups(topicName()).isEmpty()) {
                opsForStream.createGroup(topicName(), name);
                this.logger.info("MQ: create topic: " + topicName());
            }
            PendingMessagesSummary pending = opsForStream.pending(topicName(), name);
            if (pending != null && pending.getTotalPendingMessages() > 0) {
                pending.getPendingMessagesPerConsumer().forEach((str, l) -> {
                    if (l.longValue() > 0) {
                        opsForStream.pending(topicName(), Consumer.from(name, str)).forEach(pendingMessage -> {
                            RecordId id = pendingMessage.getId();
                            pendingMessage.getElapsedTimeSinceLastDelivery();
                            pendingMessage.getTotalDeliveryCount();
                            opsForStream.range(topicName(), Range.closed(id.getValue(), id.getValue())).forEach(mapRecord -> {
                                Map map = (Map) mapRecord.getValue();
                                opsForStream.acknowledge(topicName(), name, new RecordId[]{mapRecord.getId()});
                                opsForStream.add(ObjectRecord.create(topicName(), map));
                            });
                        });
                    }
                });
            }
        } else {
            this.redisTemplate.opsForStream().createGroup(topicName(), name);
            this.logger.info("MQ: create topic=" + topicName());
        }
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> create = StreamMessageListenerContainer.create(this.redisConnectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(batchSize()).executor(pool()).pollTimeout(Duration.ofMillis(100L)).keySerializer(RedisSerializer.string()).hashValueSerializer(new Jackson2JsonRedisSerializer(Object.class)).hashKeySerializer(RedisSerializer.string()).objectMapper(new ObjectHashMapper()).targetType(String.class).build());
        this.subscriptions = new ArrayList(consumerSize());
        for (int i = 0; i < consumerSize(); i++) {
            this.subscriptions.add(autoAck() ? create.receiveAutoAck(Consumer.from(name, name + i), StreamOffset.create(topicName(), ReadOffset.lastConsumed()), (StreamListener) null) : create.receive(Consumer.from(name, name + i), StreamOffset.create(topicName(), ReadOffset.lastConsumed()), (StreamListener) null));
            this.logger.info("MQ: add listener " + name + "-" + i);
        }
        this.container = create;
        this.container.start();
        this.running = true;
        this.logger.info("MQ: run " + getClass().getSimpleName() + " listener...");
    }

    protected long shutdownTimeout() {
        return 1000L;
    }

    public void stop() {
        this.logger.info("MQ: destroy " + getClass().getSimpleName() + " listener...");
        this.subscriptions.stream().map(subscription -> {
            return CompletableFuture.runAsync(() -> {
                subscription.cancel();
                while (subscription.isActive()) {
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, pool());
        }).forEach(completableFuture -> {
            try {
                completableFuture.get(shutdownTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e2) {
            }
        });
        this.container.stop();
        if (pool() == null || pool().isShutdown()) {
            return;
        }
        pool().shutdown();
        this.logger.info("MQ: shutdown pool...");
    }

    public boolean isRunning() {
        return this.running;
    }

    @Override // io.github.aooohan.mq.core.listener.RedisMqListener
    public boolean autoAck() {
        return true;
    }
}
