package de.viadee.ki.sparkimporter.runner;

import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PipelineStepConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.Step;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.exceptions.FaultyConfigurationException;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.aggregation.AllButEmptyStringAggregationFunction;
import de.viadee.ki.sparkimporter.processing.aggregation.ProcessStatesAggregationFunction;
import de.viadee.ki.sparkimporter.processing.steps.PipelineManager;
import de.viadee.ki.sparkimporter.processing.steps.PipelineStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.AddVariableColumnsStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.CreateColumnsFromJsonStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.DetermineProcessVariablesStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.ReduceColumnsStep;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.primitives.Longs;

/* loaded from: input_file:de/viadee/ki/sparkimporter/runner/SparkRunner.class */
/**
 * Base class for the Spark runners of the importer.
 *
 * <p>{@link #run(String[])} obtains a Spark session, registers logging listeners and UDFs, lets the
 * subclass initialize itself, resolves the preprocessing pipeline (from the configuration or from
 * the pipelines built by this class / the subclass), loads the initial dataset and hands both to a
 * {@link PreprocessingRunner}.
 *
 * <p>Subclasses supply {@link #initialize(String[])}, {@link #buildDefaultPipeline()} and
 * {@link #loadInitialDataset()}.
 */
public abstract class SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

    /** Dataset returned by {@link #loadInitialDataset()}, optionally filtered by process definition id. */
    private Dataset<Row> dataset;

    /** Created by {@link #configurePipelineSteps()} from {@link #pipelineSteps}. */
    private PipelineManager pipelineManager = null;

    /** Session obtained in {@link #run(String[])}; closed again before that method returns. */
    protected SparkSession sparkSession = null;

    /** Data level passed to {@link PreprocessingRunner#run}; subclasses may change it. */
    protected String dataLevel = SparkImporterVariables.DATA_LEVEL_PROCESS;

    /** Steps the pipeline manager is built from. */
    private List<PipelineStep> pipelineSteps = new ArrayList<>();

    /** Running modes of the application together with their string representation. */
    public enum RUNNING_MODE {
        CSV_IMPORT_AND_PROCESSING(SparkImporterVariables.OUTPUT_FORMAT_CSV),
        KAFKA_IMPORT("kafka_import"),
        KAFKA_PROCESSING("kafka_process");

        private final String runnerMode;

        RUNNING_MODE(String str) {
            this.runnerMode = str;
        }

        /**
         * @return the string associated with this mode
         */
        public String getModeString() {
            return this.runnerMode;
        }
    }

    /**
     * Subclass hook invoked by {@link #run(String[])} after the UDFs are registered and before the
     * configuration is checked.
     *
     * @param strArr the arguments that were passed to {@link #run(String[])}
     */
    protected abstract void initialize(String[] strArr);

    /**
     * @return the default pipeline of the concrete runner
     */
    protected abstract List<PipelineStep> buildDefaultPipeline();

    /**
     * Builds the minimal pipeline: reduce columns, determine process variables, add variable columns
     * and create columns from JSON, each step depending on the one before it.
     *
     * @return a new mutable list containing the four steps in that order
     */
    protected List<PipelineStep> buildMinimalPipeline() {
        List<PipelineStep> steps = new ArrayList<>();
        steps.add(new PipelineStep(new ReduceColumnsStep(), ""));
        steps.add(new PipelineStep(new DetermineProcessVariablesStep(), "ReduceColumnsStep"));
        steps.add(new PipelineStep(new AddVariableColumnsStep(), "DetermineProcessVariablesStep"));
        steps.add(new PipelineStep(new CreateColumnsFromJsonStep(), "AddVariableColumnsStep"));
        return steps;
    }

    /**
     * @return the dataset the preprocessing pipeline starts from
     */
    protected abstract Dataset<Row> loadInitialDataset();

    /**
     * Tells whether the application runs in {@link RUNNING_MODE#KAFKA_IMPORT} mode.
     *
     * <p>The comparison is written constant-first so that an unset running mode is treated as
     * "not Kafka import" instead of raising a {@link NullPointerException}.
     */
    private static boolean isKafkaImportMode() {
        return RUNNING_MODE.KAFKA_IMPORT.equals(SparkImporterVariables.getRunningMode());
    }

    /**
     * Checks whether a non-empty configuration is available. If so, its location is logged.
     * Otherwise the static {@link PreprocessingRunner} flags are set so that an initial
     * configuration gets written (and, unless in Kafka import mode, the minimal pipeline is used),
     * and an empty configuration is created.
     */
    private void checkConfig() {
        // Fetch once; the original queried the configuration twice for the same check.
        Configuration loaded = ConfigurationUtils.getInstance().getConfiguration(true);
        if (loaded != null && !loaded.isEmpty()) {
            SparkImporterLogger.getInstance().writeInfo("Configuration file found: " + SparkImporterVariables.getWorkingDirectory() + "/" + ConfigurationUtils.getInstance().getConfigurationFileName());
            return;
        }
        if (!isKafkaImportMode()) {
            PreprocessingRunner.minimalPipelineToBeBuild = true;
        }
        PreprocessingRunner.initialConfigToBeWritten = true;
        ConfigurationUtils.getInstance().createEmptyConfig();
    }

    /** Writes the configuration to file if {@link #checkConfig()} flagged it as initial. */
    private void writeConfig() {
        if (PreprocessingRunner.initialConfigToBeWritten) {
            ConfigurationUtils.getInstance().writeConfigurationToFile();
        }
    }

    /**
     * Registers the aggregation functions and helper UDFs on the Spark session.
     *
     * <ul>
     *   <li>{@code isALong}: true for a {@link Long} or a string that parses as a long.</li>
     *   <li>{@code timestampStringToLong}: for a string that parses as a long, the parsed value
     *       divided by 1000 (integer division); {@code null} for anything else.</li>
     * </ul>
     */
    private void registerUDFs() {
        this.sparkSession.udf().register("AllButEmptyString", new AllButEmptyStringAggregationFunction());
        this.sparkSession.udf().register("ProcessState", new ProcessStatesAggregationFunction());
        this.sparkSession.udf().register("isALong", obj -> {
            if (obj instanceof Long) {
                return true;
            }
            return (obj instanceof String) && Longs.tryParse((String) obj) != null;
        }, DataTypes.BooleanType);
        this.sparkSession.udf().register("timestampStringToLong", obj -> {
            if (!(obj instanceof String)) {
                return null;
            }
            // Parse once and reuse the result.
            Long parsed = Longs.tryParse((String) obj);
            if (parsed == null) {
                return null;
            }
            return Long.valueOf(parsed.longValue() / 1000);
        }, DataTypes.LongType);
    }

    /** Adds a listener to the Spark context that logs job start, job end and application end. */
    private void registerLoggingListener() {
        this.sparkSession.sparkContext().addSparkListener(new SparkListener() {
            @Override
            public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
                super.onJobEnd(sparkListenerJobEnd);
                LOG.info("... job {} finished.", sparkListenerJobEnd.jobId());
            }

            @Override
            public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
                super.onJobStart(sparkListenerJobStart);
                int stageCount = sparkListenerJobStart.stageIds().size();
                LOG.info("Spark job {} started (has {} {}) ...", sparkListenerJobStart.jobId(), stageCount, stageCount == 1 ? "stage" : "stages");
            }

            @Override
            public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
                super.onApplicationEnd(sparkListenerApplicationEnd);
                LOG.info("Spark application finished.");
            }
        });
    }

    /**
     * Runs the complete import/preprocessing job.
     *
     * <p>Order of work: obtain session, register listener and UDFs, {@link #initialize(String[])},
     * check configuration, configure pipeline, load (and optionally filter) the dataset, execute the
     * pipeline, optionally replace the minimal pipeline in the configuration with the default one,
     * close the session and finally write the configuration if it is an initial one.
     *
     * <p>The session is closed in a {@code finally} block so that it is released even when a step
     * throws; in that case the configuration is not written.
     *
     * @param strArr arguments forwarded to {@link #initialize(String[])}
     * @throws FaultyConfigurationException propagated from {@link #configurePipelineSteps()}
     */
    public void run(String[] strArr) throws FaultyConfigurationException {
        this.sparkSession = SparkSession.builder().getOrCreate();
        try {
            LOG.info("Spark application '{}' (ID: {}) started.", this.sparkSession.sparkContext().appName(), this.sparkSession.sparkContext().applicationId());
            registerLoggingListener();
            registerUDFs();
            initialize(strArr);
            checkConfig();
            configurePipelineSteps();
            this.dataset = loadInitialDataset();
            if (SparkImporterVariables.getProcessFilterDefinitionId() != null) {
                this.dataset = this.dataset.filter(this.dataset.col(SparkImporterVariables.VAR_PROCESS_DEF_ID).equalTo(SparkImporterVariables.getProcessFilterDefinitionId()));
            }
            PreprocessingRunner preprocessingRunner = new PreprocessingRunner();
            Iterator<PipelineStep> orderedSteps = this.pipelineManager.getOrderedPipeline().iterator();
            while (orderedSteps.hasNext()) {
                preprocessingRunner.addPreprocessorStep(orderedSteps.next());
            }
            long startMillis = System.currentTimeMillis();
            preprocessingRunner.run(this.dataset, this.dataLevel);
            String durationMessage = "Job ran for " + ((System.currentTimeMillis() - startMillis) / 1000) + " seconds in total";
            LOG.info(durationMessage);
            SparkImporterLogger.getInstance().writeInfo(durationMessage);
            if (PreprocessingRunner.minimalPipelineToBeBuild) {
                String fillMessage = "Filling the minimal configuration pipeline with the applications default pipeline...";
                String rerunMessage = "Execute again to process data with under the newly created configuration.";
                LOG.info(fillMessage);
                SparkImporterLogger.getInstance().writeInfo(fillMessage);
                LOG.info(rerunMessage);
                SparkImporterLogger.getInstance().writeInfo(rerunMessage);
                overwritePipelineSteps();
            }
        } finally {
            this.sparkSession.close();
        }
        writeConfig();
    }

    /**
     * If an initial configuration is to be written, replaces the current pipeline with the default
     * pipeline and stores its steps in the configuration. Does nothing otherwise.
     */
    public void overwritePipelineSteps() {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.pipelineSteps = buildDefaultPipeline();
            storePipelineStepsIn(configuration);
        }
    }

    /**
     * Determines the pipeline steps and creates the pipeline manager from them.
     *
     * <p>When an initial configuration is to be written, the default pipeline (Kafka import mode)
     * or the minimal pipeline (any other mode) is used and mirrored into the configuration.
     * Otherwise the active steps found in the configuration are used; a missing configuration
     * section yields an empty pipeline.
     *
     * <p>The step list is rebuilt on every call, so calling this method repeatedly does not
     * accumulate duplicate steps.
     *
     * @throws FaultyConfigurationException declared for callers; raised by the pipeline step or
     *         pipeline manager construction (not visible in this class)
     */
    public void configurePipelineSteps() throws FaultyConfigurationException {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.pipelineSteps = isKafkaImportMode() ? buildDefaultPipeline() : buildMinimalPipeline();
            storePipelineStepsIn(configuration);
        } else {
            this.pipelineSteps = readActiveSteps(configuration);
        }
        this.pipelineManager = new PipelineManager(this.pipelineSteps);
    }

    /**
     * Converts the current {@link #pipelineSteps} into configuration {@link Step}s (active, with an
     * empty comment) and sets them on the pipeline step configuration.
     */
    private void storePipelineStepsIn(Configuration configuration) {
        PipelineStepConfiguration stepConfiguration = configuration.getPreprocessingConfiguration().getPipelineStepConfiguration();
        List<Step> configuredSteps = new ArrayList<>(this.pipelineSteps.size());
        for (PipelineStep pipelineStep : this.pipelineSteps) {
            Step step = new Step();
            step.setClassName(pipelineStep.getClassName());
            step.setDependsOn(pipelineStep.getDependsOn());
            step.setId(pipelineStep.getId());
            step.setParameters(pipelineStep.getStepParameters());
            step.setComment("");
            step.setActive(true);
            configuredSteps.add(step);
        }
        stepConfiguration.setSteps(configuredSteps);
    }

    /**
     * Collects a {@link PipelineStep} for every active step in the configuration.
     *
     * @return a new list; empty when the configuration or any section on the path to the steps is
     *         {@code null}
     */
    private static List<PipelineStep> readActiveSteps(Configuration configuration) throws FaultyConfigurationException {
        List<PipelineStep> activeSteps = new ArrayList<>();
        if (configuration == null) {
            return activeSteps;
        }
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null) {
            return activeSteps;
        }
        PipelineStepConfiguration stepConfiguration = preprocessingConfiguration.getPipelineStepConfiguration();
        if (stepConfiguration == null) {
            return activeSteps;
        }
        List<Step> steps = stepConfiguration.getSteps();
        if (steps == null) {
            return activeSteps;
        }
        for (Step step : steps) {
            // NOTE(review): a step without an "active" value still fails here with a
            // NullPointerException, as before; the intended default is not visible in this file.
            if (step.getActive().booleanValue()) {
                activeSteps.add(new PipelineStep(step));
            }
        }
        return activeSteps;
    }

    // No explicit $deserializeLambda$ method is declared here: the compiler synthesizes it for the
    // serializable UDF lambdas in registerUDFs(). The decompiled copy was not valid Java source.
}
