package tech.powerjob.server.core;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.instance.InstanceMetadataService;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;

/**
 * Dispatches job instances to workers.
 *
 * <p>For a given instance this service checks that it is still waiting for dispatch, applies the
 * job's concurrent-instance limit, picks a worker that is not overloaded, sends it a
 * {@link ServerScheduleJobReq} through the {@link TransportService} and records the outcome via the
 * {@link InstanceInfoRepository}.
 */
@Service
public class DispatchService {
    private static final Logger log = LoggerFactory.getLogger(DispatchService.class);

    private final TransportService transportService;
    private final WorkerClusterQueryService workerClusterQueryService;
    private final InstanceManager instanceManager;
    private final InstanceMetadataService instanceMetadataService;
    private final InstanceInfoRepository instanceInfoRepository;

    public DispatchService(TransportService transportService, WorkerClusterQueryService workerClusterQueryService, InstanceManager instanceManager, InstanceMetadataService instanceMetadataService, InstanceInfoRepository instanceInfoRepository) {
        this.transportService = transportService;
        this.workerClusterQueryService = workerClusterQueryService;
        this.instanceManager = instanceManager;
        this.instanceMetadataService = instanceMetadataService;
        this.instanceInfoRepository = instanceInfoRepository;
    }

    /**
     * Asks the repository to put a single instance back into {@link InstanceStatus#WAITING_DISPATCH}.
     *
     * <p>This method only updates the stored status and modification time; it does not send anything
     * to a worker itself.
     *
     * <p>NOTE(review): the lock key expression refers to {@code #instanceId}, so the parameter must
     * keep that exact name for the key to resolve (assuming the lock aspect binds expression
     * variables by parameter name - confirm against the {@code UseCacheLock} aspect).
     *
     * @param instanceId   id of the instance to reset
     * @param originStatus status value handed to the repository as the expected origin status; the
     *                     exact matching semantics are defined by the repository query
     */
    @UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
    public void redispatchAsync(Long instanceId, int originStatus) {
        this.instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId.longValue(), originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
    }

