package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.PreprocessingRunner;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.BpmnaiVariables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.Seq;

/**
 * Preprocessing step that joins additional columns from the "initial" helper dataset back onto
 * the dataset that is currently being processed.
 *
 * <p>The candidate column names are read from the helper dataset registered in
 * {@link PreprocessingRunner#helper_datasets} under {@code "startColumns_<dataLevel>"}. Every
 * candidate that is neither already present in the incoming dataset nor one of the excluded
 * variable columns is taken from the helper dataset {@code "initial_<dataLevel>"}, reduced to a
 * single value per group with the {@code first} aggregation, and left-joined onto the incoming
 * dataset.
 */
@PreprocessingStepDescription(name = "Add reduced columns", description = "In the beginning the non relevant columns where removed to speed up the processing. These columns are now added back to the dataset by using the processInstanceId as a reference.")
public class AddReducedColumnsToDatasetStep implements PreprocessingStepInterface {

    /** Temporary name of the process instance id column on the right-hand side of the join. */
    private static final String RIGHT_PROCESS_INSTANCE_ID = "proc_inst_id__right";

    /** Temporary name of the activity instance id column on the right-hand side of the join. */
    private static final String RIGHT_ACTIVITY_INSTANCE_ID = "act_inst_id__right";

    /** Name of the Spark aggregation function applied to every re-added column. */
    private static final String AGGREGATION_FUNCTION = "first";

    /** Matches aggregate output names of the form {@code first(<column>)}; group 2 is the inner text. */
    private static final Pattern AGGREGATED_COLUMN_NAME = Pattern.compile("(first)\\((.+)\\)");

    /**
     * Adds the columns listed in the "startColumns" helper dataset, but missing from
     * {@code dataset}, back onto {@code dataset}.
     *
     * @param dataset           the dataset to extend; used as the left side of a left join
     * @param map               step parameters; not read by this step
     * @param sparkRunnerConfig supplies the data level, the process-state-column workaround flag
     *                          and the flag controlling whether the result is written to CSV
     * @return {@code dataset} extended by the re-added columns, with the temporary join columns
     *         removed again
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> map, SparkRunnerConfig sparkRunnerConfig) {
        // Evaluate the configuration switches once instead of on every use.
        boolean processLevel = sparkRunnerConfig.getDataLevel().equals(BpmnaiVariables.DATA_LEVEL_PROCESS);
        boolean activityLevel = sparkRunnerConfig.getDataLevel().equals(BpmnaiVariables.DATA_LEVEL_ACTIVITY);
        boolean stateColumnWorkaround = sparkRunnerConfig.isDevProcessStateColumnWorkaroundEnabled();

        List<String> existingColumns = Arrays.asList(dataset.columns());
        Dataset<Row> startColumns = PreprocessingRunner.helper_datasets.get("startColumns_" + sparkRunnerConfig.getDataLevel());

        // Columns that are never added back, even if they are listed in the start columns.
        List<String> excludedColumns = new ArrayList<>(Arrays.asList(
                BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
                BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
                BpmnaiVariables.VAR_LONG,
                BpmnaiVariables.VAR_DOUBLE,
                BpmnaiVariables.VAR_TEXT,
                BpmnaiVariables.VAR_TEXT2,
                BpmnaiVariables.VAR_TIMESTAMP,
                BpmnaiVariables.VAR_SEQUENCE_COUNTER,
                BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_INSTANCE_ID,
                BpmnaiVariables.VAR_DATA_SOURCE));
        if (!stateColumnWorkaround) {
            // With the workaround enabled the variable name column stays selectable because the
            // process-level row filter below is built on it.
            excludedColumns.add(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        }

        // The id/state columns are always selected: they are needed for filtering and grouping.
        List<Column> selectedColumns = new ArrayList<>();
        selectedColumns.add(new Column(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID));
        selectedColumns.add(new Column(BpmnaiVariables.VAR_STATE));
        selectedColumns.add(new Column(BpmnaiVariables.VAR_ACT_INST_ID));

        // Each row of the start columns dataset carries one column name in its first field.
        List<String> columnsToRestore = new ArrayList<>();
        for (Row row : startColumns.collectAsList()) {
            String columnName = row.getString(0);
            if (!existingColumns.contains(columnName) && !excludedColumns.contains(columnName)) {
                columnsToRestore.add(columnName);
                selectedColumns.add(new Column(columnName));
            }
        }
        Seq<Column> selection = BpmnaiUtils.getInstance().asSeq(selectedColumns);

        Dataset<Row> initialDataset = PreprocessingRunner.helper_datasets.get("initial_" + sparkRunnerConfig.getDataLevel());

        // Every restored column is aggregated with "first" so that each group yields one row.
        Map<String, String> aggregations = new HashMap<>();
        for (String columnName : columnsToRestore) {
            aggregations.put(columnName, AGGREGATION_FUNCTION);
        }

        Dataset<Row> restoredColumns;
        if (processLevel) {
            // Process level: one row per process instance.
            Column rowFilter = stateColumnWorkaround
                    ? initialDataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME).isNull()
                    : initialDataset.col(BpmnaiVariables.VAR_STATE).isNotNull();
            restoredColumns = initialDataset.select(selection)
                    .filter(rowFilter)
                    .groupBy(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)
                    .agg(aggregations)
                    .withColumnRenamed(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID, RIGHT_PROCESS_INSTANCE_ID);
        } else {
            // Otherwise: one row per (process instance, activity instance) pair.
            restoredColumns = initialDataset.select(selection)
                    .filter(initialDataset.col(BpmnaiVariables.VAR_ACT_ID).isNotNull())
                    .groupBy(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID, BpmnaiVariables.VAR_ACT_INST_ID)
                    .agg(aggregations)
                    .withColumnRenamed(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID, RIGHT_PROCESS_INSTANCE_ID)
                    .withColumnRenamed(BpmnaiVariables.VAR_ACT_INST_ID, RIGHT_ACTIVITY_INSTANCE_ID);
        }
        restoredColumns = stripAggregationNames(restoredColumns);

        // Left join so that rows of the incoming dataset without a match are kept.
        Column joinCondition = dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)
                .equalTo(restoredColumns.col(RIGHT_PROCESS_INSTANCE_ID));
        if (!processLevel) {
            joinCondition = joinCondition.and(dataset.col(BpmnaiVariables.VAR_ACT_INST_ID)
                    .equalTo(restoredColumns.col(RIGHT_ACTIVITY_INSTANCE_ID)));
        }
        Dataset<Row> result = dataset.join(restoredColumns, joinCondition, "left");

        if (stateColumnWorkaround && processLevel) {
            // The variable name column is removed again from the joined result in this mode.
            result = result.drop(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        }

        // Remove the temporary join key columns of the right-hand side.
        result = result.drop(RIGHT_PROCESS_INSTANCE_ID);
        if (activityLevel) {
            result = result.drop(RIGHT_ACTIVITY_INSTANCE_ID);
        }

        if (sparkRunnerConfig.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "joined_columns", sparkRunnerConfig);
        }
        return result;
    }

    /**
     * Renames every column whose name matches {@code first(<name>)} to the text inside the
     * parentheses; all other columns are left untouched.
     *
     * @param aggregated the dataset produced by the {@code first} aggregation
     * @return the dataset with the aggregation wrapper removed from its column names
     */
    private static Dataset<Row> stripAggregationNames(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_NAME.matcher(columnName);
            if (matcher.find()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }
}
