package com.github.joekerouac.async.task.service;

import com.github.joekerouac.async.task.AsyncTaskService;
import com.github.joekerouac.async.task.Const;
import com.github.joekerouac.async.task.entity.AsyncTask;
import com.github.joekerouac.async.task.entity.common.ExtMap;
import com.github.joekerouac.async.task.impl.MonitorServiceAdaptor;
import com.github.joekerouac.async.task.impl.MonitorServiceProxy;
import com.github.joekerouac.async.task.model.AsyncServiceConfig;
import com.github.joekerouac.async.task.model.AsyncTaskExecutorConfig;
import com.github.joekerouac.async.task.model.AsyncTaskProcessorEngineConfig;
import com.github.joekerouac.async.task.model.CancelStatus;
import com.github.joekerouac.async.task.model.ExecStatus;
import com.github.joekerouac.async.task.model.TaskFinishCode;
import com.github.joekerouac.async.task.model.TransStrategy;
import com.github.joekerouac.async.task.spi.AbstractAsyncTaskProcessor;
import com.github.joekerouac.async.task.spi.AsyncTaskProcessorEngine;
import com.github.joekerouac.async.task.spi.AsyncTaskRepository;
import com.github.joekerouac.async.task.spi.IDGenerator;
import com.github.joekerouac.async.task.spi.MonitorService;
import com.github.joekerouac.async.task.spi.TraceService;
import com.github.joekerouac.async.task.spi.TransactionCallback;
import com.github.joekerouac.common.tools.collection.CollectionUtil;
import com.github.joekerouac.common.tools.constant.ExceptionProviderConst;
import com.github.joekerouac.common.tools.log.Logger;
import com.github.joekerouac.common.tools.log.LoggerFactory;
import com.github.joekerouac.common.tools.string.StringUtils;
import com.github.joekerouac.common.tools.util.Assert;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.validation.constraints.NotNull;

/* loaded from: input_file:com/github/joekerouac/async/task/service/AsyncTaskServiceImpl.class */
/**
 * {@link AsyncTaskService} implementation that routes processors and tasks to
 * {@link AsyncTaskProcessorEngine} instances.
 *
 * <p>One engine is created for every processor group found in
 * {@link AsyncServiceConfig#getExecutorConfigs()}; every processor name that is not part of such a
 * group is served by a single default engine. Tasks are persisted through the configured
 * {@link AsyncTaskRepository} and handed to the matching engine once the surrounding transaction
 * (if any) has committed.
 */
