package devutility.external.redis.queue.stream;

import devutility.external.redis.com.Config;
import devutility.external.redis.com.ExceptionRetryApprover;
import devutility.external.redis.com.RedisQueueOption;
import devutility.external.redis.com.RedisType;
import devutility.external.redis.com.StatusCode;
import devutility.external.redis.com.StreamMessageType;
import devutility.external.redis.exception.JedisFatalException;
import devutility.external.redis.ext.model.ConsumerInfo;
import devutility.external.redis.queue.Acknowledger;
import devutility.external.redis.queue.JedisQueueConsumer;
import devutility.external.redis.utils.JedisUtils;
import devutility.internal.lang.StringUtils;
import devutility.internal.util.CollectionUtils;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import redis.clients.jedis.exceptions.JedisConnectionException;

/* loaded from: input_file:devutility/external/redis/queue/stream/JedisStreamQueueConsumer.class */
public class JedisStreamQueueConsumer extends JedisQueueConsumer {
    private Acknowledger acknowledger;
    private Set<String> consumedIds;

    public JedisStreamQueueConsumer(Jedis jedis, RedisQueueOption redisQueueOption, Acknowledger acknowledger, JedisStreamQueueConsumerEvent jedisStreamQueueConsumerEvent, ExceptionRetryApprover exceptionRetryApprover) {
        super(jedis, redisQueueOption, jedisStreamQueueConsumerEvent, exceptionRetryApprover);
        this.consumedIds = new LinkedHashSet();
        this.acknowledger = acknowledger;
        jedisStreamQueueConsumerEvent.setAcknowledger(acknowledger);
    }

    public JedisStreamQueueConsumer(Jedis jedis, RedisQueueOption redisQueueOption, Acknowledger acknowledger, JedisStreamQueueConsumerEvent jedisStreamQueueConsumerEvent) {
        this(jedis, redisQueueOption, acknowledger, jedisStreamQueueConsumerEvent, null);
    }

    @Override // devutility.external.redis.queue.JedisQueueConsumer
    public void listen() throws Exception {
        RedisType type = this.devJedis.type(this.redisQueueOption.getKey());
        validate(type);
        initialize(type);
        while (isActive()) {
            try {
                processPending();
                process();
            } catch (Exception e) {
                if (JedisUtils.isBrokenJedis(this.jedis)) {
                    throw new JedisConnectionException(e);
                }
                if ((e instanceof JedisFatalException) || isExceptionRetryApproved(e)) {
                    throw e;
                }
                if (!isReasonableConsumerException()) {
                    throw new JedisFatalException("Exceptions count excced the setting exceptionLimit and exceptionIntervalMillis in RedisQueueOption object.", e);
                }
                log(e);
            }
        }
    }

    protected void validate(RedisType redisType) {
        super.validate();
        if (StringUtils.isNullOrEmpty(this.redisQueueOption.getGroupName())) {
            throw new IllegalArgumentException("Group name can't be empty!");
        }
        if (StringUtils.isNullOrEmpty(this.redisQueueOption.getConsumerName())) {
            throw new IllegalArgumentException("Consumer name can't be empty!");
        }
        if (RedisType.NONE != redisType && RedisType.STREAM != redisType) {
            throw new IllegalArgumentException(String.format("Invalid Redis type for key \"%s\"!", this.redisQueueOption.getKey()));
        }
    }

    protected void initialize(RedisType redisType) {
        this.consumedIds.clear();
        if (RedisType.NONE == redisType) {
            initializeGroup();
        } else {
            if (this.devJedis.isGroupExist(this.redisQueueOption.getKey(), this.redisQueueOption.getGroupName())) {
                return;
            }
            initializeGroup();
        }
    }

    private void initializeGroup() {
        if (this.devJedis.createGroup(this.redisQueueOption.getKey(), this.redisQueueOption.getGroupName()) != StatusCode.OK) {
            throw new JedisFatalException("Create group failed!");
        }
    }

