package de.viadee.bpmnai.core.runner.impl;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.viadee.bpmnai.core.processing.steps.PipelineStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.AddReducedColumnsToDatasetStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.AddVariableColumnsStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.AggregateActivityInstancesStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.AggregateProcessInstancesStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.ColumnHashStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.ColumnRemoveStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.CreateColumnsFromJsonStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.DataFilterStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.DetermineProcessVariablesStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.FillActivityInstancesHistoryStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.ReduceColumnsStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.TypeCastStep;
import de.viadee.bpmnai.core.processing.steps.output.WriteToDiscStep;
import de.viadee.bpmnai.core.runner.SparkRunner;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.BpmnaiVariables;
import de.viadee.bpmnai.core.util.arguments.KafkaProcessingArguments;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/viadee/bpmnai/core/runner/impl/KafkaProcessingRunner.class */
/**
 * {@link SparkRunner} implementation that configures itself from
 * {@link KafkaProcessingArguments}, assembles the default processing pipeline and
 * loads the initial dataset from the configured source folder.
 */
public class KafkaProcessingRunner extends SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProcessingRunner.class);

    /**
     * Creates a runner without passing a configuration to the superclass.
     */
    public KafkaProcessingRunner() {
    }

    /**
     * Creates a runner that hands the given configuration to the superclass.
     *
     * @param sparkRunnerConfig configuration forwarded to {@link SparkRunner}
     */
    public KafkaProcessingRunner(SparkRunnerConfig sparkRunnerConfig) {
        super(sparkRunnerConfig);
    }

    /**
     * Parses the command line arguments into the {@link KafkaProcessingArguments}
     * instance, transfers them into the runner configuration and removes the
     * configured target folder.
     *
     * <p>If the arguments cannot be parsed, the error is logged, the usage text is
     * printed and the JVM is terminated with exit code 1.
     *
     * @param strArr command line arguments to parse
     */
    @Override
    protected void initialize(String[] strArr) {
        // Use one and the same reference for parsing and for updating the config, so the
        // values that were parsed are guaranteed to be the values that are applied.
        final KafkaProcessingArguments arguments = KafkaProcessingArguments.getInstance();
        final JCommander commander = JCommander.newBuilder().addObject(arguments).build();
        try {
            commander.parse(strArr);
        } catch (ParameterException e) {
            LOG.error("Parsing of parameters failed. Error message: {}", e.getMessage());
            commander.usage();
            System.exit(1);
        }

        arguments.createOrUpdateSparkRunnerConfig(this.sparkRunnerConfig);

        // Remove the target folder before processing starts; deleteQuietly does not throw on failure.
        FileUtils.deleteQuietly(new File(this.sparkRunnerConfig.getTargetFolder()));

        BpmnaiLogger.getInstance().writeInfo("Starting data processing with data from: " + this.sparkRunnerConfig.getSourceFolder());
    }

    /**
     * Builds the default list of pipeline steps. The second {@link PipelineStep}
     * constructor argument is the name of the step the new step follows.
     *
     * <p>The aggregation step depends on the configured data level: process level uses
     * {@link AggregateProcessInstancesStep}, anything else uses
     * {@link AggregateActivityInstancesStep}. {@link FillActivityInstancesHistoryStep}
     * is only added for the activity data level.
     *
     * @return the steps in the order they were added
     */
    @Override
    protected List<PipelineStep> buildDefaultPipeline() {
        final boolean processLevel = this.sparkRunnerConfig.getDataLevel().equals(BpmnaiVariables.DATA_LEVEL_PROCESS);
        final boolean activityLevel = this.sparkRunnerConfig.getDataLevel().equals(BpmnaiVariables.DATA_LEVEL_ACTIVITY);

        final List<PipelineStep> steps = new ArrayList<>();
        steps.add(new PipelineStep(new DataFilterStep(), ""));
        steps.add(new PipelineStep(new ColumnRemoveStep(), "DataFilterStep"));
        steps.add(new PipelineStep(new ReduceColumnsStep(), "ColumnRemoveStep"));
        steps.add(new PipelineStep(new DetermineProcessVariablesStep(), "ReduceColumnsStep"));
        steps.add(new PipelineStep(new AddVariableColumnsStep(), "DetermineProcessVariablesStep"));

        // Track the name of the step that was actually added last, so that the following
        // step never refers to a predecessor that is not part of the pipeline.
        String predecessor;
        if (processLevel) {
            steps.add(new PipelineStep(new AggregateProcessInstancesStep(), "AddVariableColumnsStep"));
            predecessor = "AggregateProcessInstancesStep";
        } else {
            steps.add(new PipelineStep(new AggregateActivityInstancesStep(), "AddVariableColumnsStep"));
            predecessor = "AggregateActivityInstancesStep";
        }

        steps.add(new PipelineStep(new CreateColumnsFromJsonStep(), predecessor));
        predecessor = "CreateColumnsFromJsonStep";

        if (activityLevel) {
            steps.add(new PipelineStep(new FillActivityInstancesHistoryStep(), predecessor));
            predecessor = "FillActivityInstancesHistoryStep";
        }

        steps.add(new PipelineStep(new AddReducedColumnsToDatasetStep(), predecessor));
        steps.add(new PipelineStep(new ColumnHashStep(), "AddReducedColumnsToDatasetStep"));
        steps.add(new PipelineStep(new TypeCastStep(), "ColumnHashStep"));
        steps.add(new PipelineStep(new WriteToDiscStep(), "TypeCastStep"));
        return steps;
    }

    /**
     * Reads the initial dataset from the configured source folder and, if writing of
     * step results is enabled, also writes it out under the name
     * {@code initial_dataset}.
     *
     * @return the dataset read from the source folder
     */
    @Override
    protected Dataset<Row> loadInitialDataset() {
        // No explicit format is given, so Spark's default data source is used.
        // NOTE(review): presumably the source folder holds Parquet data - confirm.
        final Dataset<Row> dataset = this.sparkSession.read()
                .option("inferSchema", "true")
                .load(this.sparkRunnerConfig.getSourceFolder());

        if (this.sparkRunnerConfig.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(dataset, "initial_dataset", this.sparkRunnerConfig);
        }
        return dataset;
    }
}
