package com.github.joekerouac.async.task.service;

import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.model.AsyncTaskExecutorConfig;
import com.github.joekerouac.async.task.model.AsyncTaskProcessorEngineConfig;
import com.github.joekerouac.async.task.model.AsyncThreadPoolConfig;
import com.github.joekerouac.async.task.model.ExecResult;
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.ProcessorSupplier;
import com.github.joekerouac.async.task.spi.TraceService;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.collection.Pair;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.lock.LockTaskUtil;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.scheduler.SchedulerTask;
import com.github.joekerouac.common.tools.scheduler.SimpleSchedulerTask;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import java.time.LocalDateTime;
import java.time.chrono.ChronoLocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/joekerouac/async/task/service/DefaultAsyncTaskProcessorEngine.class */
public class DefaultAsyncTaskProcessorEngine implements AsyncTaskProcessorEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAsyncTaskProcessorEngine.class.getName());
    private static final String DEFAULT_THREAD_NAME = "async-worker";
    private static final int MAX_TIME = 300;
    private final ReadWriteLock queueLock;
    private final Condition condition;
    private final NavigableSet<Pair<String, AsyncTask>> queue;
    private final AsyncTaskExecutorConfig executorConfig;
    private SchedulerTask loadTask;
    private volatile long lastEmptyLoad;
    private volatile boolean start = false;
    private Thread[] workerThreads;
    private final Map<String, AbstractAsyncTaskProcessor<?>> processors;
    private final TaskClearRunner taskClearRunner;
    private final ProcessorSupplier processorSupplier;
    private final TraceService traceService;
    private final AsyncTaskRepository repository;
    private final MonitorService monitorService;
    private final Set<String> processorGroup;
    private final boolean contain;
    private final boolean loadTaskFromRepository;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.github.joekerouac.async.task.service.DefaultAsyncTaskProcessorEngine$1, reason: invalid class name */
    /* loaded from: input_file:com/github/joekerouac/async/task/service/DefaultAsyncTaskProcessorEngine$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$github$joekerouac$async$task$model$ExecResult = new int[ExecResult.values().length];

        static {
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.RETRY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$github$joekerouac$async$task$model$ExecResult[ExecResult.ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public DefaultAsyncTaskProcessorEngine(AsyncTaskProcessorEngineConfig asyncTaskProcessorEngineConfig) {
        Assert.notNull(asyncTaskProcessorEngineConfig, "engineConfig不能为null", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Const.VALIDATION_SERVICE.validate(asyncTaskProcessorEngineConfig);
        this.executorConfig = asyncTaskProcessorEngineConfig.getExecutorConfig();
        this.taskClearRunner = asyncTaskProcessorEngineConfig.getTaskClearRunner();
        this.processorSupplier = asyncTaskProcessorEngineConfig.getProcessorSupplier();
        this.traceService = asyncTaskProcessorEngineConfig.getTraceService();
        this.repository = asyncTaskProcessorEngineConfig.getRepository();
        this.monitorService = asyncTaskProcessorEngineConfig.getMonitorService();
        this.processors = new ConcurrentHashMap();
        this.processorGroup = Collections.unmodifiableSet(asyncTaskProcessorEngineConfig.getProcessorGroup());
        this.contain = asyncTaskProcessorEngineConfig.isContain();
        this.loadTaskFromRepository = asyncTaskProcessorEngineConfig.isLoadTaskFromRepository();
        this.queue = new TreeSet((pair, pair2) -> {
            AsyncTask asyncTask = (AsyncTask) pair.getValue();
            AsyncTask asyncTask2 = (AsyncTask) pair2.getValue();
            int compareTo = asyncTask.getExecTime().compareTo((ChronoLocalDateTime<?>) asyncTask2.getExecTime());
            if (compareTo != 0) {
                return compareTo;
            }
            String createIp = asyncTask.getCreateIp();
            String createIp2 = asyncTask2.getCreateIp();
            if (createIp.equals(createIp2)) {
                return 0;
            }
            if (Const.IP.equals(createIp)) {
                return -1;
            }
            return Const.IP.equals(createIp2) ? 1 : 0;
        });
        this.queueLock = new ReentrantReadWriteLock();
        this.condition = this.queueLock.writeLock().newCondition();
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public void addProcessor(AbstractAsyncTaskProcessor<?> abstractAsyncTaskProcessor) {
        Assert.notNull(abstractAsyncTaskProcessor, "待添加的异步任务处理器不能为空", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Assert.assertTrue(!CollectionUtil.isEmpty(abstractAsyncTaskProcessor.processors()), StringUtils.format("处理器可以处理的任务类型不能为空， [{}]", new Object[]{abstractAsyncTaskProcessor}), ExceptionProviderConst.IllegalArgumentExceptionProvider);
        for (String str : abstractAsyncTaskProcessor.processors()) {
            Assert.assertTrue((this.contain && this.processorGroup.contains(str)) || !(this.contain || this.processorGroup.contains(str)), StringUtils.format("本任务处理引擎无法处理任务: [{}], contain: [{}], processorGroup: [{}]", new Object[]{str, Boolean.valueOf(this.contain), this.processorGroup}), ExceptionProviderConst.CodeErrorExceptionProvider);
            if (abstractAsyncTaskProcessor.autoClear()) {
                this.taskClearRunner.addClearDesc(str, abstractAsyncTaskProcessor.reserve());
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("注册处理器 [{}:{}]", new Object[]{str, abstractAsyncTaskProcessor});
            }
            AbstractAsyncTaskProcessor<?> put = this.processors.put(str, abstractAsyncTaskProcessor);
            if (put != null) {
                LOGGER.warn("异步任务处理器[{}]发生变更，使用 [{}] 替换 [{}]", new Object[]{str, abstractAsyncTaskProcessor, put});
            }
        }
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public <T, P extends AbstractAsyncTaskProcessor<T>> P removeProcessor(String str) {
        P p = (P) this.processors.remove(str);
        if (p != null && p.autoClear()) {
            this.taskClearRunner.removeClearDesc(str);
        }
        return p;
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public <T, P extends AbstractAsyncTaskProcessor<T>> P getProcessor(String str) {
        AbstractAsyncTaskProcessor<?> abstractAsyncTaskProcessor = this.processors.get(str);
        if (abstractAsyncTaskProcessor == null && this.processorSupplier != null) {
            synchronized (this.processorSupplier) {
                abstractAsyncTaskProcessor = this.processors.get(str);
                if (abstractAsyncTaskProcessor == null) {
                    abstractAsyncTaskProcessor = this.processorSupplier.get(str);
                    if (abstractAsyncTaskProcessor != null) {
                        addProcessor(abstractAsyncTaskProcessor);
                    }
                }
            }
        }
        return (P) abstractAsyncTaskProcessor;
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public void addTask(Collection<AsyncTask> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LockTaskUtil.runWithLock(this.queueLock.writeLock(), () -> {
            Pair<String, AsyncTask> first = this.queue.isEmpty() ? null : this.queue.first();
            int i = 0;
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                AsyncTask asyncTask = (AsyncTask) it.next();
                if (asyncTask.getStatus() != ExecStatus.READY) {
                    LOGGER.debug("当前任务状态不是READY，无需添加到内存队列, task: [{}]", new Object[]{asyncTask});
                } else if (this.queue.add(new Pair(asyncTask.getRequestId(), asyncTask)) || !LOGGER.isDebugEnabled()) {
                    i++;
                } else {
                    LOGGER.debug("任务 [{}] 已经在队列中了，忽略该任务", new Object[]{asyncTask});
                }
            }
            if (i <= 0) {
                LOGGER.debug("当前并未实际添加内存队列，不进行队列唤醒", new Object[0]);
                return;
            }
            while (this.queue.size() - this.executorConfig.getCacheQueueSize() > 0) {
                Pair<String, AsyncTask> pollLast = this.queue.pollLast();
                if (pollLast != null && LOGGER.isDebugEnabled()) {
                    LOGGER.debug("当前任务队列超长，将最晚执行的任务移除, 移除的任务: [{}]", new Object[]{pollLast.getKey()});
                }
            }
            if (first == null) {
                LOGGER.debug("任务添加完毕，原队列为空，直接唤醒", new Object[0]);
                this.condition.signalAll();
                return;
            }
            Pair<String, AsyncTask> first2 = this.queue.first();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("任务添加完毕，开始唤醒处理, oldFirst: [{}], new: [{}]", new Object[]{first, first2});
            }
            if (((AsyncTask) first2.getValue()).getExecTime().isBefore(((AsyncTask) first.getValue()).getExecTime())) {
                this.condition.signalAll();
            }
        });
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public synchronized void start() {
        LOGGER.info("异步任务引擎准备启动...", new Object[0]);
        this.start = true;
        if (this.loadTaskFromRepository) {
            this.loadTask = new SimpleSchedulerTask(() -> {
                LocalDateTime now = LocalDateTime.now();
                long currentTimeMillis = System.currentTimeMillis() - this.lastEmptyLoad;
                if (currentTimeMillis < this.executorConfig.getLoadInterval()) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("当前距离上次空捞取的时间间隔为 [{}ms] ，小于系统配置的最小空捞取间隔 [{}ms]，跳过", new Object[]{Long.valueOf(currentTimeMillis), Long.valueOf(this.executorConfig.getLoadInterval())});
                        return;
                    }
                    return;
                }
                List list = (List) LockTaskUtil.runWithLock(this.queueLock.readLock(), () -> {
                    return (List) this.queue.stream().map((v0) -> {
                        return v0.getKey();
                    }).collect(Collectors.toList());
                });
                int cacheQueueSize = this.executorConfig.getCacheQueueSize();
                List<AsyncTask> selectPage = this.repository.selectPage(ExecStatus.READY, now.plusSeconds(300L), list, 0, Math.min(((cacheQueueSize - list.size()) * 2) + 5, cacheQueueSize), this.processorGroup, this.contain);
                if (selectPage.isEmpty()) {
                    this.lastEmptyLoad = System.currentTimeMillis();
                } else {
                    addTask(selectPage);
                }
            }, "task-load", true);
        } else {
            this.loadTask = new SimpleSchedulerTask(() -> {
                LOGGER.debug("当前配置的不从repository中捞取任务，忽略任务捞取调度", new Object[0]);
            }, "task-load", true);
        }
        this.loadTask.setFixedDelay(this.executorConfig.getLoadInterval());
        this.loadTask.start();
        Thread thread = new Thread(() -> {
            while (this.start) {
                try {
                    Thread.sleep(this.executorConfig.getMonitorInterval());
                } catch (Throwable th) {
                    if (!(th instanceof InterruptedException)) {
                        LOGGER.info(th, "监听线程异常", new Object[0]);
                    }
                }
                if (!this.start) {
                    return;
                }
                LockTaskUtil.runWithLock(this.queueLock.readLock(), () -> {
                    this.monitorService.monitor(this.queue.size());
                });
                List<AsyncTask> stat = this.repository.stat(LocalDateTime.now().plus(-this.executorConfig.getExecTimeout(), (TemporalUnit) ChronoUnit.MILLIS));
                if (!stat.isEmpty()) {
                    this.monitorService.taskExecTimeout(stat, this.executorConfig.getExecTimeout());
                }
            }
        }, "monitor");
        thread.setDaemon(true);
        thread.start();
        AsyncThreadPoolConfig threadPoolConfig = this.executorConfig.getThreadPoolConfig();
        this.workerThreads = new Thread[threadPoolConfig.getCorePoolSize()];
        ClassLoader classLoader = threadPoolConfig.getDefaultContextClassLoader() == null ? DefaultAsyncTaskProcessorEngine.class.getClassLoader() : threadPoolConfig.getDefaultContextClassLoader();
        for (int i = 0; i < this.workerThreads.length; i++) {
            Thread thread2 = new Thread(() -> {
                Thread currentThread = Thread.currentThread();
                currentThread.setContextClassLoader(classLoader);
                while (this.start) {
                    try {
                        scheduler();
                    } catch (Throwable th) {
                        if (this.start || !(th instanceof InterruptedException)) {
                            this.monitorService.uncaughtException(currentThread, th);
                        }
                    }
                }
            }, StringUtils.getOrDefault(threadPoolConfig.getThreadName(), DEFAULT_THREAD_NAME) + "-" + i);
            thread2.setDaemon(false);
            thread2.start();
            this.workerThreads[i] = thread2;
        }
        LOGGER.info("异步任务引擎启动成功...", new Object[0]);
    }

    @Override // com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine
    public synchronized void stop() {
        LOGGER.info("异步任务引擎准备关闭...", new Object[0]);
        this.start = false;
        this.loadTask.stop();
        for (Thread thread : this.workerThreads) {
            thread.interrupt();
        }
        Lock writeLock = this.queueLock.writeLock();
        NavigableSet<Pair<String, AsyncTask>> navigableSet = this.queue;
        navigableSet.getClass();
        LockTaskUtil.runWithLock(writeLock, navigableSet::clear);
        LOGGER.info("异步任务引擎关闭成功...", new Object[0]);
    }

    protected void scheduler() throws InterruptedException {
        AsyncTask take = take();
        if (take == null) {
            return;
        }
        runTask(take);
        tryLoad();
    }

    protected void runTask(AsyncTask asyncTask) {
        ExecResult execResult;
        String requestId = asyncTask.getRequestId();
        LocalDateTime now = LocalDateTime.now();
        if (ChronoUnit.MILLIS.between(now, asyncTask.getExecTime()) > 0) {
            LOGGER.warn("任务 [{}] 未到执行时间，不执行，跳过执行, 当前时间：[{}]", new Object[]{asyncTask, now});
            asyncTask.setStatus(ExecStatus.READY);
            this.repository.update(requestId, ExecStatus.READY, null, null, null, null);
            return;
        }
        AbstractAsyncTaskProcessor<Object> processor = getProcessor(asyncTask.getProcessor());
        String requestId2 = asyncTask.getRequestId();
        if (processor == null) {
            this.monitorService.noProcessor(requestId2, asyncTask.getTask(), asyncTask.getProcessor());
            this.repository.update(requestId, ExecStatus.FINISH, TaskFinishCode.NO_PROCESSOR, null, null, Const.IP);
            return;
        }
        HashMap hashMap = new HashMap();
        try {
            Object deserialize = processor.deserialize(requestId2, asyncTask.getTask(), hashMap);
            Throwable th = null;
            String str = (String) Optional.ofNullable(asyncTask.getExtMap()).map(extMap -> {
                return (String) extMap.get(AsyncTask.ExtMapKey.TRACE_CONTEXT);
            }).orElse(null);
            Object obj = null;
            if (this.traceService != null && str != null) {
                obj = this.traceService.resume(asyncTask.getRetry(), str);
            }
            try {
                ExecResult process = processor.process(requestId2, deserialize, hashMap);
                execResult = process == null ? ExecResult.SUCCESS : process;
            } catch (Throwable th2) {
                execResult = ExecResult.RETRY;
                th = th2;
            }
            boolean z = false;
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(th, "任务执行结果：[{}:{}:{}]", new Object[]{requestId2, execResult, deserialize});
                }
                switch (AnonymousClass1.$SwitchMap$com$github$joekerouac$async$task$model$ExecResult[execResult.ordinal()]) {
                    case TransactionCallback.STATUS_ROLLED_BACK /* 1 */:
                        finishTask(this.repository, processor, requestId2, deserialize, TaskFinishCode.SUCCESS, null, hashMap);
                        break;
                    case TransactionCallback.STATUS_UNKNOWN /* 2 */:
                        z = true;
                        this.repository.update(requestId2, ExecStatus.WAIT, null, null, null, Const.IP);
                        break;
                    case 3:
                        int retry = asyncTask.getRetry() + 1;
                        boolean z2 = retry > asyncTask.getMaxRetry();
                        if (!z2 && (th == null || processor.canRetry(requestId2, deserialize, th, hashMap))) {
                            z = true;
                            LocalDateTime plus = LocalDateTime.now().plus(Math.max(processor.nextExecTimeInterval(requestId2, retry, deserialize, hashMap), 0L), (TemporalUnit) ChronoUnit.MILLIS);
                            asyncTask.setStatus(ExecStatus.READY);
                            asyncTask.setExecTime(plus);
                            asyncTask.setRetry(retry);
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(th, "任务重试, [{}:{}:{}]", new Object[]{requestId2, plus, deserialize});
                            }
                            this.monitorService.processRetry(requestId2, deserialize, processor, th, plus);
                            this.repository.update(requestId, ExecStatus.READY, null, plus, Integer.valueOf(retry), Const.IP);
                            break;
                        } else {
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(th, "任务不可重试, [{}:{}:{}]", new Object[]{requestId2, Boolean.valueOf(z2), deserialize});
                            }
                            TaskFinishCode taskFinishCode = z2 ? TaskFinishCode.RETRY_OVERFLOW : TaskFinishCode.CANNOT_RETRY;
                            this.monitorService.processError(requestId2, taskFinishCode, deserialize, processor, th);
                            finishTask(this.repository, processor, requestId2, deserialize, taskFinishCode, th, hashMap);
                            break;
                        }
                        break;
                    case 4:
                        finishTask(this.repository, processor, requestId2, deserialize, TaskFinishCode.USER_ERROR, null, hashMap);
                        break;
                    default:
                        throw new IllegalStateException(StringUtils.format("不支持的结果状态： [{}]", new Object[]{execResult}));
                }
                if (this.traceService == null || str == null) {
                    return;
                }
                this.traceService.finish(obj, z, execResult, th);
            } catch (Throwable th3) {
                if (this.traceService != null && str != null) {
                    this.traceService.finish(obj, false, execResult, th);
                }
                throw th3;
            }
        } catch (Throwable th4) {
            this.monitorService.deserializationError(requestId2, asyncTask.getTask(), processor, th4);
            this.repository.update(requestId, ExecStatus.FINISH, TaskFinishCode.DESERIALIZATION_ERROR, null, null, Const.IP);
        }
    }

    private void finishTask(AsyncTaskRepository asyncTaskRepository, AbstractAsyncTaskProcessor<Object> abstractAsyncTaskProcessor, String str, Object obj, TaskFinishCode taskFinishCode, Throwable th, Map<String, Object> map) {
        try {
            abstractAsyncTaskProcessor.afterProcess(str, obj, taskFinishCode, th, map);
            asyncTaskRepository.update(str, ExecStatus.FINISH, taskFinishCode, null, null, Const.IP);
        } catch (Error | RuntimeException e) {
            LOGGER.warn(e, "任务 [{}:{}:{}] 的回调执行异常，该异常将导致异步任务被重新执行", new Object[]{str, taskFinishCode, obj});
            throw e;
        }
    }

    private AsyncTask take() {
        String takeFromMemory = takeFromMemory();
        if (takeFromMemory == null) {
            LOGGER.info("系统关闭，停止调度", new Object[0]);
            return null;
        }
        int casUpdate = this.repository.casUpdate(takeFromMemory, ExecStatus.READY, ExecStatus.RUNNING, Const.IP);
        while (casUpdate <= 0) {
            AsyncTask selectByRequestId = this.repository.selectByRequestId(takeFromMemory);
            ExecStatus status = selectByRequestId.getStatus();
            if (status != ExecStatus.READY) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("任务 [{}] 已经在其他机器处理了，无需重复处理", new Object[]{selectByRequestId});
                }
                if (!Objects.equals(selectByRequestId.getExecIp(), Const.IP)) {
                    return null;
                }
                LOGGER.warn("当前任务的执行IP与本主机一致，但是状态不是ready, status: [{}], task: [{}]", new Object[]{status, selectByRequestId});
                return null;
            }
            casUpdate = this.repository.casUpdate(takeFromMemory, ExecStatus.READY, ExecStatus.RUNNING, Const.IP);
        }
        return this.repository.selectByRequestId(takeFromMemory);
    }

    protected void tryLoad() {
        if (this.queue.size() < this.executorConfig.getLoadThreshold()) {
            this.loadTask.scheduler();
        }
    }

    private String takeFromMemory() {
        return (String) LockTaskUtil.runWithLock(this.queueLock.writeLock(), () -> {
            while (this.start) {
                long j = 5000;
                if (this.queue.isEmpty()) {
                    LOGGER.debug("当前队列为空，等待下次唤醒", new Object[0]);
                } else {
                    Pair<String, AsyncTask> first = this.queue.first();
                    LocalDateTime execTime = ((AsyncTask) first.getValue()).getExecTime();
                    LocalDateTime now = LocalDateTime.now();
                    j = ChronoUnit.MILLIS.between(now, execTime);
                    if (j <= 0) {
                        this.queue.pollFirst();
                        return (String) first.getKey();
                    }
                    LOGGER.debug("当前第一个任务执行时间未到, execTime: [{}], now: [{}]", new Object[]{execTime, now});
                }
                try {
                    if (!this.condition.await(j, TimeUnit.MILLISECONDS) && LOGGER.isDebugEnabled()) {
                        LOGGER.debug("唤醒等待超时，自动唤醒检查", new Object[0]);
                    }
                } catch (InterruptedException e) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("队列取任务线程等待被打断", new Object[0]);
                    }
                }
            }
            return null;
        });
    }
}
