package cn.flood.delay.core;

import cn.flood.delay.common.DelayQueueException;
import cn.flood.delay.common.GlobalConstants;
import cn.flood.delay.entity.DelayQueueJob;
import cn.flood.delay.mapper.TbDelayJobMapper;
import cn.flood.delay.redis.RedisOperation;
import cn.flood.delay.redis.RedisOperationByLua;
import cn.flood.delay.service.RedisDelayQueue;
import cn.flood.delay.service.impl.AbstractTopicRegister;
import cn.flood.delay.service.impl.RedisDelayQueueImpl;
import cn.flood.delay.threads.Move2ReadyThread;
import cn.flood.delay.threads.RetryOutTimesThread;
import cn.flood.delay.threads.ShutdownThread;
import cn.flood.delay.utils.ExceptionUtil;
import cn.flood.delay.utils.NetUtil;
import cn.flood.delay.utils.NextTimeHolder;
import cn.flood.delay.utils.RedisKeyUtil;
import cn.flood.delay.utils.TimeoutUtil;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:cn/flood/delay/core/RedisDelayQueueContext.class */
public class RedisDelayQueueContext {
    /** Delay-queue facade; created in {@code initCtx} and returned by {@code getRedisDelayQueue()}. */
    private RedisDelayQueue redisDelayQueue;
    /** Persistence mapper for job status; null when a constructor without a mapper is used, so every use is null-checked. */
    private TbDelayJobMapper tbDelayJobMapper;
    /** Project name taken from the constructor argument in {@code initCtx}. Static: each new context overwrites it. */
    public static String PROJECTNAME;
    /** Redis access layer; {@code initCtx} assigns a {@code RedisOperationByLua} instance. */
    private RedisOperation redisOperation;
    /** IP used to match this machine's clients in {@code killThisMachineAllRedisBlpopClients}; may be null/empty, in which case the local LAN address is looked up instead. */
    private volatile String ipInRedisServer;
    /** Daemon-thread pool (0 core / 50 max, 30 s keep-alive, unbounded queue, DiscardPolicy) that {@code againRightPush} uses to re-queue jobs. */
    private ThreadPoolExecutor RPUSH_NO_EXEC_JOB = new ThreadPoolExecutor(0, 50, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("异常未消费重入-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
    /** Single-thread scheduler driving the periodic {@code NextTimeHolder.setZeroAndNotify()} call set up in {@code runTimerNotify}. */
    private final ScheduledExecutorService TIMER_NOTIFY = Executors.newScheduledThreadPool(1);
    private static final Logger logger = LoggerFactory.getLogger(RedisDelayQueueContext.class);
    /** Topic name -> register; filled by {@code addTopic} and shared with the {@code RedisDelayQueueImpl} created in {@code initCtx}. */
    private static ConcurrentHashMap<String, AbstractTopicRegister> topicRegisterHolder = new ConcurrentHashMap<>();
    /** Stop flag checked by every topic listener loop; set through {@code setTopicThreadStop}. */
    private static volatile boolean topicThreadStop = false;
    /** Result of the probe in {@code checkBlpop}; listener loops only call {@code BLPOPKey} when this is true. */
    private static boolean canUseBlpop = false;
    /** Fixed pool of 10 threads handed to {@code RedisDelayQueueImpl}; closed by the shutdown hook. */
    private static ExecutorService DelayQ_ASYNC = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue());

    public RedisDelayQueueContext(RedisTemplate<Object, Object> redisTemplate, String str) {
        initCtx(redisTemplate, str, null, null);
    }

    public RedisDelayQueueContext(RedisTemplate<Object, Object> redisTemplate, String str, TbDelayJobMapper tbDelayJobMapper) {
        initCtx(redisTemplate, str, null, tbDelayJobMapper);
    }

    public RedisDelayQueueContext(RedisTemplate<Object, Object> redisTemplate, String str, String str2) {
        initCtx(redisTemplate, str, str2, null);
    }

    /**
     * Validates the project name, wires the Redis operation layer and the delay-queue facade,
     * stores the configuration and then starts the background machinery via {@code init()}.
     *
     * @param redisTemplate    template wrapped by {@code RedisOperationByLua}
     * @param str              project name; must not contain ':', '{' or '}'
     * @param str2             IP stored as {@code ipInRedisServer}; may be null
     * @param tbDelayJobMapper optional persistence mapper; may be null
     * @throws DelayQueueException if the project name contains one of the reserved characters
     */
    protected final void initCtx(RedisTemplate<Object, Object> redisTemplate, String str, String str2, TbDelayJobMapper tbDelayJobMapper) {
        boolean hasReservedChar = str.contains(":") || str.contains("{") || str.contains("}");
        if (hasReservedChar) {
            throw new DelayQueueException("projectName 不能包含特殊字符 : } { ");
        }

        // Statement order is kept as-is: the facade needs the operation layer, and init() needs everything.
        RedisOperation operations = new RedisOperationByLua(redisTemplate);
        this.redisOperation = operations;
        this.redisDelayQueue = new RedisDelayQueueImpl(operations, topicRegisterHolder, DelayQ_ASYNC, tbDelayJobMapper);

        this.ipInRedisServer = str2;
        this.tbDelayJobMapper = tbDelayJobMapper;
        PROJECTNAME = str;

        init();
    }

    /**
     * Starts the background machinery in a fixed order: the move-to-ready thread, the BLPOP probe,
     * the delayed start of the topic listeners, the periodic notifier and finally the shutdown hook.
     */
    private void init() {
        // 1. background thread that is handed the Redis operation layer
        Move2ReadyThread.getInstance().runMove2ReadyThread(redisOperation);
        // 2. probe whether blocking pop can be used
        this.checkBlpop();
        // 3. topic listeners start after a 5 second delay
        this.runTopicsThreadAfter5Sec();
        // 4. periodic NextTimeHolder notification
        this.runTimerNotify();
        // 5. orderly shutdown on JVM exit
        this.registerDestory();
    }

    /**
     * Probes whether a blocking pop can be issued through the Redis operation layer and records the
     * outcome in {@code canUseBlpop}.
     *
     * <p>The probe pops from a throw-away key. Any exception means blocking pop is treated as
     * unavailable; the failure is logged instead of being silently swallowed so that operators can
     * see why the listeners are not using blocking pop.
     */
    private void checkBlpop() {
        try {
            this.redisOperation.BLPOP("daimi_11&*", 5000L);
            canUseBlpop = true;
        } catch (Exception e) {
            canUseBlpop = false;
            logger.warn("BLPOP probe failed, topic listeners will not use blocking pop. Err:{}", ExceptionUtil.getStackTrace(e));
        }
    }

    /**
     * Registers a JVM shutdown hook that stops the executors and background threads owned by this
     * context, in the same order every time, logging the start and the end of the sequence.
     */
    private void registerDestory() {
        Runnable shutdownSequence = () -> {
            logger.info("延迟任务开始关机....");
            // async add-job pool first, then the timer, then the worker/listener side
            ShutdownThread.closeExecutor(DelayQ_ASYNC, "异步AddJob线程池");
            TIMER_NOTIFY.shutdown();
            Move2ReadyThread.getInstance().toStop();
            shutdownTopicThreads();
            RetryOutTimesThread.getInstance().toStop();
            ShutdownThread.closeExecutor(RPUSH_NO_EXEC_JOB, "异常未消费重入List线程池");
            logger.info("延迟任务关机完毕....");
        };
        Thread hook = new Thread(shutdownSequence);
        Runtime.getRuntime().addShutdownHook(hook);
    }

    /**
     * Starts the topic listener threads from a helper thread after a 5 second delay.
     *
     * <p>If the delay is interrupted the listeners are still started (as before), but the
     * interruption is now reported through the logger and the thread's interrupt flag is restored
     * instead of printing the stack trace to stderr and dropping the flag.
     */
    private void runTopicsThreadAfter5Sec() {
        new Thread(() -> {
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting to start topic threads; starting them now");
            }
            runTopicsThreads();
        }).start();
    }

