package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.BpmnaiVariables;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that turns process variables into separate dataset columns.
 *
 * <p>The step runs in two phases:
 * <ol>
 *   <li>Rows whose data source is a variable-update event are aggregated per variable, either per
 *       process instance (process data level) or per activity instance (any other data level). They
 *       are then unioned with the base rows of the dataset.</li>
 *   <li>For every key of the {@code PROCESS_VARIABLES_ESCALATED} broadcast map, a column with the
 *       variable's value is added. On process level, when rev counting is enabled, a
 *       {@code <name>_rev} column is added as well. The raw value/type/revision columns are dropped
 *       afterwards.</li>
 * </ol>
 */
@PreprocessingStepDescription(name = "Add variable columns", description = "In this step all process variables detected in prior steps are added as separate columns to the dataset.")
public class AddVariableColumnsStep implements PreprocessingStepInterface {

    /** Columns aggregated with {@code first} instead of {@code AllButEmptyString}. */
    private static final List<String> FIRST_AGGREGATED_COLUMNS =
            Arrays.asList(BpmnaiVariables.VAR_START_TIME, BpmnaiVariables.VAR_END_TIME);

    /**
     * Matches the column names produced by the aggregations (e.g. {@code max(someColumn)});
     * group 2 is the original column name.
     */
    private static final Pattern AGGREGATED_COLUMN_NAME =
            Pattern.compile("(first|max|allbutemptystring)\\((.+)\\)");

    /** Column layout on process level. Order matters: {@code union} matches columns by position. */
    private static final String[] PROCESS_LEVEL_COLUMNS = {
            BpmnaiVariables.VAR_PROCESS_INSTANCE_ID,
            BpmnaiVariables.VAR_STATE,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
            BpmnaiVariables.VAR_LONG,
            BpmnaiVariables.VAR_DOUBLE,
            BpmnaiVariables.VAR_TEXT,
            BpmnaiVariables.VAR_TEXT2,
            BpmnaiVariables.VAR_DATA_SOURCE
    };

    /** Column layout on activity level. Order matters: {@code union} matches columns by position. */
    private static final String[] ACTIVITY_LEVEL_COLUMNS = {
            BpmnaiVariables.VAR_PROCESS_INSTANCE_ID,
            BpmnaiVariables.VAR_STATE,
            BpmnaiVariables.VAR_ACT_INST_ID,
            BpmnaiVariables.VAR_START_TIME,
            BpmnaiVariables.VAR_END_TIME,
            BpmnaiVariables.VAR_DURATION,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
            BpmnaiVariables.VAR_LONG,
            BpmnaiVariables.VAR_DOUBLE,
            BpmnaiVariables.VAR_TEXT,
            BpmnaiVariables.VAR_TEXT2,
            BpmnaiVariables.VAR_DATA_SOURCE
    };

    /** Raw type/revision/value columns that are dropped once the variable columns exist. */
    private static final String[] RAW_VALUE_COLUMNS = {
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
            BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
            BpmnaiVariables.VAR_DOUBLE,
            BpmnaiVariables.VAR_LONG,
            BpmnaiVariables.VAR_TEXT,
            BpmnaiVariables.VAR_TEXT2
    };

