package com.github.joekerouac.async.task.flow.service;

import com.github.joekerouac.async.task.flow.enums.FlowTaskStatus;
import com.github.joekerouac.async.task.flow.enums.TaskNodeStatus;
import com.github.joekerouac.async.task.flow.model.FlowTask;
import com.github.joekerouac.async.task.flow.model.TaskNode;
import com.github.joekerouac.async.task.flow.service.AbstractFlowTaskEngine;
import com.github.joekerouac.async.task.model.TransStrategy;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.scheduler.SchedulerSystem;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import java.util.List;

/* loaded from: input_file:com/github/joekerouac/async/task/flow/service/StreamTaskEngine.class */
/**
 * Flow-task engine registered under the processor name {@link #PROCESSOR_NAME}.
 *
 * <p>It specialises {@link AbstractFlowTaskEngine} for "stream" tasks, where (per the data
 * checks below) a node is expected to have at most one child. When a node has no child yet,
 * the lookup is repeated while holding the row lock of the owning flow task.
 */
public class StreamTaskEngine extends AbstractFlowTaskEngine {
    /** Processor name this engine registers itself under. */
    public static final String PROCESSOR_NAME = "StreamTask";

    private static final Logger LOGGER = LoggerFactory.getLogger(StreamTaskEngine.class.getName());

    /** Scheduler invoked when a node still has no child after the locked re-check. */
    private final SchedulerSystem schedulerSystem;

    /**
     * Creates the engine.
     *
     * @param engineConfig    configuration handed to the parent engine
     * @param schedulerSystem scheduler used by {@link #getAllChild(TaskNode)}
     */
    public StreamTaskEngine(AbstractFlowTaskEngine.EngineConfig engineConfig, SchedulerSystem schedulerSystem) {
        super(engineConfig);
        this.schedulerSystem = schedulerSystem;
    }

    /**
     * @return a single-element array containing {@link #PROCESSOR_NAME}
     */
    @Override // com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor
    public String[] processors() {
        String[] supported = {PROCESSOR_NAME};
        return supported;
    }

    /**
     * Returns the children of the given node, validating that there is at most one.
     *
     * <p>If the first lookup finds a child it is returned directly. Otherwise the lookup is
     * repeated inside a {@code REQUIRED} transaction after locking the owning flow task; if
     * the node still has no child, the scheduler is invoked for the flow task's request id.
     *
     * @param taskNode node whose children are requested
     * @return the children reported by the parent engine (empty or a single element)
     * @throws RuntimeException wrapping any failure raised by the locked re-check
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.github.joekerouac.async.task.flow.service.AbstractFlowTaskEngine
    public List<TaskNode> getAllChild(TaskNode taskNode) {
        List<TaskNode> children = super.getAllChild(taskNode);
        checkStreamChildCount(taskNode, children);
        if (children.isEmpty()) {
            // Nothing found without a lock: look again while holding the flow task's lock.
            return fetchChildUnderLock(taskNode);
        }
        return children;
    }

    /**
     * Intentionally empty: this engine performs no extra work when a node finishes.
     */
    @Override // com.github.joekerouac.async.task.flow.service.AbstractFlowTaskEngine
    protected void taskFinish(TaskNode taskNode, TaskNodeStatus taskNodeStatus) {
        // no-op
    }

    /**
     * Moves the flow task owning {@code taskNode} to {@code PENDING}.
     *
     * <p>Runs in a {@code REQUIRED} transaction with the flow task locked. A flow task that
     * is already {@code PENDING} is left untouched; otherwise it must be {@code RUNNING}.
     *
     * @param taskNode  node whose owning flow task is updated
     * @param taskNode2 not used by this implementation
     * @throws RuntimeException wrapping any failure raised inside the transaction
     */
    @Override // com.github.joekerouac.async.task.flow.service.AbstractFlowTaskEngine
    protected void notifyPending(TaskNode taskNode, TaskNode taskNode2) {
        try {
            this.transactionManager.runWithTrans(TransStrategy.REQUIRED, () -> {
                FlowTask mainTask = lockOwningFlowTask(taskNode);
                if (mainTask.getStatus() != FlowTaskStatus.PENDING) {
                    // Only a RUNNING flow task may be switched to PENDING.
                    boolean running = mainTask.getStatus() == FlowTaskStatus.RUNNING;
                    Object[] detail = {mainTask.getRequestId(), mainTask.getStatus()};
                    Assert.assertTrue(running, StringUtils.format("主任务 [{}] 状态为： [{}]，与期望的RUNNING不符", detail), ExceptionProviderConst.IllegalStateExceptionProvider);
                    this.flowTaskRepository.updateStatus(mainTask.getRequestId(), FlowTaskStatus.PENDING);
                }
            });
        } catch (Throwable th) {
            LOGGER.warn(th, "无限流通知pending异常，如果频繁出现请关注", new Object[0]);
            throw new RuntimeException(th);
        }
    }

    /**
     * Repeats the child lookup inside a {@code REQUIRED} transaction while holding the lock
     * of the owning flow task, invoking the scheduler when there is still no child.
     */
    private List<TaskNode> fetchChildUnderLock(TaskNode taskNode) {
        try {
            // The cast only checks for List at runtime, exactly like a raw cast would.
            @SuppressWarnings("unchecked")
            List<TaskNode> reloaded = (List<TaskNode>) this.transactionManager.runWithTrans(TransStrategy.REQUIRED, () -> {
                lockOwningFlowTask(taskNode);
                List<TaskNode> fresh = super.getAllChild(taskNode);
                checkStreamChildCount(taskNode, fresh);
                if (fresh.isEmpty()) {
                    // NOTE(review): meaning of the 'false' flag is defined by SchedulerSystem — confirm there.
                    this.schedulerSystem.scheduler(taskNode.getTaskRequestId(), false);
                }
                return fresh;
            });
            return reloaded;
        } catch (Throwable th) {
            LOGGER.warn(th, "无限流获取子节点异常，如果频繁出现请关注", new Object[0]);
            throw new RuntimeException(th);
        }
    }

    /**
     * Asserts that {@code children} holds at most one node, reporting the flow task and node
     * request ids through the DB exception provider otherwise.
     */
    private void checkStreamChildCount(TaskNode taskNode, List<TaskNode> children) {
        boolean atMostOne = children.size() <= 1;
        Object[] ids = {taskNode.getTaskRequestId(), taskNode.getRequestId()};
        Assert.assertTrue(atMostOne, StringUtils.format("数据异常，流式任务应该最多有一个子节点的, [{}:{}]", ids), ExceptionProviderConst.DBExceptionProvider);
    }

    /**
     * Selects the flow task owning {@code taskNode} with a lock and asserts that it exists.
     *
     * @return the locked flow task, never {@code null} once the assertion has passed
     */
    private FlowTask lockOwningFlowTask(TaskNode taskNode) {
        FlowTask owner = this.flowTaskRepository.selectForLock(taskNode.getTaskRequestId());
        Object[] ids = {taskNode.getRequestId(), taskNode.getTaskRequestId()};
        Assert.notNull(owner, StringUtils.format("系统错误，当前子任务 [{}] 对应的主任务 [{}] 不存在", ids), ExceptionProviderConst.DBExceptionProvider);
        return owner;
    }
}
