package org.apache.shardingsphere.data.pipeline.core.job;

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.class */
/**
 * Base class for pipeline jobs that build the contexts of all their sharding items inside a single
 * {@link #execute(ShardingContext)} call and then prepare and start them together.
 *
 * <p>Subclasses supply the job item context, the tasks runner, the preparation step, the failure hook
 * and the execute callback through the abstract methods declared here.</p>
 *
 * @param <T> type of pipeline job item context handled by the job
 */
public abstract class AbstractInseparablePipelineJob<T extends PipelineJobItemContext> implements PipelineJob {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractInseparablePipelineJob.class);

    private final PipelineJobRunnerManager jobRunnerManager;

    /**
     * Creates the job with the runner manager used to register tasks runners and to query the stopping flag.
     *
     * @param pipelineJobRunnerManager job runner manager
     */
    @Generated
    public AbstractInseparablePipelineJob(PipelineJobRunnerManager pipelineJobRunnerManager) {
        this.jobRunnerManager = pipelineJobRunnerManager;
    }

    /**
     * Executes the job: builds a job item context for every sharding item of the job configuration,
     * registers a tasks runner for each, then prepares the collected contexts and starts their inventory
     * and incremental tasks.
     *
     * <p>The method returns early, without preparing or starting anything, when the runner manager reports
     * that the job is stopping or when no tasks runner could be registered.</p>
     *
     * @param shardingContext sharding context; its job name is used as the job id and its job parameter
     *                        is swapped into the job configuration
     */
    public final void execute(ShardingContext shardingContext) {
        String jobId = shardingContext.getJobName();
        log.info("Execute job {}", jobId);
        PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
        PipelineJobConfiguration jobConfig = jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
        Collection<T> jobItemContexts = new LinkedList<>();
        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
            // Abort the whole execution as soon as a stop has been requested.
            if (jobRunnerManager.isStopping()) {
                log.info("Stopping true, ignore");
                return;
            }
            T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
            // Only items whose runner was accepted by the manager take part in this execution.
            if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
                continue;
            }
            jobItemContexts.add(jobItemContext);
            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
            log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
        }
        if (jobItemContexts.isEmpty()) {
            log.warn("Job item contexts is empty, ignore");
            return;
        }
        prepare(jobItemContexts);
        executeInventoryTasks(jobType, jobItemContexts);
        executeIncrementalTasks(jobType, jobItemContexts);
    }

    /**
     * Builds the job item context of one sharding item.
     *
     * @param jobConfig job configuration
     * @param shardingItem sharding item index, in the range {@code [0, jobShardingCount)}
     * @return job item context
     */
    protected abstract T buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);

    /**
     * Builds the tasks runner that is registered with the job runner manager for a job item.
     *
     * @param jobItemContext job item context
     * @return tasks runner
     */
    protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);

    /**
     * Runs {@link #doPrepare(Collection)}; when it throws a runtime exception, every job item is marked
     * as failed and the exception is rethrown.
     *
     * @param jobItemContexts job item contexts to prepare
     */
    private void prepare(Collection<T> jobItemContexts) {
        try {
            doPrepare(jobItemContexts);
        } catch (RuntimeException ex) {
            for (T each : jobItemContexts) {
                processFailed(each.getJobId(), each.getShardingItem(), ex);
            }
            throw ex;
        }
    }

    /**
     * Prepares the given job item contexts before any task is started.
     *
     * @param jobItemContexts job item contexts taking part in this execution
     */
    protected abstract void doPrepare(Collection<T> jobItemContexts);

    /**
     * Handles a failed job item: logs the failure, stores the error message through the governance facade,
     * stops the job in {@link PipelineJobRegistry} and finally invokes {@link #processFailed(String)}.
     *
     * @param jobId job id
     * @param shardingItem sharding item index
     * @param ex cause of the failure
     */
    private void processFailed(String jobId, int shardingItem, Exception ex) {
        // The trailing exception has no placeholder, so SLF4J logs it as the throwable with its stack trace.
        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
        PipelineJobRegistry.stop(jobId);
        processFailed(jobId);
    }

    /**
     * Hook invoked after a failure of the given job has been recorded and the job has been stopped.
     *
     * @param jobId job id
     */
    protected abstract void processFailed(String jobId);

    /**
     * Sets every job item to {@code EXECUTE_INVENTORY_TASK}, starts all inventory tasks whose position is
     * not an {@link IngestFinishedPosition} and triggers the resulting futures with the "inventory" callback.
     * Nothing is triggered when no task had to be started.
     *
     * @param jobType job type
     * @param jobItemContexts job item contexts, expected to be non-empty
     */
    private void executeInventoryTasks(PipelineJobType jobType, Collection<T> jobItemContexts) {
        Collection<CompletableFuture<?>> futures = new LinkedList<>();
        for (T each : jobItemContexts) {
            updateJobItemStatus(each, jobType, JobStatus.EXECUTE_INVENTORY_TASK);
            for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
                    continue;
                }
                futures.addAll(task.start());
            }
        }
        if (futures.isEmpty()) {
            return;
        }
        // The callback is built from the first job item context only.
        ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", jobItemContexts.iterator().next()));
    }

    /**
     * Sets every job item to {@code EXECUTE_INCREMENTAL_TASK}, starts all incremental tasks whose position
     * is not an {@link IngestFinishedPosition} and triggers the resulting futures with the "incremental" callback.
     *
     * @param jobType job type
     * @param jobItemContexts job item contexts, expected to be non-empty
     */
    private void executeIncrementalTasks(PipelineJobType jobType, Collection<T> jobItemContexts) {
        Collection<CompletableFuture<?>> futures = new LinkedList<>();
        for (T each : jobItemContexts) {
            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
                // Returns from the whole method: remaining items are skipped and nothing is triggered.
                log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
                return;
            }
            updateJobItemStatus(each, jobType, JobStatus.EXECUTE_INCREMENTAL_TASK);
            for (PipelineTask task : ((TransmissionJobItemContext) each).getIncrementalTasks()) {
                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
                    continue;
                }
                futures.addAll(task.start());
            }
        }
        // Unlike the inventory phase, the trigger is invoked even when no future was collected.
        ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", jobItemContexts.iterator().next()));
    }

    /**
     * Sets the status on the in-memory job item context and updates it through {@link PipelineJobItemManager}.
     *
     * @param jobItemContext job item context to update
     * @param jobType job type providing the job item progress swapper
     * @param status new job status
     */
    private void updateJobItemStatus(T jobItemContext, PipelineJobType jobType, JobStatus status) {
        jobItemContext.setStatus(status);
        new PipelineJobItemManager(jobType.getYamlJobItemProgressSwapper()).updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), status);
    }

    /**
     * Builds the callback passed to {@link ExecuteEngine} together with the started task futures.
     *
     * @param identifier phase identifier; this class passes {@code "inventory"} or {@code "incremental"}
     * @param jobItemContext job item context; this class passes the first context of the execution
     * @return execute callback
     */
    protected abstract ExecuteCallback buildExecuteCallback(String identifier, T jobItemContext);

    /**
     * Returns the job runner manager given at construction time.
     *
     * @return job runner manager
     */
    @Override // org.apache.shardingsphere.data.pipeline.core.job.PipelineJob
    @Generated
    public PipelineJobRunnerManager getJobRunnerManager() {
        return jobRunnerManager;
    }
}