    @Override // de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        boolean writeStepResults = config.isWriteStepResultsIntoFile();
        String dataLevel = config.getDataLevel();
        Dataset<Row> aggregated = doVariableUpdatesAggregation(dataset, writeStepResults, dataLevel, config);
        return doAddVariableColumns(aggregated, writeStepResults, dataLevel, config);
    }

    /**
     * Aggregates variable-update rows and unions them with the dataset's base rows.
     *
     * @param dataset                 input dataset
     * @param writeStepResultIntoFile whether to dump the result as "agg_variable_updates" CSV
     * @param dataLevel               configured data level; only {@code DATA_LEVEL_PROCESS} is
     *                                treated as process level, every other value as activity level
     * @param config                  runner configuration, passed through to the CSV writer
     * @return the merged dataset with {@link #PROCESS_LEVEL_COLUMNS} or {@link #ACTIVITY_LEVEL_COLUMNS}
     */
    private Dataset<Row> doVariableUpdatesAggregation(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, SparkRunnerConfig config) {
        // Null-safe, and used for every branch below so grouping, key dropping and layout agree.
        boolean processLevel = BpmnaiVariables.DATA_LEVEL_PROCESS.equals(dataLevel);
        Map<String, String> aggregations = buildAggregationMap(dataset.columns());

        Dataset<Row> variableUpdates = dataset.filter(
                dataset.col(BpmnaiVariables.VAR_DATA_SOURCE).equalTo(BpmnaiVariables.EVENT_VARIABLE_UPDATE));

        Dataset<Row> aggregated;
        if (processLevel) {
            // NOTE(review): Spark does not guarantee that a sort before groupBy survives the shuffle,
            // so "first" is not guaranteed to see the rows in descending timestamp order.
            if (Arrays.asList(dataset.columns()).contains(BpmnaiVariables.VAR_TIMESTAMP)) {
                variableUpdates = variableUpdates.orderBy(functions.desc(BpmnaiVariables.VAR_TIMESTAMP));
            }
            aggregated = variableUpdates
                    .groupBy(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID, BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME)
                    .agg(aggregations);
        } else {
            aggregated = variableUpdates
                    .groupBy(BpmnaiVariables.VAR_ACT_INST_ID, BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME)
                    .agg(aggregations);
        }

        // Every column (grouping keys included) was aggregated, so drop the plain grouping-key
        // columns; their aggregated copies are renamed back to the plain names below.
        aggregated = aggregated
                .drop(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)
                .drop(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        if (!processLevel) {
            aggregated = aggregated.drop(BpmnaiVariables.VAR_ACT_INST_ID);
        }

        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_NAME.matcher(columnName);
            if (matcher.find()) {
                aggregated = aggregated.withColumnRenamed(columnName, matcher.group(2));
            }
        }

        Dataset<Row> merged;
        if (processLevel) {
            Dataset<Row> processInstanceRows = selectColumns(dataset, PROCESS_LEVEL_COLUMNS)
                    .filter(dataset.col(BpmnaiVariables.VAR_DATA_SOURCE).equalTo(BpmnaiVariables.EVENT_PROCESS_INSTANCE));
            merged = processInstanceRows.union(selectColumns(aggregated, PROCESS_LEVEL_COLUMNS));
        } else {
            Dataset<Row> activityRows = selectColumns(dataset, ACTIVITY_LEVEL_COLUMNS)
                    .filter(functions.not(functions.isnull(dataset.col(BpmnaiVariables.VAR_START_TIME))));
            merged = activityRows
                    .union(selectColumns(aggregated, ACTIVITY_LEVEL_COLUMNS))
                    .orderBy(BpmnaiVariables.VAR_ACT_INST_ID, BpmnaiVariables.VAR_START_TIME);
        }

        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(merged, "agg_variable_updates", config);
        }
        return merged;
    }

    /**
     * Chooses an aggregate function per column. Columns ending in {@code _rev} get {@code max};
     * start/end time get {@code first}; all others get {@code AllButEmptyString}. The latter is
     * presumably a custom aggregate function registered elsewhere — confirm.
     */
    private static Map<String, String> buildAggregationMap(String[] columns) {
        Map<String, String> aggregations = new HashMap<>();
        for (String column : columns) {
            if (column.endsWith("_rev")) {
                aggregations.put(column, "max");
            } else if (FIRST_AGGREGATED_COLUMNS.contains(column)) {
                aggregations.put(column, "first");
            } else {
                aggregations.put(column, "AllButEmptyString");
            }
        }
        return aggregations;
    }

    /** Selects {@code columns} by name, in the given order; {@code columns} must be non-empty. */
    private static Dataset<Row> selectColumns(Dataset<Row> ds, String[] columns) {
        return ds.select(columns[0], Arrays.copyOfRange(columns, 1, columns.length));
    }

    /**
     * Adds one column per escalated process variable and drops the raw value columns.
     *
     * @param dataset                 output of {@link #doVariableUpdatesAggregation}
     * @param writeStepResultIntoFile whether to dump the result as "add_var_columns" CSV
     * @param dataLevel               configured data level; {@code _rev} columns are only added on
     *                                process level
     * @param config                  runner configuration
     * @return the dataset with the variable columns added
     */
    private Dataset<Row> doAddVariableColumns(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, SparkRunnerConfig config) {
        // Only the keys (variable names) of the broadcast map are used here.
        @SuppressWarnings("unchecked")
        Map<String, ?> escalatedVariables = (Map<String, ?>) SparkBroadcastHelper.getInstance()
                .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);
        boolean addRevisionColumns = BpmnaiVariables.DATA_LEVEL_PROCESS.equals(dataLevel) && config.isRevCountEnabled();

        for (String variableName : escalatedVariables.keySet()) {
            // The value is only set on rows that belong to this variable; all other rows get null.
            dataset = dataset.withColumn(variableName,
                    functions.when(dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME).equalTo(variableName), typedValue(dataset))
                            .otherwise((Object) null));
            if (addRevisionColumns) {
                // NOTE(review): the fallback is the string "0", so Spark coerces the revision column to a
                // common type with it — confirm this is intended.
                dataset = dataset.withColumn(variableName + "_rev",
                        functions.when(dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME).equalTo(variableName),
                                        dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION))
                                .otherwise("0"));
            }
        }

        Dataset<Row> result = dataset.drop(RAW_VALUE_COLUMNS);
        // The variable-name column is kept only when the dev process-state workaround is enabled.
        if (!config.isDevProcessStateColumnWorkaroundEnabled()) {
            result = result.drop(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        }
        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "add_var_columns", config);
        }
        return result;
    }

    /**
     * Builds the expression that picks the value column matching the row's variable type:
     * <ul>
     *   <li>"string" or "null" → text;</li>
     *   <li>"boolean", "integer", "long" or "date" → long;</li>
     *   <li>"double" → double;</li>
     *   <li>anything else → text2.</li>
     * </ul>
     */
    private static Column typedValue(Dataset<Row> ds) {
        Column type = ds.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE);
        return functions.when(type.equalTo("string"), ds.col(BpmnaiVariables.VAR_TEXT))
                .when(type.equalTo("null"), ds.col(BpmnaiVariables.VAR_TEXT))
                .when(type.equalTo("boolean"), ds.col(BpmnaiVariables.VAR_LONG))
                .when(type.equalTo("integer"), ds.col(BpmnaiVariables.VAR_LONG))
                .when(type.equalTo("long"), ds.col(BpmnaiVariables.VAR_LONG))
                .when(type.equalTo("double"), ds.col(BpmnaiVariables.VAR_DOUBLE))
                .when(type.equalTo("date"), ds.col(BpmnaiVariables.VAR_LONG))
                .otherwise(ds.col(BpmnaiVariables.VAR_TEXT2));
    }
}
