package tech.powerjob.server.core.workflow;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.alarm.AlarmCenter;
import tech.powerjob.server.core.alarm.AlarmUtils;
import tech.powerjob.server.core.alarm.module.WorkflowInstanceAlarm;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.service.WorkflowNodeHandleService;
import tech.powerjob.server.core.uid.IdGenerateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;

@Service
/* loaded from: input_file:tech/powerjob/server/core/workflow/WorkflowInstanceManager.class */
public class WorkflowInstanceManager {
    private static final Logger log = LoggerFactory.getLogger(WorkflowInstanceManager.class);
    private final AlarmCenter alarmCenter;
    private final IdGenerateService idGenerateService;
    private final JobInfoRepository jobInfoRepository;
    private final UserService userService;
    private final WorkflowInfoRepository workflowInfoRepository;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
    private final WorkflowNodeHandleService workflowNodeHandleService;

    public Long create(WorkflowInfoDO workflowInfoDO, String str, Long l, Long l2) {
        PEWorkflowDAG pEWorkflowDAG;
        Long id = workflowInfoDO.getId();
        Long valueOf = Long.valueOf(this.idGenerateService.allocate());
        WorkflowInstanceInfoDO constructWfInstance = constructWfInstance(workflowInfoDO, str, l, id, valueOf);
        if (l2 != null) {
            constructWfInstance.setParentWfInstanceId(l2);
            constructWfInstance.setWfContext(str);
        }
        try {
            pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(workflowInfoDO.getPeDAG(), PEWorkflowDAG.class);
        } catch (Exception e) {
            if (0 != 0) {
                constructWfInstance.setDag(JSON.toJSONString((Object) null));
            }
            handleWfInstanceFinalStatus(constructWfInstance, e.getMessage(), WorkflowInstanceStatus.FAILED);
        }
        if (!WorkflowDAGUtils.valid(pEWorkflowDAG)) {
            log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", id, valueOf);
            throw new PowerJobException("invalid dag");
        }
        initNodeInfo(pEWorkflowDAG);
        HashSet newHashSet = Sets.newHashSet();
        pEWorkflowDAG.getNodes().forEach(node -> {
            if (node.getNodeType().intValue() == WorkflowNodeType.JOB.getCode()) {
                newHashSet.add(node.getJobId());
            }
            node.setStatus(Integer.valueOf(InstanceStatus.WAITING_DISPATCH.getV()));
        });
        int size = newHashSet.size();
        long countByAppIdAndStatusInAndIdIn = this.jobInfoRepository.countByAppIdAndStatusInAndIdIn(workflowInfoDO.getAppId(), Sets.newHashSet(new Integer[]{Integer.valueOf(SwitchableStatus.ENABLE.getV()), Integer.valueOf(SwitchableStatus.DISABLE.getV())}), newHashSet);
        log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", new Object[]{id, valueOf, Integer.valueOf(size), Long.valueOf(countByAppIdAndStatusInAndIdIn)});
        if (countByAppIdAndStatusInAndIdIn < newHashSet.size()) {
            log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", new Object[]{id, valueOf, Integer.valueOf(size), Long.valueOf(countByAppIdAndStatusInAndIdIn)});
            throw new PowerJobException("can't find some job");
        }
        constructWfInstance.setDag(JSON.toJSONString(pEWorkflowDAG));
        this.workflowInstanceInfoRepository.saveAndFlush(constructWfInstance);
        return valueOf;
    }

