package de.viadee.bpmnai.core.processing.steps.userconfig;

import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.BpmnaiVariables;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/* loaded from: input_file:de/viadee/bpmnai/core/processing/steps/userconfig/DataFilterOnActivityStep.class */
public class DataFilterOnActivityStep implements PreprocessingStepInterface {
    @Override // de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> map, SparkRunnerConfig sparkRunnerConfig) {
        if (map == null || map.size() == 0) {
            BpmnaiLogger.getInstance().writeWarn("No parameters found for the DataFilterOnActivityStep");
            return dataset;
        }
        String str = (String) map.get("query");
        BpmnaiLogger.getInstance().writeInfo("Filtering data with activity instance filter query: " + str + ".");
        dataset.cache();
        Long valueOf = Long.valueOf(dataset.count());
        Dataset sortWithinPartitions = dataset.repartition(new Column[]{dataset.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID)}).sortWithinPartitions(BpmnaiVariables.VAR_START_TIME, new String[0]);
        Dataset filter = sortWithinPartitions.filter(functions.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE).isNotNull());
        sortWithinPartitions.filter(sortWithinPartitions.col(BpmnaiVariables.VAR_ACT_ID).equalTo(str)).filter(sortWithinPartitions.col(BpmnaiVariables.VAR_END_TIME).isNull());
        Map map2 = (Map) sortWithinPartitions.filter(sortWithinPartitions.col(BpmnaiVariables.VAR_ACT_ID).like(str)).filter(sortWithinPartitions.col(BpmnaiVariables.VAR_END_TIME).isNull()).select(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID, new String[]{BpmnaiVariables.VAR_START_TIME}).collectAsList().stream().collect(Collectors.toMap(row -> {
            return (String) row.getAs(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID);
        }, row2 -> {
            return (String) row2.getAs(BpmnaiVariables.VAR_START_TIME);
        }));
        SparkBroadcastHelper.getInstance().broadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP, map2);
        Dataset filter2 = sortWithinPartitions.filter(functions.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID).isin(map2.keySet().toArray()));
        Dataset withColumnRenamed = filter2.withColumn("data_filter_on_activity", functions.callUDF("activityBeforeTimestamp", new Column[]{filter2.col(BpmnaiVariables.VAR_PROCESS_INSTANCE_ID), filter2.col(BpmnaiVariables.VAR_START_TIME)})).filter(functions.col("data_filter_on_activity").like("TRUE")).drop("data_filter_on_activity").withColumnRenamed(BpmnaiVariables.VAR_ACT_INST_ID, "act_inst_id__RIGHT");
        Dataset<Row> union = withColumnRenamed.withColumnRenamed("act_inst_id__RIGHT", BpmnaiVariables.VAR_ACT_INST_ID).union(filter.join(withColumnRenamed.select("act_inst_id__RIGHT", new String[0]).distinct(), filter.col(BpmnaiVariables.VAR_ACT_INST_ID).equalTo(withColumnRenamed.col("act_inst_id__RIGHT")), "inner").drop("act_inst_id__RIGHT"));
        union.cache();
        BpmnaiLogger.getInstance().writeInfo("DataFilterOnActivityStep: The filtered DataSet contains " + union.count() + " rows, (before: " + valueOf + " rows)");
        if (sparkRunnerConfig.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(union, "data_filter_on_activity_step", sparkRunnerConfig);
        }
        return union;
    }
}
