package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.BpmnaiVariables;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that reduces the dataset to one row per process instance.
 *
 * <p>The rows are split into two partitions (see {@link #runPreprocessingStep}), each partition is
 * grouped by the process instance id and aggregated with the same per-column aggregation, and the
 * two results are unioned. Afterwards the aggregate column names of the form
 * {@code function(column)} are renamed back to the plain column name and helper columns are dropped.
 */
@PreprocessingStepDescription(name = "Aggregate process instances", description = "In this step the data is aggregated in a way so that there is only one line per process instance in the dataset. In this step the process state for each process instance is also aggregated to the last state the process instance had in the underlying dataset.")
public class AggregateProcessInstancesStep implements PreprocessingStepInterface {

    /**
     * Matches aggregate column names of the form {@code function(column)} for the three
     * aggregations used by this step; group 2 captures the wrapped column name.
     */
    private static final Pattern AGGREGATED_COLUMN_NAME =
            Pattern.compile("(max|allbutemptystring|processstate)\\((.+)\\)");

    /**
     * Aggregates the dataset to one row per process instance per partition and unions the results.
     *
     * @param dataset           the dataset to aggregate; must contain the process instance id and
     *                          state columns (and the variable name column when the workaround is active)
     * @param map               step parameters; not used by this step
     * @param sparkRunnerConfig runner configuration controlling the state-column workaround and
     *                          whether the result is additionally written to CSV
     * @return the aggregated, cached dataset without the variable name, activity instance id and
     *         data source columns
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> map, SparkRunnerConfig sparkRunnerConfig) {
        Map<String, String> aggregationByColumn = buildAggregationMap(dataset.columns());

        // With the workaround enabled on process data level, rows are partitioned by the
        // variable name column instead of the state column.
        boolean partitionByVariableName = sparkRunnerConfig.isDevProcessStateColumnWorkaroundEnabled()
                && sparkRunnerConfig.getDataLevel().equals(BpmnaiVariables.DATA_LEVEL_PROCESS);

        Column stateRowsFilter;
        Column remainingRowsFilter;
        if (partitionByVariableName) {
            Column variableNameIsNull = functions.isnull(dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME));
            stateRowsFilter = variableNameIsNull;
            remainingRowsFilter = functions.not(variableNameIsNull);
        } else {
            Column stateIsNull = functions.isnull(dataset.col(BpmnaiVariables.VAR_STATE));
            stateRowsFilter = functions.not(stateIsNull);
            remainingRowsFilter = stateIsNull;
        }

        Dataset<Row> stateRowsAggregated = dataset
                .filter(stateRowsFilter)
                .groupBy(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)
                .agg(aggregationByColumn);

        // NOTE: the previous implementation tried to strip the aggregate function names here, but
        // it looped over the columns of the *input* dataset, so the aggregated result was never
        // touched. If an input column happened to match the pattern, the rename invalidated the
        // keys of the aggregation map used below. The rename after the union covers both results.
        Dataset<Row> remainingRowsAggregated = dataset
                .filter(remainingRowsFilter)
                .groupBy(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)
                .agg(aggregationByColumn);

        // Both aggregations use the same map, so their columns line up for the union.
        Dataset<Row> combined = stripAggregateFunctionNames(remainingRowsAggregated.union(stateRowsAggregated));

        Dataset<Row> result = combined
                .drop(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME)
                .drop(BpmnaiVariables.VAR_ACT_INST_ID)
                .drop(BpmnaiVariables.VAR_DATA_SOURCE);

        // Cache before counting so the count below and later consumers share the computed result.
        result.cache();
        BpmnaiLogger.getInstance().writeInfo("Found " + result.count() + " process instances.");

        if (sparkRunnerConfig.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "agg_of_process_instances", sparkRunnerConfig);
        }
        return result;
    }

    /**
     * Chooses the aggregation function for every column except the process instance id, which is
     * the grouping key: {@code max} for the duration and {@code *_rev} columns,
     * {@code ProcessState} for the state column and {@code AllButEmptyString} for everything else.
     */
    private static Map<String, String> buildAggregationMap(String[] columnNames) {
        Map<String, String> aggregationByColumn = new HashMap<>();
        for (String columnName : columnNames) {
            if (columnName.equals(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)) {
                continue;
            }
            if (columnName.equals(BpmnaiVariables.VAR_DURATION) || columnName.endsWith("_rev")) {
                aggregationByColumn.put(columnName, "max");
            } else if (columnName.equals(BpmnaiVariables.VAR_STATE)) {
                aggregationByColumn.put(columnName, "ProcessState");
            } else {
                aggregationByColumn.put(columnName, "AllButEmptyString");
            }
        }
        return aggregationByColumn;
    }

    /**
     * Renames every column whose name matches {@link #AGGREGATED_COLUMN_NAME} to the column name
     * wrapped inside the aggregate function call.
     */
    private static Dataset<Row> stripAggregateFunctionNames(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_NAME.matcher(columnName);
            if (matcher.find()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }
}
