package cn.flood.delay.redis.job;

import cn.flood.delay.redis.configuration.Config;
import cn.flood.delay.redis.core.Callback;
import cn.flood.delay.redis.core.DQRedis;
import cn.flood.delay.redis.core.RawMessage;
import cn.flood.delay.redis.enums.ConsumeStatus;
import cn.flood.delay.redis.utils.ClassUtil;
import cn.flood.delay.redis.utils.GsonUtil;
import java.io.Serializable;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/flood/delay/redis/job/DelayMessageJob.class */
public class DelayMessageJob extends BaseJob implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DelayMessageJob.class);
    private ExecutorService threadPool;
    private Map<String, Callback> callbacks;

    public DelayMessageJob(Config config, DQRedis dQRedis, ExecutorService executorService) {
        super(config, dQRedis);
        this.threadPool = executorService;
        this.callbacks = config.getCallbacks();
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.callbacks.isEmpty()) {
            return;
        }
        long epochSecond = Instant.now().getEpochSecond();
        long taskTtl = epochSecond - this.config.getTaskTtl();
        try {
            List<String> zrangebyscore = zrangebyscore(this.config.getDelayKey(), taskTtl, epochSecond - this.config.getCallbackTtl());
            if (null == zrangebyscore || zrangebyscore.isEmpty()) {
                return;
            }
            Stream<String> stream = zrangebyscore.stream();
            Config config = this.config;
            config.getClass();
            Stream<R> map = stream.filter(config::waitProcessing).map(str -> {
                return () -> {
                    handleCallback(str);
                };
            });
            ExecutorService executorService = this.threadPool;
            executorService.getClass();
            map.forEach(executorService::submit);
        } catch (Exception e) {
            log.error("zrangebyscore({}, {}-{})", new Object[]{this.config.getDelayKey(), Long.valueOf(taskTtl), Long.valueOf(epochSecond), e});
        }
    }

    private <T extends Serializable> void handleCallback(String str) {
        this.config.addProcessed(str);
        RawMessage task = getTask(str);
        if (null != task && this.callbacks.containsKey(task.getTopic())) {
            if (!transferMessage(str, this.config.getDelayKey(), this.config.getAckKey(), Instant.now().getEpochSecond())) {
                this.config.processed(str);
                return;
            }
            Callback callback = this.callbacks.get(task.getTopic());
            Class genericType = ClassUtil.getGenericType(callback);
            if (ConsumeStatus.RETRY.equals(callback.execute(ClassUtil.isBasicType(genericType) ? (Serializable) ClassUtil.convert(genericType, task.getMsg()) : (Serializable) GsonUtil.fromJson(task.getMsg(), genericType)))) {
                retry(str, task);
            } else {
                deleteMessage(str);
            }
            this.config.processed(str);
        }
    }

    private void retry(String str, RawMessage rawMessage) {
        rawMessage.addHasRetries();
        if (rawMessage.getHasRetries() > rawMessage.getMaxRetries()) {
            deleteMessage(str);
            return;
        }
        this.redis.hset(this.config.getHashKey(), str, GsonUtil.toJson(rawMessage));
        transferMessage(str, this.config.getAckKey(), this.config.getErrorKey(), Instant.now().getEpochSecond() + (((int) Math.pow(2.0d, rawMessage.getHasRetries() - 1)) * this.config.getRetryInterval()));
    }
}