    private void initNodeInfo(PEWorkflowDAG pEWorkflowDAG) {
        for (PEWorkflowDAG.Node node : pEWorkflowDAG.getNodes()) {
            WorkflowNodeInfoDO workflowNodeInfoDO = (WorkflowNodeInfoDO) this.workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> {
                return new PowerJobException("can't find some node");
            });
            if (workflowNodeInfoDO.getType() == null) {
                workflowNodeInfoDO.setType(Integer.valueOf(WorkflowNodeType.JOB.getCode()));
            }
            node.setNodeType(workflowNodeInfoDO.getType()).setJobId(workflowNodeInfoDO.getJobId()).setNodeName(workflowNodeInfoDO.getNodeName()).setNodeParams(workflowNodeInfoDO.getNodeParams()).setEnable(workflowNodeInfoDO.getEnable()).setSkipWhenFailed(workflowNodeInfoDO.getSkipWhenFailed());
            if (node.getNodeType().intValue() == WorkflowNodeType.JOB.getCode()) {
                if (workflowNodeInfoDO.getJobId() == null) {
                    throw new PowerJobException("illegal node info");
                }
                JobInfoDO jobInfoDO = (JobInfoDO) this.jobInfoRepository.findById(workflowNodeInfoDO.getJobId()).orElseThrow(() -> {
                    return new PowerJobException("can't find some job");
                });
                if (StringUtils.isBlank(workflowNodeInfoDO.getNodeParams())) {
                    node.setNodeParams(jobInfoDO.getJobParams());
                } else {
                    node.setNodeParams(workflowNodeInfoDO.getNodeParams());
                }
            }
        }
    }

    private WorkflowInstanceInfoDO constructWfInstance(WorkflowInfoDO workflowInfoDO, String str, Long l, Long l2, Long l3) {
        Date date = new Date();
        WorkflowInstanceInfoDO workflowInstanceInfoDO = new WorkflowInstanceInfoDO();
        workflowInstanceInfoDO.setAppId(workflowInfoDO.getAppId());
        workflowInstanceInfoDO.setWfInstanceId(l3);
        workflowInstanceInfoDO.setWorkflowId(l2);
        workflowInstanceInfoDO.setStatus(Integer.valueOf(WorkflowInstanceStatus.WAITING.getV()));
        workflowInstanceInfoDO.setExpectedTriggerTime(l);
        workflowInstanceInfoDO.setActualTriggerTime(Long.valueOf(System.currentTimeMillis()));
        workflowInstanceInfoDO.setWfInitParams(str);
        boolean z = false;
        try {
            Map map = (Map) JSON.parseObject(str, new TypeReference<Map<String, String>>() { // from class: tech.powerjob.server.core.workflow.WorkflowInstanceManager.1
            }, new Feature[0]);
            if (map != null) {
                if (!map.isEmpty()) {
                    z = true;
                }
            }
        } catch (Exception e) {
        }
        if (z) {
            workflowInstanceInfoDO.setWfContext(str);
        } else {
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put("initParams", str);
            workflowInstanceInfoDO.setWfContext(JsonUtils.toJSONString(newHashMap));
        }
        workflowInstanceInfoDO.setGmtCreate(date);
        workflowInstanceInfoDO.setGmtModified(date);
        return workflowInstanceInfoDO;
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInfo.getMaxWfInstanceNum() > 0 ? #wfInfo.getId() : #wfInstanceId", concurrencyLevel = 1024)
    public void start(WorkflowInfoDO workflowInfoDO, Long l) {
        int countByWorkflowIdAndStatusIn;
        Optional findByWfInstanceId = this.workflowInstanceInfoRepository.findByWfInstanceId(l);
        if (!findByWfInstanceId.isPresent()) {
            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", l);
            return;
        }
        WorkflowInstanceInfoDO workflowInstanceInfoDO = (WorkflowInstanceInfoDO) findByWfInstanceId.get();
        if (workflowInstanceInfoDO.getStatus().intValue() != WorkflowInstanceStatus.WAITING.getV()) {
            log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", new Object[]{workflowInfoDO.getId(), l, workflowInstanceInfoDO});
            return;
        }
        if (workflowInfoDO.getMaxWfInstanceNum().intValue() > 0 && (countByWorkflowIdAndStatusIn = this.workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(workflowInfoDO.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS)) > workflowInfoDO.getMaxWfInstanceNum().intValue()) {
            handleWfInstanceFinalStatus(workflowInstanceInfoDO, String.format("too many instances(%d>%d)", Integer.valueOf(countByWorkflowIdAndStatusIn), workflowInfoDO.getMaxWfInstanceNum()), WorkflowInstanceStatus.FAILED);
            return;
        }
        try {
            PEWorkflowDAG pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(workflowInstanceInfoDO.getDag(), PEWorkflowDAG.class);
            List<PEWorkflowDAG.Node> listReadyNodes = WorkflowDAGUtils.listReadyNodes(pEWorkflowDAG);
            List<PEWorkflowDAG.Node> findControlNodes = findControlNodes(listReadyNodes);
            while (!findControlNodes.isEmpty()) {
                this.workflowNodeHandleService.handleControlNodes(findControlNodes, pEWorkflowDAG, workflowInstanceInfoDO);
                listReadyNodes = WorkflowDAGUtils.listReadyNodes(pEWorkflowDAG);
                findControlNodes = findControlNodes(listReadyNodes);
            }
            if (!listReadyNodes.isEmpty()) {
                workflowInstanceInfoDO.setStatus(Integer.valueOf(WorkflowInstanceStatus.RUNNING.getV()));
                this.workflowNodeHandleService.handleTaskNodes(listReadyNodes, pEWorkflowDAG, workflowInstanceInfoDO);
                log.info("[Workflow-{}|{}] start workflow successfully", workflowInfoDO.getId(), l);
            } else {
                workflowInstanceInfoDO.setFinishedTime(Long.valueOf(System.currentTimeMillis()));
                workflowInstanceInfoDO.setDag(JSON.toJSONString(pEWorkflowDAG));
                log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", new Object[]{workflowInfoDO.getId(), l, workflowInstanceInfoDO});
                handleWfInstanceFinalStatus(workflowInstanceInfoDO, "no enabled nodes", WorkflowInstanceStatus.SUCCEED);
            }
        } catch (Exception e) {
            log.error("[Workflow-{}|{}] start workflow: {} failed.", new Object[]{workflowInfoDO.getId(), l, workflowInfoDO, e});
            handleWfInstanceFinalStatus(workflowInstanceInfoDO, e.getMessage(), WorkflowInstanceStatus.FAILED);
        }
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
    public void move(Long l, Long l2, InstanceStatus instanceStatus, String str) {
        Optional findByWfInstanceId = this.workflowInstanceInfoRepository.findByWfInstanceId(l);
        if (!findByWfInstanceId.isPresent()) {
            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", l);
            return;
        }
        WorkflowInstanceInfoDO workflowInstanceInfoDO = (WorkflowInstanceInfoDO) findByWfInstanceId.get();
        Long workflowId = workflowInstanceInfoDO.getWorkflowId();
        if (instanceStatus != InstanceStatus.STOPPED || WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(workflowInstanceInfoDO.getStatus())) {
            try {
                PEWorkflowDAG pEWorkflowDAG = (PEWorkflowDAG) JSON.parseObject(workflowInstanceInfoDO.getDag(), PEWorkflowDAG.class);
                boolean z = true;
                PEWorkflowDAG.Node node = null;
                for (PEWorkflowDAG.Node node2 : pEWorkflowDAG.getNodes()) {
                    if (l2.equals(node2.getInstanceId())) {
                        node2.setStatus(Integer.valueOf(instanceStatus.getV()));
                        node2.setResult(str);
                        node2.setFinishedTime(CommonUtils.formatTime(Long.valueOf(System.currentTimeMillis())));
                        node = node2;
                        log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", new Object[]{workflowId, l, node2.getNodeId(), node2.getJobId(), l2, instanceStatus.name(), str});
                    }
                    if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node2.getStatus())) {
                        z = false;
                    }
                }
                if (node == null) {
                    log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignore! ", new Object[]{workflowId, l, l2});
                    return;
                }
                workflowInstanceInfoDO.setGmtModified(new Date());
                workflowInstanceInfoDO.setDag(JSON.toJSONString(pEWorkflowDAG));
                if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(workflowInstanceInfoDO.getStatus())) {
                    this.workflowInstanceInfoRepository.saveAndFlush(workflowInstanceInfoDO);
                    log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", new Object[]{workflowId, l, workflowInstanceInfoDO.getStatus()});
                    return;
                }
                if (instanceStatus == InstanceStatus.FAILED && WorkflowDAGUtils.isNotAllowSkipWhenFailed(node)) {
                    log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", new Object[]{workflowId, l, l2});
                    handleWfInstanceFinalStatus(workflowInstanceInfoDO, "middle job failed", WorkflowInstanceStatus.FAILED);
                    return;
                }
                if (instanceStatus == InstanceStatus.STOPPED) {
                    handleWfInstanceFinalStatus(workflowInstanceInfoDO, "middle job stopped by user", WorkflowInstanceStatus.STOPPED);
                    log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", new Object[]{workflowId, l, l2});
                    return;
                }
                List<PEWorkflowDAG.Node> listReadyNodes = WorkflowDAGUtils.listReadyNodes(pEWorkflowDAG);
                if (listReadyNodes.isEmpty() && isFinish(pEWorkflowDAG)) {
                    z = true;
                }
                if (z) {
                    workflowInstanceInfoDO.setDag(JSON.toJSONString(pEWorkflowDAG));
                    handleWfInstanceFinalStatus(workflowInstanceInfoDO, str, WorkflowInstanceStatus.SUCCEED);
                    log.info("[Workflow-{}|{}] process successfully.", workflowId, l);
                    return;
                }
                List<PEWorkflowDAG.Node> findControlNodes = findControlNodes(listReadyNodes);
                while (!findControlNodes.isEmpty()) {
                    this.workflowNodeHandleService.handleControlNodes(findControlNodes, pEWorkflowDAG, workflowInstanceInfoDO);
                    listReadyNodes = WorkflowDAGUtils.listReadyNodes(pEWorkflowDAG);
                    findControlNodes = findControlNodes(listReadyNodes);
                }
                if (!listReadyNodes.isEmpty()) {
                    this.workflowNodeHandleService.handleTaskNodes(listReadyNodes, pEWorkflowDAG, workflowInstanceInfoDO);
                    return;
                }
                if (!isFinish(pEWorkflowDAG)) {
                    workflowInstanceInfoDO.setDag(JSON.toJSONString(pEWorkflowDAG));
                    this.workflowInstanceInfoRepository.saveAndFlush(workflowInstanceInfoDO);
                } else {
                    workflowInstanceInfoDO.setDag(JSON.toJSONString(pEWorkflowDAG));
                    handleWfInstanceFinalStatus(workflowInstanceInfoDO, str, WorkflowInstanceStatus.SUCCEED);
                    log.info("[Workflow-{}|{}] process successfully.", workflowId, l);
                }
            } catch (Exception e) {
                handleWfInstanceFinalStatus(workflowInstanceInfoDO, "MOVE NEXT STEP FAILED: " + e.getMessage(), WorkflowInstanceStatus.FAILED);
                log.error("[Workflow-{}|{}] update failed.", new Object[]{workflowId, l, e});
            }
        }
    }

    @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024)
    public void updateWorkflowContext(Long l, Map<String, String> map) {
        try {
            Optional findByWfInstanceId = this.workflowInstanceInfoRepository.findByWfInstanceId(l);
            if (!findByWfInstanceId.isPresent()) {
                log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", l);
                return;
            }
            WorkflowInstanceInfoDO workflowInstanceInfoDO = (WorkflowInstanceInfoDO) findByWfInstanceId.get();
            HashMap hashMap = (HashMap) JSON.parseObject(workflowInstanceInfoDO.getWfContext(), new TypeReference<HashMap<String, String>>() { // from class: tech.powerjob.server.core.workflow.WorkflowInstanceManager.2
            }, new Feature[0]);
            for (Map.Entry<String, String> entry : map.entrySet()) {
                String key = entry.getKey();
                log.info("[Workflow-{}|{}] update workflow context {} : {} -> {}", new Object[]{workflowInstanceInfoDO.getWorkflowId(), workflowInstanceInfoDO.getWfInstanceId(), key, (String) hashMap.put(key, entry.getValue()), entry.getValue()});
            }
            workflowInstanceInfoDO.setWfContext(JSON.toJSONString(hashMap));
            this.workflowInstanceInfoRepository.saveAndFlush(workflowInstanceInfoDO);
        } catch (Exception e) {
            log.error("[WorkflowInstanceManager] update workflow(workflowInstanceId={}) context failed.", l, e);
        }
    }

    private void handleWfInstanceFinalStatus(WorkflowInstanceInfoDO workflowInstanceInfoDO, String str, WorkflowInstanceStatus workflowInstanceStatus) {
        workflowInstanceInfoDO.setStatus(Integer.valueOf(workflowInstanceStatus.getV()));
        workflowInstanceInfoDO.setResult(str);
        workflowInstanceInfoDO.setFinishedTime(Long.valueOf(System.currentTimeMillis()));
        workflowInstanceInfoDO.setGmtModified(new Date());
        this.workflowInstanceInfoRepository.saveAndFlush(workflowInstanceInfoDO);
        if (workflowInstanceInfoDO.getParentWfInstanceId() != null) {
            if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED) {
                ((WorkflowInstanceManager) SpringUtils.getBean(getClass())).updateWorkflowContext(workflowInstanceInfoDO.getParentWfInstanceId(), (HashMap) JSON.parseObject(workflowInstanceInfoDO.getWfContext(), new TypeReference<HashMap<String, String>>() { // from class: tech.powerjob.server.core.workflow.WorkflowInstanceManager.3
                }, new Feature[0]));
            }
            ((WorkflowInstanceManager) SpringUtils.getBean(getClass())).move(workflowInstanceInfoDO.getParentWfInstanceId(), workflowInstanceInfoDO.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), str);
        }
        if (workflowInstanceStatus == WorkflowInstanceStatus.FAILED) {
            try {
                this.workflowInfoRepository.findById(workflowInstanceInfoDO.getWorkflowId()).ifPresent(workflowInfoDO -> {
                    WorkflowInstanceAlarm workflowInstanceAlarm = new WorkflowInstanceAlarm();
                    BeanUtils.copyProperties(workflowInfoDO, workflowInstanceAlarm);
                    BeanUtils.copyProperties(workflowInstanceInfoDO, workflowInstanceAlarm);
                    workflowInstanceAlarm.setResult(str);
                    this.alarmCenter.alarmFailed(workflowInstanceAlarm, AlarmUtils.convertUserInfoList2AlarmTargetList(this.userService.fetchNotifyUserList(workflowInfoDO.getNotifyUserIds())));
                });
            } catch (Exception e) {
            }
        }
    }

    private List<PEWorkflowDAG.Node> findControlNodes(List<PEWorkflowDAG.Node> list) {
        return (List) list.stream().filter(node -> {
            return WorkflowNodeType.of(node.getNodeType().intValue()).isControlNode();
        }).collect(Collectors.toList());
    }

    private boolean isFinish(PEWorkflowDAG pEWorkflowDAG) {
        Iterator it = pEWorkflowDAG.getNodes().iterator();
        while (it.hasNext()) {
            if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(((PEWorkflowDAG.Node) it.next()).getStatus())) {
                return false;
            }
        }
        return true;
    }

    public WorkflowInstanceManager(AlarmCenter alarmCenter, IdGenerateService idGenerateService, JobInfoRepository jobInfoRepository, UserService userService, WorkflowInfoRepository workflowInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository, WorkflowNodeInfoRepository workflowNodeInfoRepository, WorkflowNodeHandleService workflowNodeHandleService) {
        this.alarmCenter = alarmCenter;
        this.idGenerateService = idGenerateService;
        this.jobInfoRepository = jobInfoRepository;
        this.userService = userService;
        this.workflowInfoRepository = workflowInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
        this.workflowNodeInfoRepository = workflowNodeInfoRepository;
        this.workflowNodeHandleService = workflowNodeHandleService;
    }
}