public class AsyncTaskServiceImpl implements AsyncTaskService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTaskServiceImpl.class.getName());

    /** Service configuration; also used as the monitor that serializes {@link #start()} and {@link #stop()}. */
    private final AsyncServiceConfig config;

    /** Engine used for every processor name that has no dedicated engine in {@link #engineMap}. */
    private final AsyncTaskProcessorEngine defaultEngine;

    /**
     * Processor name to dedicated engine. All names of one processor group map to the SAME engine
     * instance, so {@code values()} may contain an engine several times.
     */
    private final Map<String, AsyncTaskProcessorEngine> engineMap;

    /** Whether {@link #start()} has been called without a subsequent {@link #stop()}. */
    private volatile boolean start = false;

    /**
     * Validates the configuration, fills in defaults (monitor service, engine factory), builds the
     * engines, registers the configured processors and starts the task clean-up thread.
     *
     * <p>Note that the given configuration object is modified: its monitor service is replaced by a
     * {@link MonitorServiceProxy} wrapper (unless it already is one) and a default engine factory is
     * installed when none is set.
     *
     * @param config service configuration, must not be {@code null}
     */
    public AsyncTaskServiceImpl(@NotNull AsyncServiceConfig config) {
        Assert.notNull(config, "config不能为null", ExceptionProviderConst.IllegalArgumentExceptionProvider);
        Const.VALIDATION_SERVICE.validate(config);

        // Fall back to the no-op style adaptor and make sure the service is wrapped exactly once.
        MonitorService monitorService = config.getMonitorService();
        if (monitorService == null) {
            monitorService = new MonitorServiceAdaptor();
        }
        if (!(monitorService instanceof MonitorServiceProxy)) {
            monitorService = new MonitorServiceProxy(monitorService);
        }
        config.setMonitorService(monitorService);

        if (config.getEngineFactory() == null) {
            config.setEngineFactory(new DefaultAsyncTaskProcessorEngineFactory());
        }

        TaskClearRunner taskClearRunner = new TaskClearRunner(config.getRepository());
        this.engineMap = new HashMap<>();
        this.config = config;

        // Names of all processors that are served by a dedicated engine.
        Set<String> dedicatedProcessors = new HashSet<>();
        Map<Set<String>, AsyncTaskExecutorConfig> executorConfigs = config.getExecutorConfigs();
        if (!CollectionUtil.isEmpty(executorConfigs)) {
            for (Map.Entry<Set<String>, AsyncTaskExecutorConfig> entry : executorConfigs.entrySet()) {
                Set<String> processorGroup = entry.getKey();
                // Use the executor configuration declared for this group (the previous code
                // silently ignored it and always applied the default configuration).
                AsyncTaskProcessorEngine engine =
                    build(config, entry.getValue(), taskClearRunner, processorGroup, true);
                for (String processorName : processorGroup) {
                    // A processor name may only appear in one group.
                    Assert.assertTrue(dedicatedProcessors.add(processorName),
                        StringUtils.format("处理器有多个配置, processor: [{}]", processorName),
                        ExceptionProviderConst.IllegalArgumentExceptionProvider);
                    this.engineMap.put(processorName, engine);
                }
            }
        }

        // The default engine is configured with the dedicated names and contain=false.
        this.defaultEngine =
            build(config, config.getDefaultExecutorConfig(), taskClearRunner, dedicatedProcessors, false);

        if (CollectionUtil.isNotEmpty(config.getProcessors())) {
            config.getProcessors().forEach(this::addProcessor);
        }

        Thread clearThread = new Thread(taskClearRunner, "异步任务自动清理线程");
        clearThread.setPriority(Thread.MIN_PRIORITY);
        clearThread.setDaemon(true);
        clearThread.start();
    }

    /**
     * Creates one engine through the configured engine factory.
     *
     * @param config          service configuration supplying repository, processor supplier, trace
     *                        service, monitor service and engine factory
     * @param executorConfig  executor configuration for this engine; when {@code null} the default
     *                        executor configuration of {@code config} is used
     * @param taskClearRunner clean-up runner shared by all engines
     * @param processorGroup  processor names passed to the engine as its processor group
     * @param contain         value passed to {@code setContain}; the constructor passes {@code true}
     *                        for dedicated engines and {@code false} for the default engine
     * @return the engine created by the factory
     */
    private AsyncTaskProcessorEngine build(AsyncServiceConfig config, AsyncTaskExecutorConfig executorConfig,
        TaskClearRunner taskClearRunner, Set<String> processorGroup, boolean contain) {
        AsyncTaskExecutorConfig effectiveExecutorConfig =
            executorConfig != null ? executorConfig : config.getDefaultExecutorConfig();

        AsyncTaskProcessorEngineConfig engineConfig = new AsyncTaskProcessorEngineConfig();
        engineConfig.setExecutorConfig(effectiveExecutorConfig);
        engineConfig.setRepository(config.getRepository());
        engineConfig.setProcessorSupplier(config.getProcessorSupplier());
        engineConfig.setTraceService(config.getTraceService());
        engineConfig.setMonitorService(config.getMonitorService());
        engineConfig.setTaskClearRunner(taskClearRunner);
        engineConfig.setProcessorGroup(processorGroup);
        engineConfig.setContain(contain);

        // The load threshold must be strictly below the cache queue size, unless both are zero.
        int cacheQueueSize = effectiveExecutorConfig.getCacheQueueSize();
        int loadThreshold = effectiveExecutorConfig.getLoadThreshold();
        Assert.assertTrue(loadThreshold < cacheQueueSize || (loadThreshold == 0 && cacheQueueSize == 0),
            StringUtils.format("触发捞取任务的队列长度阈值应该小于缓存队列的长度，当前触发捞取任务的队列长度为：[{}],当前缓存队列长度为：[{}]",
                loadThreshold, cacheQueueSize),
            ExceptionProviderConst.IllegalArgumentExceptionProvider);

        return config.getEngineFactory().create(engineConfig);
    }

    /**
     * Starts the default engine and every dedicated engine. Calling it while the service is already
     * started only logs a warning.
     */
    @Override
    public void start() {
        synchronized (this.config) {
            if (this.start) {
                LOGGER.warn("当前异步任务服务已经启动，请勿重复调用启动方法");
                return;
            }
            this.defaultEngine.start();
            dedicatedEngines().forEach(AsyncTaskProcessorEngine::start);
            this.start = true;
        }
    }

    /**
     * Stops the default engine and every dedicated engine. Calling it while the service is not
     * started only logs a warning.
     */
    @Override
    public void stop() {
        synchronized (this.config) {
            if (!this.start) {
                LOGGER.warn("当前异步任务服务已经关闭，请勿重复调用关闭方法");
                return;
            }
            this.defaultEngine.stop();
            dedicatedEngines().forEach(AsyncTaskProcessorEngine::stop);
            this.start = false;
        }
    }

    /**
     * Registers the processor with the engine responsible for each of its processor names.
     *
     * @param processor processor to register
     */
    @Override
    public void addProcessor(AbstractAsyncTaskProcessor<?> processor) {
        for (String processorName : processor.processors()) {
            getEngine(processorName).addProcessor(processor);
        }
    }

    /**
     * Removes the processor registered under the given name from its engine.
     *
     * @param processorName processor name
     * @return whatever the engine's {@code removeProcessor} returns for that name
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses P; the engine cannot verify it at runtime
    public <T, P extends AbstractAsyncTaskProcessor<T>> P removeProcessor(String processorName) {
        return (P) getEngine(processorName).removeProcessor(processorName);
    }

    /**
     * Looks up the processor registered under the given name in its engine.
     *
     * @param processorName processor name
     * @return whatever the engine's {@code getProcessor} returns for that name
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses P; the engine cannot verify it at runtime
    public <T, P extends AbstractAsyncTaskProcessor<T>> P getProcessor(String processorName) {
        return (P) getEngine(processorName).getProcessor(processorName);
    }

    /**
     * Persists a task in {@link ExecStatus#READY} state and hands it to its engine.
     *
     * @param requestId     request id stored on the task
     * @param task          task payload, serialized by the processor
     * @param maxRetry      maximum retry count stored on the task
     * @param execTime      execution time stored on the task
     * @param processor     name of the processor that handles the task; must be registered
     * @param transStrategy transaction strategy used when saving the task
     */
    @Override
    public void addTask(String requestId, Object task, int maxRetry, LocalDateTime execTime, String processor,
        TransStrategy transStrategy) {
        addTaskInternal(requestId, task, maxRetry, execTime, processor, transStrategy, ExecStatus.READY);
    }

    /**
     * Same as {@link #addTask}, but the task is persisted in {@link ExecStatus#WAIT} state; see
     * {@link #notifyTask(String, TransStrategy)} for moving it to {@link ExecStatus#READY}.
     *
     * @param requestId     request id stored on the task
     * @param task          task payload, serialized by the processor
     * @param maxRetry      maximum retry count stored on the task
     * @param execTime      execution time stored on the task
     * @param processor     name of the processor that handles the task; must be registered
     * @param transStrategy transaction strategy used when saving the task
     */
    @Override
    public void addTaskWithWait(String requestId, Object task, int maxRetry, LocalDateTime execTime,
        String processor, TransStrategy transStrategy) {
        addTaskInternal(requestId, task, maxRetry, execTime, processor, transStrategy, ExecStatus.WAIT);
    }

    /**
     * Moves a task from {@link ExecStatus#WAIT} to {@link ExecStatus#READY} and hands it to its
     * engine. Does nothing when the task does not exist, is not in {@code WAIT} state, or the
     * compare-and-set update does not affect any row.
     *
     * @param requestId     request id of the task
     * @param transStrategy transaction strategy used for the status update
     */
    @Override
    public void notifyTask(String requestId, TransStrategy transStrategy) {
        AsyncTask task = this.config.getRepository().selectByRequestId(requestId);
        if (task == null || task.getStatus() != ExecStatus.WAIT) {
            return;
        }

        this.config.getTransactionManager().runWithTrans(transStrategy, () -> {
            int updated = this.config.getRepository().casUpdate(requestId, ExecStatus.WAIT, ExecStatus.READY, Const.IP);
            if (updated > 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("唤醒任务: [{}]", task.getRequestId());
                }
                // Keep the in-memory copy in sync with the row that was just updated.
                task.setStatus(ExecStatus.READY);
                addTaskToEngineAfterTransCommit(task);
            }
        });
    }

    /**
     * Tries to cancel a task.
     *
     * @param requestId     request id of the task
     * @param transStrategy transaction strategy used for the lookup and update
     * @return {@link CancelStatus#NOT_EXIST} when no such task is found, {@link CancelStatus#RUNNING}
     *         or {@link CancelStatus#FINISH} when the task is in that state,
     *         {@link CancelStatus#SUCCESS} when the compare-and-set cancel affected a row, otherwise
     *         {@link CancelStatus#UNKNOWN}
     * @throws IllegalStateException presumably, via the assertion's exception provider, when the
     *         service has not been started
     */
    @Override
    public CancelStatus cancelTask(String requestId, TransStrategy transStrategy) {
        Assert.assertTrue(this.start, "当前服务还未启动，请先启动后调用", ExceptionProviderConst.IllegalStateExceptionProvider);
        return (CancelStatus) this.config.getTransactionManager().runWithTrans(transStrategy, () -> {
            AsyncTask task = this.config.getRepository().selectByRequestId(requestId);
            if (task == null) {
                return CancelStatus.NOT_EXIST;
            }
            ExecStatus status = task.getStatus();
            if (status == ExecStatus.RUNNING) {
                return CancelStatus.RUNNING;
            }
            if (status == ExecStatus.FINISH) {
                return CancelStatus.FINISH;
            }
            if (this.config.getRepository().casCancel(requestId, status, Const.IP) > 0) {
                return CancelStatus.SUCCESS;
            }
            LOGGER.info("任务取消失败，当前任务状态: [{}:{}]", requestId, status);
            return CancelStatus.UNKNOWN;
        });
    }

    /**
     * Builds the {@link AsyncTask} entity, saves it and, when the save succeeds, hands it to the
     * engine. When the repository reports that nothing was saved, the monitor service is notified
     * of a duplicate task instead.
     *
     * @param requestId     request id stored on the task
     * @param task          task payload, serialized by the processor
     * @param maxRetry      maximum retry count stored on the task
     * @param execTime      execution time stored on the task
     * @param processor     name of the processor that handles the task; must be registered
     * @param transStrategy transaction strategy used when saving the task
     * @param status        initial status of the task
     */
    private void addTaskInternal(String requestId, Object task, int maxRetry, LocalDateTime execTime,
        String processor, TransStrategy transStrategy, ExecStatus status) {
        Assert.assertTrue(this.start, "当前服务还未启动，请先启动后调用", ExceptionProviderConst.IllegalStateExceptionProvider);

        AbstractAsyncTaskProcessor<Object> taskProcessor = getProcessor(processor);
        Assert.notNull(taskProcessor, StringUtils.format("指定的任务处理器 [{}] 不存在", processor),
            ExceptionProviderConst.IllegalArgumentExceptionProvider);

        IDGenerator idGenerator = this.config.getIdGenerator();
        String id = idGenerator.generateId();
        Assert.notBlank(id, StringUtils.format("ID生成器 [{}] 生成的ID为空", idGenerator),
            ExceptionProviderConst.IllegalStateExceptionProvider);

        String serializedTask = taskProcessor.serialize(task);
        AsyncTaskRepository repository = this.config.getRepository();

        AsyncTask asyncTask = new AsyncTask();
        asyncTask.setId(id);
        asyncTask.setRequestId(requestId);
        asyncTask.setTask(serializedTask);
        asyncTask.setMaxRetry(maxRetry);
        asyncTask.setExecTime(execTime);
        asyncTask.setProcessor(processor);
        asyncTask.setRetry(0);
        asyncTask.setStatus(status);
        asyncTask.setTaskFinishCode(TaskFinishCode.NONE);
        asyncTask.setCreateIp(Const.IP);
        asyncTask.setExecIp(Const.IP);

        // Store the dumped trace context (if any) in the task's extension map.
        TraceService traceService = this.config.getTraceService();
        String traceContext = traceService == null ? null : traceService.dump();
        if (traceContext != null) {
            ExtMap<String, Object> extMap = asyncTask.getExtMap();
            if (extMap == null) {
                extMap = new ExtMap<>();
                asyncTask.setExtMap(extMap);
            }
            extMap.put(AsyncTask.ExtMapKey.TRACE_CONTEXT, traceContext);
        }

        this.config.getTransactionManager().runWithTrans(transStrategy, () -> {
            if (repository.save(asyncTask)) {
                addTaskToEngineAfterTransCommit(asyncTask);
            } else {
                this.config.getMonitorService().duplicateTask(requestId, task);
            }
        });
    }

    /**
     * Hands the task to its engine: immediately when no transaction is active, otherwise from an
     * after-commit callback registered with the transaction manager.
     *
     * @param asyncTask task to enqueue
     */
    private void addTaskToEngineAfterTransCommit(AsyncTask asyncTask) {
        final Runnable enqueue = () -> {
            getEngine(asyncTask.getProcessor()).addTask(Collections.singletonList(asyncTask));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("将任务[{}]添加到内存队列中", asyncTask);
            }
        };

        if (this.config.getTransactionManager().isActualTransactionActive()) {
            LOGGER.debug("当前在事务中，等待事务提交后将任务提交到任务执行引擎");
            this.config.getTransactionManager().registerCallback(new TransactionCallback() {
                @Override
                public void afterCommit() throws RuntimeException {
                    enqueue.run();
                }
            });
        } else {
            LOGGER.debug("当前不在事务中，直接将任务提交到任务执行引擎");
            enqueue.run();
        }
    }

    /**
     * Returns the engine responsible for the given processor name.
     *
     * @param processorName processor name
     * @return the dedicated engine registered for the name, or the default engine when there is none
     */
    private AsyncTaskProcessorEngine getEngine(String processorName) {
        AsyncTaskProcessorEngine engine = this.engineMap.get(processorName);
        return engine != null ? engine : this.defaultEngine;
    }

    /**
     * Returns each dedicated engine once. {@link #engineMap} maps every processor name of a group
     * to the same engine, so iterating its values directly would start/stop an engine once per
     * name. De-duplication relies on {@code equals}/{@code hashCode} of the engine (identity unless
     * the engine implementation overrides them).
     *
     * @return the distinct dedicated engines, possibly empty
     */
    private Set<AsyncTaskProcessorEngine> dedicatedEngines() {
        return new HashSet<>(this.engineMap.values());
    }
}