    /**
     * Batch variant of {@link #redispatchAsync(Long, int)} that is not guarded by
     * {@code @UseCacheLock}.
     *
     * @param instanceIdList ids of the instances to reset
     * @param originStatus   status value handed to the repository as the expected origin status
     */
    public void redispatchBatchAsyncLockFree(List<Long> instanceIdList, int originStatus) {
        this.instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(instanceIdList, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
    }

    /**
     * Dispatches one job instance to a worker.
     *
     * <p>The method returns without sending anything when:
     * <ul>
     *   <li>the instance is canceled or no longer in {@code WAITING_DISPATCH};</li>
     *   <li>{@code jobInfo} has no id - the instance is finished as {@code FAILED};</li>
     *   <li>the number of instances in {@code WAITING_WORKER_RECEIVE} or {@code RUNNING} has reached
     *       the job's limit - the instance is marked {@code FAILED};</li>
     *   <li>no suitable worker exists - the instance is marked {@code FAILED};</li>
     *   <li>every suitable worker is overloaded - the holder in {@code overloadOptional}, if present,
     *       is set to {@code true} and the instance status is left untouched.</li>
     * </ul>
     * Otherwise the schedule request is sent to the first non-overloaded worker and the instance is
     * moved to {@code WAITING_WORKER_RECEIVE}.
     *
     * <p>NOTE(review): the lock key expression refers to {@code #jobInfo} and {@code #instanceId},
     * so these two parameters must keep those exact names (assuming the lock aspect binds expression
     * variables by parameter name - confirm against the {@code UseCacheLock} aspect).
     *
     * @param jobInfo              job definition of the instance
     * @param instanceId           id of the instance to dispatch
     * @param instanceInfoOptional already loaded instance record; when empty it is read from the
     *                             repository by {@code instanceId}
     * @param overloadOptional     optional holder used to report that all workers were overloaded
     */
    @UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
    public void dispatch(JobInfoDO jobInfo, Long instanceId, Optional<InstanceInfoDO> instanceInfoOptional, Optional<Holder<Boolean>> overloadOptional) {
        InstanceInfoDO instanceInfo = instanceInfoOptional.orElseGet(() -> this.instanceInfoRepository.findByInstanceId(instanceId.longValue()));
        Long jobId = instanceInfo.getJobId();

        // Only instances that are still waiting for dispatch may proceed.
        int currentStatus = instanceInfo.getStatus().intValue();
        if (InstanceStatus.CANCELED.getV() == currentStatus) {
            log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId);
            return;
        }
        if (currentStatus != InstanceStatus.WAITING_DISPATCH.getV()) {
            log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been dispatched", jobId, instanceId);
            return;
        }

        // A job record without an id is treated as a deleted job.
        if (jobInfo.getId() == null) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", jobId, instanceId, jobId);
            this.instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), InstanceStatus.FAILED, "can't find job by id " + jobId);
            return;
        }

        Date now = new Date();
        String instanceParams = instanceInfo.getInstanceParams() == null ? "" : instanceInfo.getInstanceParams();
        log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams);
        long current = System.currentTimeMillis();

        // Frequent-type jobs are always limited to a single in-flight instance.
        Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
        if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
            maxInstanceNum = 1;
        }
        // A non-positive limit means the concurrent-instance check is skipped.
        if (maxInstanceNum > 0) {
            long runningInstanceCount = this.instanceInfoRepository.countByJobIdAndStatusIn(jobId.longValue(), Lists.newArrayList(InstanceStatus.WAITING_WORKER_RECEIVE.getV(), InstanceStatus.RUNNING.getV()));
            if (runningInstanceCount >= maxInstanceNum) {
                String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
                log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
                failDispatch(instanceId, instanceInfo, current, now, result);
                return;
            }
        }

        List<WorkerInfo> suitableWorkers = this.workerClusterQueryService.getSuitableWorkers(jobInfo);
        if (CollectionUtils.isEmpty(suitableWorkers)) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
            failDispatch(instanceId, instanceInfo, current, now, SystemInstanceResult.NO_WORKER_AVAILABLE);
            return;
        }

        List<WorkerInfo> availableWorkers = filterOverloadWorker(suitableWorkers);
        if (availableWorkers.isEmpty()) {
            // Report the overload to the caller; the instance status is deliberately not changed here.
            overloadOptional.ifPresent(holder -> holder.set(true));
            log.warn("[Dispatcher-{}|{}] cancel to dispatch job due to all worker is overload", jobId, instanceId);
            return;
        }

        List<String> workerAddresses = availableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
        ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerAddresses);

        // The first non-overloaded worker receives the request.
        WorkerInfo taskTracker = availableWorkers.get(0);
        String taskTrackerAddress = taskTracker.getAddress();
        this.transportService.tell(taskTracker.getProtocol(), ServerURLFactory.dispatchJob2Worker(taskTrackerAddress), req);
        log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);

        this.instanceInfoRepository.update4TriggerSucceed(instanceId.longValue(), InstanceStatus.WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now, instanceInfo.getStatus().intValue());
        this.instanceMetadataService.loadJobInfo(instanceId, jobInfo);
    }

    /**
     * Records a failed trigger in the repository and then hands the instance to the
     * {@link InstanceManager} as finished with status {@code FAILED}.
     *
     * @param instanceId   id of the failed instance
     * @param instanceInfo instance record, used for its workflow instance id
     * @param triggerTime  timestamp in milliseconds, passed to the repository for both time arguments
     * @param now          modification date passed to the repository
     * @param result       failure description stored with the instance
     */
    private void failDispatch(Long instanceId, InstanceInfoDO instanceInfo, long triggerTime, Date now, String result) {
        this.instanceInfoRepository.update4TriggerFailed(instanceId.longValue(), InstanceStatus.FAILED.getV(), triggerTime, triggerTime, "N/A", result, now);
        this.instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), InstanceStatus.FAILED, result);
    }

    /**
     * Returns the workers for which {@link WorkerInfo#overload()} is false, in their original order.
     *
     * @param workers candidate workers
     * @return a new list containing only the workers that are not overloaded
     */
    private List<WorkerInfo> filterOverloadWorker(List<WorkerInfo> workers) {
        List<WorkerInfo> available = new ArrayList<>(workers.size());
        for (WorkerInfo worker : workers) {
            if (!worker.overload()) {
                available.add(worker);
            }
        }
        return available;
    }

    /**
     * Builds the schedule request sent to the worker.
     *
     * <p>Properties are first copied from the job by name, then instance-specific values are
     * applied on top.
     *
     * @param jobInfo          job definition
     * @param instanceInfo     instance being dispatched
     * @param workerAddresses  addresses of all workers that may take part in the execution
     * @return the populated request
     */
    private ServerScheduleJobReq constructServerScheduleJobReq(JobInfoDO jobInfo, InstanceInfoDO instanceInfo, List<String> workerAddresses) {
        ServerScheduleJobReq req = new ServerScheduleJobReq();
        BeanUtils.copyProperties(jobInfo, req);
        req.setJobId(jobInfo.getId());

        // Empty instance params are sent as null.
        if (StringUtils.isEmpty(instanceInfo.getInstanceParams())) {
            req.setInstanceParams(null);
        } else {
            req.setInstanceParams(instanceInfo.getInstanceParams());
        }
        // Job params stored on the instance override the value copied from the job, when present.
        if (!StringUtils.isEmpty(instanceInfo.getJobParams())) {
            req.setJobParams(instanceInfo.getJobParams());
        }

        req.setInstanceId(instanceInfo.getInstanceId());
        req.setAllWorkerAddress(workerAddresses);
        req.setMaxWorkerCount(jobInfo.getMaxWorkerCount());
        req.setWfInstanceId(instanceInfo.getWfInstanceId());

        // The request carries enum names rather than the numeric codes stored on the job.
        req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType().intValue()).name());
        req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType().intValue()).name());
        req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType().intValue()).name());

        if (jobInfo.getInstanceTimeLimit() != null) {
            req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit().longValue());
        }
        req.setThreadConcurrency(jobInfo.getConcurrency().intValue());
        return req;
    }
}
