package cn.flood.delay.redis.core;

import cn.flood.delay.redis.configuration.Config;
import cn.flood.delay.redis.constans.LuaScriptConst;
import cn.flood.delay.redis.exception.RDQException;
import cn.flood.delay.redis.job.AckMessageJob;
import cn.flood.delay.redis.job.DelayMessageJob;
import cn.flood.delay.redis.job.ErrorMessageJob;
import cn.flood.delay.redis.utils.ClassUtil;
import cn.flood.delay.redis.utils.GsonUtil;
import cn.flood.delay.redis.utils.ThreadUtil;
import io.lettuce.core.ScriptOutputType;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/flood/delay/redis/core/RDQueue.class */
public class RDQueue {
    private static final Logger log = LoggerFactory.getLogger(RDQueue.class);
    private DQRedis dqRedis;
    private Config config;

    public RDQueue(Config config) {
        this.config = config;
        init();
    }

    private void init() {
        this.dqRedis = new DQRedis(this.config.getHost(), this.config.getPort(), this.config.getDatabase(), this.config.getPassword(), this.config.getTimeout(), this.config.getCluster(), this.config.getSentinel());
        log.info("redis-dqueue starting...");
        int maxJobCoreSize = this.config.getMaxJobCoreSize();
        int maxCallbackCoreSize = this.config.getMaxCallbackCoreSize();
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxJobCoreSize, ThreadUtil.jobThreadFactory(maxJobCoreSize));
        scheduledThreadPoolExecutor.scheduleAtFixedRate(new DelayMessageJob(this.config, this.dqRedis, Executors.newFixedThreadPool(maxCallbackCoreSize, ThreadUtil.callbackThreadFactory(maxCallbackCoreSize))), 0L, 200L, TimeUnit.MILLISECONDS);
        scheduledThreadPoolExecutor.scheduleAtFixedRate(new AckMessageJob(this.config, this.dqRedis), 0L, 1000L, TimeUnit.MILLISECONDS);
        scheduledThreadPoolExecutor.scheduleAtFixedRate(new ErrorMessageJob(this.config, this.dqRedis), 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    public void asyncPushDelayQueue(Message message, BiConsumer<String, ? super Throwable> biConsumer) throws RDQException {
        push(message, biConsumer, true);
    }

    public void syncPushDelayQueue(Message message) throws RDQException {
        push(message, null, false);
    }

    public void delDelayQueue(String str) {
        this.dqRedis.zrem(this.config.getDelayKey(), str);
        this.dqRedis.zrem(this.config.getAckKey(), str);
        this.dqRedis.hdel(this.config.getHashKey(), str);
    }

    private void push(Message message, BiConsumer<String, ? super Throwable> biConsumer, boolean z) throws RDQException {
        checkQueue(this.config.getKeyPrefix(), message);
        String delayKey = this.config.getDelayKey();
        log.info("push message {}", message);
        String msgId = message.getMsgId();
        RawMessage buildTask = buildTask(msgId, message);
        String json = GsonUtil.toJson(buildTask);
        String[] strArr = {this.config.getHashKey(), delayKey, msgId};
        String[] strArr2 = {json, buildTask.getExecuteTime() + ""};
        if (z) {
            this.dqRedis.asyncEval(LuaScriptConst.PUSH_MESSAGE, ScriptOutputType.INTEGER, strArr, strArr2).thenApply(obj -> {
                return msgId;
            }).whenComplete(biConsumer);
        } else {
            log.debug("sync push result: {}", this.dqRedis.syncEval(LuaScriptConst.PUSH_MESSAGE, ScriptOutputType.INTEGER, strArr, strArr2));
        }
    }

    private RawMessage buildTask(String str, Message message) {
        long seconds = message.getTimeUnit().toSeconds(message.getDelayTime());
        long epochSecond = Instant.now().getEpochSecond();
        long j = epochSecond + seconds;
        RawMessage rawMessage = new RawMessage();
        rawMessage.setKey(str);
        rawMessage.setCreateTime(epochSecond);
        rawMessage.setExecuteTime(j);
        rawMessage.setTopic(message.getTopic());
        if (ClassUtil.isBasicType(message.getMsg().getClass())) {
            rawMessage.setMsg(message.getMsg().toString());
        } else {
            rawMessage.setMsg(GsonUtil.toJson(message.getMsg()));
        }
        rawMessage.setMaxRetries(message.getRetries());
        rawMessage.setHasRetries(0);
        return rawMessage;
    }

    private void checkQueue(String str, Message message) throws RDQException {
        if (null == str || str.isEmpty()) {
            throw new RDQException("queue name can not be empty.");
        }
        if (null == message) {
            throw new RDQException("message can not be null.");
        }
        if (null == message.getMsg()) {
            throw new RDQException("message msg can not be null.");
        }
    }

    public void subscribe(String str, Callback callback) {
        log.info("listen the topic [{}]", str);
        this.config.getCallbacks().put(str, callback);
    }

    public void shutdown() {
        this.dqRedis.shutdown();
    }
}