    private void processPending() {
        ConsumerInfo consumerInfo = this.devJedis.getConsumerInfo(this.redisQueueOption.getKey(), this.redisQueueOption.getGroupName(), this.redisQueueOption.getConsumerName());
        if (consumerInfo == null) {
            return;
        }
        int pending = (int) consumerInfo.getPending();
        int pageSizeForReadPending = this.redisQueueOption.getPageSizeForReadPending();
        int pageSizeForReadPending2 = pending / this.redisQueueOption.getPageSizeForReadPending();
        StreamEntryID streamEntryID = null;
        if (pageSizeForReadPending % this.redisQueueOption.getPageSizeForReadPending() > 0) {
            pageSizeForReadPending2++;
        }
        for (int i = 0; i < pageSizeForReadPending2; i++) {
            for (StreamPendingEntry streamPendingEntry : this.jedis.xpending(this.redisQueueOption.getKey(), this.redisQueueOption.getGroupName(), streamEntryID, (StreamEntryID) null, pageSizeForReadPending, this.redisQueueOption.getConsumerName())) {
                Map<String, String> xrangeOne = this.devJedis.xrangeOne(this.redisQueueOption.getKey(), streamPendingEntry.getID());
                if (xrangeOne != null) {
                    onMessage(StreamMessageType.PENDING, streamPendingEntry.getID(), xrangeOne.get(Config.QUEUE_DEFAULT_ITEM_KEY));
                    streamEntryID = new StreamEntryID(streamPendingEntry.getID().getTime(), streamPendingEntry.getID().getSequence() + 1);
                }
            }
        }
    }

    private void process() throws InterruptedException {
        List<Map.Entry<String, List<StreamEntry>>> xreadGroup = this.devJedis.xreadGroup(this.redisQueueOption.getGroupName(), this.redisQueueOption.getConsumerName(), 1, this.redisQueueOption.getWaitMilliseconds(), this.redisQueueOption.isNoNeedAck(), new AbstractMap.SimpleEntry(this.redisQueueOption.getKey(), StreamEntryID.UNRECEIVED_ENTRY));
        if (CollectionUtils.isNullOrEmpty(xreadGroup)) {
            return;
        }
        List<StreamEntry> value = xreadGroup.get(0).getValue();
        if (CollectionUtils.isNullOrEmpty(value)) {
            throw new JedisFatalException("Illegal StreamEntry list!");
        }
        StreamEntry streamEntry = value.get(0);
        Map fields = streamEntry.getFields();
        if (fields == null || !fields.containsKey(Config.QUEUE_DEFAULT_ITEM_KEY)) {
            throw new JedisFatalException("Illegal StreamEntry map!");
        }
        onMessage(StreamMessageType.NORMAL, streamEntry.getID(), fields.get(Config.QUEUE_DEFAULT_ITEM_KEY));
    }

    private void onMessage(StreamMessageType streamMessageType, Object... objArr) {
        StreamEntryID streamEntryID = (StreamEntryID) objArr[0];
        if (this.consumedIds.contains(streamEntryID.toString())) {
            return;
        }
        switch (streamMessageType) {
            case NORMAL:
                if (!this.consumerEvent.onMessage(this.redisQueueOption.getKey(), objArr)) {
                    return;
                }
                break;
            case PENDING:
                if (!this.consumerEvent.onPendingMessage(this.redisQueueOption.getKey(), objArr)) {
                    return;
                }
                break;
            default:
                return;
        }
        this.consumedIds.add(streamEntryID.toString());
        autoAck(streamEntryID);
    }

    private void autoAck(StreamEntryID streamEntryID) {
        if (this.redisQueueOption.isNoNeedAck() || !this.redisQueueOption.isAutoAck()) {
            return;
        }
        this.acknowledger.ack(streamEntryID);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.jedis.close();
        this.devJedis.close();
    }
}