    /**
     * Schedules {@code NextTimeHolder.setZeroAndNotify()} to run every minute (first run after one
     * minute) on the {@code TIMER_NOTIFY} scheduler.
     *
     * <p>The call is guarded: a periodic task scheduled with {@code scheduleWithFixedDelay} is
     * cancelled for good as soon as one execution throws, so any exception is logged and swallowed
     * here to keep the timer alive.
     */
    private void runTimerNotify() {
        this.TIMER_NOTIFY.scheduleWithFixedDelay(() -> {
            try {
                NextTimeHolder.setZeroAndNotify();
            } catch (Exception e) {
                logger.error("NextTimeHolder.setZeroAndNotify failed. Err:{}", ExceptionUtil.getStackTrace(e));
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Logs how many topics are registered and starts one listener thread per registered topic.
     */
    private void runTopicsThreads() {
        logger.info("初始化Topic线程runTopicsThreads topic.size:{}", topicRegisterHolder.size());
        for (AbstractTopicRegister register : topicRegisterHolder.values()) {
            runTopicThreads(register);
        }
    }

    /**
     * Starts the daemon listener thread for one topic.
     *
     * <p>The listener loops until {@code topicThreadStop} is set. Each round it
     * <ol>
     *   <li>waits until at least one worker permit is free,</li>
     *   <li>tries a batch pop ({@code lrangeAndLTrim}) when more than one permit is free,</li>
     *   <li>falls back to a single blocking pop (or a 1 s sleep when blocking pop is unavailable)
     *       if the batch produced nothing,</li>
     *   <li>acquires one permit per popped topic id and hands each id to the topic's executor.</li>
     * </ol>
     *
     * <p>Fixes over the previous version: the batch result is no longer discarded (popped ids used
     * to be overwritten by an empty list and lost), permits are returned for ids that are skipped or
     * rejected by the executor, and the failure path of the worker no longer dereferences a null job.
     *
     * @param abstractTopicRegister register describing the topic, its limits, executor and callback
     */
    private void runTopicThreads(AbstractTopicRegister abstractTopicRegister) {
        Semaphore semaphore = new Semaphore(abstractTopicRegister.getMaxPoolSize());
        Thread thread = new Thread(() -> {
            logger.info("创建Topic:{}线程;topicThreadStop:{}", abstractTopicRegister.getTopic(), Boolean.valueOf(topicThreadStop));
            while (!topicThreadStop) {
                try {
                    int availablePermits = semaphore.availablePermits();
                    if (availablePermits == 0) {
                        // All workers busy: block until one finishes, then hand the permit straight back.
                        // availablePermits stays 0, so this round falls through to the single-pop path.
                        semaphore.acquire(1);
                        semaphore.release();
                    }
                    int batchSize = Math.min(abstractTopicRegister.getLrangMaxCount(), availablePermits);

                    List<Object> topicIds = new ArrayList<>();
                    if (batchSize > 1) {
                        List<?> batch = this.redisOperation.lrangeAndLTrim(abstractTopicRegister.getTopic(), batchSize);
                        if (batch != null) {
                            // These ids are already removed from Redis; they must be processed, not dropped.
                            topicIds.addAll(batch);
                        }
                    }
                    if (topicIds.isEmpty()) {
                        if (canUseBlpop) {
                            long startMillis = Clock.systemDefaultZone().millis();
                            String poppedId = this.redisOperation.BLPOPKey(abstractTopicRegister.getTopic());
                            logger.info("BLPOPKey 耗时:{}", Long.valueOf(Clock.systemDefaultZone().millis() - startMillis));
                            if (poppedId != null) {
                                topicIds.add(poppedId);
                            }
                        } else {
                            Thread.sleep(1000L);
                        }
                    }
                    if (!topicIds.isEmpty()) {
                        semaphore.acquire(topicIds.size());
                        dispatchTopicIds(abstractTopicRegister, semaphore, topicIds);
                    }
                } catch (Exception e) {
                    logger.error(ExceptionUtil.getStackTrace(e));
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException ignored) {
                        // Back-off sleep cut short; the loop condition re-checks the stop flag.
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.setName(abstractTopicRegister.getTopic() + "监听redis线程");
        thread.start();
    }

    /**
     * Submits one worker task per popped topic id. The caller has already acquired one permit per
     * entry; a permit is given back here for every entry that does not result in a running task
     * (empty entry, unexpected element type, or rejection by the executor).
     */
    private void dispatchTopicIds(AbstractTopicRegister abstractTopicRegister, Semaphore semaphore, List<Object> topicIds) {
        for (Object rawId : topicIds) {
            if (ObjectUtils.isEmpty(rawId)) {
                semaphore.release();
                continue;
            }
            try {
                String topicId = ((String) rawId).replaceAll("\"", "");
                abstractTopicRegister.getTOPIC_THREADS().execute(() -> consumeTopicId(abstractTopicRegister, semaphore, topicId));
            } catch (RuntimeException e) {
                // No task will run for this id, so nobody else would release its permit.
                semaphore.release();
                logger.error("Failed to dispatch popped topicId:{} of topic:{}, Err:{}", new Object[]{rawId, abstractTopicRegister.getTopic(), ExceptionUtil.getStackTrace(e)});
            }
        }
    }

    /**
     * Worker task: loads the job behind a popped topic id, runs the topic callback with the
     * configured timeout and finalizes the job. Always releases exactly one permit.
     *
     * <p>A callback failure is delegated to {@code RetryOutTimesThread}. Any other exception leads to
     * the job being pushed back (when it was loaded and the retry path was not already taken).
     */
    private void consumeTopicId(AbstractTopicRegister abstractTopicRegister, Semaphore semaphore, String topicId) {
        DelayQueueJob job = null;
        boolean callbackFailed = false;
        try {
            job = this.redisOperation.getJob(topicId);
            if (job == null) {
                return;
            }
            job.setTtr(Clock.systemDefaultZone().millis() - job.getExecutionTime());
            try {
                checkTimeoutExectue(abstractTopicRegister.getMethodTimeout(), abstractTopicRegister, job);
                if (job.getRetryCount() > 0) {
                    logger.info("重试延迟任务第{}次重试消费成功,topicId:{},DelayQueueJob:{} ", new Object[]{Integer.valueOf(job.getRetryCount()), RedisKeyUtil.getTopicId(abstractTopicRegister.getTopic(), job.getId()), job.toString()});
                } else {
                    logger.info("延迟任务消费成功,topicId:{},DelayQueueJob:{} ", RedisKeyUtil.getTopicId(abstractTopicRegister.getTopic(), job.getId()), job.toString());
                }
            } catch (Exception e) {
                callbackFailed = true;
                if (job.getRetryCount() > 0) {
                    logger.error("重试任务第{}次重试败,执行回调接口出错:topicId:{},DelayQueueJob:{},Err:{}", new Object[]{Integer.valueOf(job.getRetryCount()), RedisKeyUtil.getTopicId(abstractTopicRegister.getTopic(), job.getId()), job.toString(), ExceptionUtil.getStackTrace(e)});
                } else {
                    logger.error("延迟任务消费失败,执行回调接口出错:topicId:{},DelayQueueJob:{},Err:{}", new Object[]{RedisKeyUtil.getTopicId(abstractTopicRegister.getTopic(), job.getId()), job.toString(), ExceptionUtil.getStackTrace(e)});
                }
                RetryOutTimesThread.getInstance().callBackExceptionTryRetry(abstractTopicRegister, job, this.redisOperation);
            }
            finishJob(abstractTopicRegister, job, callbackFailed);
        } catch (Exception e) {
            handleUnconsumedJob(abstractTopicRegister, topicId, job, callbackFailed, e);
        } finally {
            semaphore.release();
        }
    }

    /**
     * Finalizes a job after the callback ran: a successful job is deleted and marked SUCCESS; a job
     * whose retry count is above 2 or negative is marked ERROR and deleted.
     */
    private void finishJob(AbstractTopicRegister abstractTopicRegister, DelayQueueJob job, boolean callbackFailed) {
        if (!callbackFailed) {
            this.redisOperation.deleteJob(abstractTopicRegister.getTopic(), job.getId());
            if (null != this.tbDelayJobMapper) {
                job.setStatus(GlobalConstants.STATUS_SUCCESS);
                job.setUpdateDate(LocalDateTime.now());
                this.tbDelayJobMapper.update(job);
            }
        }
        if (job.getRetryCount() > 2 || job.getRetryCount() < 0) {
            if (null != this.tbDelayJobMapper) {
                job.setStatus(GlobalConstants.STATUS_ERROR);
                job.setUpdateDate(LocalDateTime.now());
                this.tbDelayJobMapper.update(job);
            }
            this.redisOperation.deleteJob(abstractTopicRegister.getTopic(), job.getId());
        }
    }

    /**
     * Failure path of a worker task. When the job was loaded and the callback-retry path was not
     * taken, the job is pushed back to the ready list and marked RUNNER; otherwise the failure is
     * only logged because there is nothing that can safely be re-queued.
     */
    private void handleUnconsumedJob(AbstractTopicRegister abstractTopicRegister, String topicId, DelayQueueJob job, boolean callbackFailed, Exception cause) {
        if (job == null || callbackFailed) {
            logger.error("Delay job processing failed and the job cannot be re-pushed; topicId:{},Err:{}", topicId, ExceptionUtil.getStackTrace(cause));
            return;
        }
        logger.error("延迟任务消费异常: TopicId已经Pop出来,但是Job没有被消费,重新RightPush进待消费列表;topicId:{},Err:{}", topicId, ExceptionUtil.getStackTrace(cause));
        try {
            againRightPush(abstractTopicRegister.getTopic(), job);
            if (null != this.tbDelayJobMapper) {
                job.setStatus(GlobalConstants.STATUS_RUNNER);
                job.setUpdateDate(LocalDateTime.now());
                this.tbDelayJobMapper.update(job);
            }
        } catch (Exception recoveryError) {
            // Recovery is best-effort; never let it escape and kill the worker thread.
            logger.error("Re-push of unconsumed job failed; topicId:{},Err:{}", topicId, ExceptionUtil.getStackTrace(recoveryError));
        }
    }

    /**
     * Runs the topic callback for one job under a time limit by delegating to
     * {@code TimeoutUtil.timeoutMethod}.
     *
     * @param j                     time limit passed to {@code TimeoutUtil.timeoutMethod}
     * @param abstractTopicRegister register whose {@code execute} is invoked
     * @param delayQueueJob         job handed to the callback
     * @throws InterruptedException if the wait is interrupted
     * @throws ExecutionException   if the callback fails
     * @throws TimeoutException     if the time limit is exceeded
     */
    private void checkTimeoutExectue(long j, AbstractTopicRegister abstractTopicRegister, DelayQueueJob delayQueueJob)
            throws InterruptedException, ExecutionException, TimeoutException {
        // The lambda argument is unused; the constant result only satisfies the callback's signature.
        TimeoutUtil.timeoutMethod(j, unused -> {
            abstractTopicRegister.execute(delayQueueJob);
            return false;
        });
    }

    /**
     * Re-queues a job whose topic id was popped but which was not consumed.
     *
     * <p>Jobs with a re-entry count above 2 are not re-queued; this is only logged. Otherwise the
     * re-queue runs asynchronously on {@code RPUSH_NO_EXEC_JOB}.
     *
     * <p>A null job is now rejected up front with an error log: the failure path of the topic worker
     * can reach this method without a loaded job, which previously ended in a
     * {@code NullPointerException}.
     *
     * @param str           topic name
     * @param delayQueueJob job to push back; ignored (with an error log) when null
     */
    private void againRightPush(String str, DelayQueueJob delayQueueJob) {
        if (delayQueueJob == null) {
            logger.error("Cannot re-push unconsumed job of topic:{} because the job is null", str);
            return;
        }
        if (delayQueueJob.getReentry() > 2) {
            logger.error("未被消费任务topicId:{},重入了3次仍旧失败", RedisKeyUtil.getTopicId(str, delayQueueJob.getId()));
        } else {
            this.RPUSH_NO_EXEC_JOB.execute(() -> {
                logger.warn("未被消费任务topicId:{} 重新放入待消费队列", RedisKeyUtil.getTopicId(str, delayQueueJob.getId()));
                // NOTE(review): the guard above reads getReentry(), but the retry count is what gets
                // written here - looks like the re-entry counter was meant to be incremented; confirm.
                delayQueueJob.setRetryCount(delayQueueJob.getReentry() + 1);
                this.redisOperation.retryJob(str, delayQueueJob.getId(), delayQueueJob);
            });
        }
    }

    /**
     * Registers a topic handler under a unique topic name.
     *
     * <p>The registration is a single atomic {@code putIfAbsent}; the previous get-then-put sequence
     * let two concurrent registrations of the same topic both pass the duplicate check, with the
     * later one silently replacing the earlier one.
     *
     * @param str                   topic name
     * @param abstractTopicRegister handler to register
     * @throws DelayQueueException if a handler is already registered for the topic
     */
    public static final void addTopic(String str, AbstractTopicRegister abstractTopicRegister) {
        if (null != topicRegisterHolder.putIfAbsent(str, abstractTopicRegister)) {
            throw new DelayQueueException("Topic 注册重复,请保证Topic唯一");
        }
    }

    /**
     * Returns the delay-queue facade owned by this context.
     *
     * @return the {@code RedisDelayQueue} created during initialization
     */
    public RedisDelayQueue getRedisDelayQueue() {
        return redisDelayQueue;
    }

    /**
     * Sets the stop flag that every topic listener loop checks before its next round.
     *
     * @param z {@code true} to ask the listener loops to end, {@code false} to clear the request
     */
    public static void setTopicThreadStop(boolean z) {
        RedisDelayQueueContext.topicThreadStop = z;
    }

    /**
     * Raises the listener stop flag and then closes the worker executor of every registered topic,
     * passing the topic name as the label for {@code ShutdownThread.closeExecutor}.
     */
    private void shutdownTopicThreads() {
        setTopicThreadStop(true);
        topicRegisterHolder.forEach((topic, register) ->
                ShutdownThread.closeExecutor(register.getTOPIC_THREADS(), topic));
    }

    /**
     * Kills the Redis client connections that originate from this machine and whose last command
     * was {@code blpop}.
     *
     * <p>The machine is identified by {@code ipInRedisServer}, or by the local LAN address when that
     * field is empty. Client entries without an address or without a last command are skipped
     * instead of raising a {@code NullPointerException}, and a null client list is treated as
     * "nothing to kill".
     *
     * @param redisOperation operation layer used to list and kill clients
     */
    private void killThisMachineAllRedisBlpopClients(RedisOperation redisOperation) {
        String localIp = this.ipInRedisServer;
        if (ObjectUtils.isEmpty(localIp)) {
            localIp = NetUtil.getLocalHostLANAddress();
        }
        logger.info("ipInRedisServer;{}", localIp);

        List<RedisClientInfo> clients = redisOperation.getThisMachineAllBlpopClientList();
        if (clients == null) {
            return;
        }

        ArrayList<String> addressesToKill = Lists.newArrayList();
        for (RedisClientInfo client : clients) {
            String addressPort = client.getAddressPort();
            if (addressPort == null) {
                continue;
            }
            // Address has the form host:port; only the host part identifies the machine.
            boolean fromThisMachine = addressPort.split(":")[0].equals(localIp);
            if (fromThisMachine && "blpop".equals(client.getLastCommand())) {
                addressesToKill.add(addressPort);
                logger.info("优雅关机,杀掉redis的Blpop客户端;{}", addressPort);
            }
        }
        redisOperation.killClient(addressesToKill);
    }
}
