package com.lucidworks.spark.example.query;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.rdd.SolrJavaRDD;
import com.lucidworks.spark.util.ConfigurationConstants;
import com.lucidworks.spark.util.PivotField;
import com.lucidworks.spark.util.SolrQuerySupport;
import com.lucidworks.spark.util.SolrSupport;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

/* loaded from: input_file:com/lucidworks/spark/example/query/KMeansAnomaly.class */
/**
 * Example {@link SparkApp.RDDProcessor} that reads log events from Solr and groups each client's
 * events into sessions. It writes per-session aggregates to a {@code <collection>_aggr} Solr
 * collection, then trains a k-means model over a few numeric session features and prints the
 * model's within-set sum of squared errors.
 */
public class KMeansAnomaly implements SparkApp.RDDProcessor {
    /** Field identifying the client; sessions are computed per value of this field. */
    private static final String UID_FIELD = "clientip_s";
    /** Event timestamp field used to order events and measure the gap between them. */
    private static final String TS_FIELD = "timestamp_tdt";
    /** A gap (in ms) between consecutive events of one client larger than this starts a new session. */
    private static final long SESSION_GAP_MS = 30000L;
    /** Number of k-means clusters. */
    private static final int NUM_CLUSTERS = 8;
    /** Number of k-means iterations. */
    private static final int NUM_ITERATIONS = 20;

    /** Default Solr query: only events that have every field this job reads. */
    private static final String DEFAULT_QUERY =
        "+clientip_s:[* TO *] +timestamp_tdt:[* TO *] +bytes_s:[* TO *] +verb_s:[* TO *] +response_s:[* TO *]";

    /** Window over each client's events in timestamp order. */
    private static final String SESSION_WINDOW =
        "OVER (PARTITION BY " + UID_FIELD + " ORDER BY " + TS_FIELD + ")";

    /**
     * Computes, for every event in view {@code logs}, the ms since the client's previous event
     * ({@code diff_ms}). It then takes a running count of gaps above {@link #SESSION_GAP_MS}
     * ({@code session_id}). The first event of a client has a null {@code diff_ms} (from
     * {@code lag}), which counts as 0.
     */
    private static final String SESSIONS_SQL =
        "SELECT *, sum(IF(diff_ms > " + SESSION_GAP_MS + ", 1, 0)) " + SESSION_WINDOW + " session_id"
            + " FROM (SELECT *, ts2ms(" + TS_FIELD + ") - lag(ts2ms(" + TS_FIELD + ")) " + SESSION_WINDOW
            + " as diff_ms FROM logs) tmp";

    /**
     * Aggregates view {@code sessions} into one row per (client, session_id). The k-means step
     * reads the {@code session_len_ms_l}, {@code total_bytes_l}, {@code num_get_l} and
     * {@code num_head_l} columns as longs.
     */
    private static final String DEFAULT_AGGREGATION_SQL =
        "SELECT concat_ws('||', " + UID_FIELD + ",session_id) as id,"
            + " first(" + UID_FIELD + ") as " + UID_FIELD + ","
            + " min(" + TS_FIELD + ") as session_start_tdt,"
            + " max(" + TS_FIELD + ") as session_end_tdt,"
            + " (ts2ms(max(" + TS_FIELD + ")) - ts2ms(min(" + TS_FIELD + "))) as session_len_ms_l,"
            + " sum(asInt(bytes_s)) as total_bytes_l,"
            + " count(*) as total_requests_l,"
            + " sum(http_method_get) as num_get_l,"
            + " sum(http_method_head) as num_head_l,"
            + " sum(http_method_post) as num_post_l"
            + " FROM sessions GROUP BY " + UID_FIELD + ",session_id";

    @Override
    public String getName() {
        return "kmeans-anomaly";
    }

    /**
     * Returns the command-line options specific to this app: {@code -query} and
     * {@code -aggregationSQL}.
     *
     * <p>{@link OptionBuilder} keeps its settings in static state and resets them on every
     * {@code create} call. Each option's settings must therefore be applied immediately before
     * its own {@code create}. Otherwise the first option absorbs every setting and the second
     * gets none.
     */
    @Override
    @SuppressWarnings("deprecation")
    public Option[] getOptions() {
        OptionBuilder.withArgName("QUERY");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription("URL encoded Solr query to send to Solr");
        Option queryOption = OptionBuilder.create("query");

        OptionBuilder.withArgName("SQL");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription("File containing a SQL query to execute to generate the aggregated data.");
        Option aggregationSqlOption = OptionBuilder.create("aggregationSQL");

        return new Option[]{queryOption, aggregationSqlOption};
    }

    /**
     * Runs the pipeline in these steps:
     * <ol>
     *   <li>Load log events from Solr.</li>
     *   <li>Group them into sessions.</li>
     *   <li>Aggregate per session.</li>
     *   <li>Save the aggregates to {@code <collection>_aggr} (overwrite, then commit).</li>
     *   <li>Train k-means and print its cost.</li>
     * </ol>
     *
     * @param sparkConf Spark configuration used to build the session
     * @param commandLine parsed arguments; reads {@code zkHost} (default {@code localhost:9983}),
     *     {@code collection} (default {@code apache_logs}), {@code query} and the optional
     *     {@code aggregationSQL} file path
     * @return 0 on success
     * @throws Exception if reading the SQL file, any Spark job, or the Solr commit fails; the
     *     Spark context is stopped either way
     */
    @Override
    public int run(SparkConf sparkConf, CommandLine commandLine) throws Exception {
        String zkHost = commandLine.getOptionValue("zkHost", "localhost:9983");
        String collection = commandLine.getOptionValue("collection", "apache_logs");
        String query = commandLine.getOptionValue("query", DEFAULT_QUERY);
        String aggregationSqlFile = commandLine.getOptionValue("aggregationSQL");

        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        try {
            registerLogsView(spark, jsc, zkHost, collection, query);

            // Both UDFs must be registered before the SQL that references them is analyzed.
            spark.udf().register("ts2ms", new TimestampToMillis(), DataTypes.LongType);
            spark.udf().register("asInt", new StringToInt(), DataTypes.IntegerType);

            spark.sql(SESSIONS_SQL).createOrReplaceTempView("sessions");

            Dataset<Row> sessionAggregates = spark.sql(loadAggregationSql(aggregationSqlFile));
            // Cached because it is consumed twice: once by the Solr write, once by k-means.
            sessionAggregates.cache();
            sessionAggregates.printSchema();

            saveToSolr(sessionAggregates, zkHost, collection + "_aggr");

            JavaRDD<Vector> features = sessionAggregates.javaRDD().map(new SessionFeatures());
            // k-means makes several passes over its input, and computeCost makes one more.
            features.cache();
            double cost = KMeans.train(features.rdd(), NUM_CLUSTERS, NUM_ITERATIONS).computeCost(features.rdd());
            System.out.println("Within Set Sum of Squared Errors = " + cost);
        } finally {
            jsc.stop();
        }
        return 0;
    }

    /**
     * Loads the events matching {@code query} from Solr and adds pivot columns for
     * {@code verb_s} and {@code response_s}. The result is registered as the temp view
     * {@code logs}.
     *
     * <p>NOTE(review): the aggregation SQL assumes the pivot produces columns such as
     * {@code http_method_get}/{@code _head}/{@code _post}; confirm against
     * {@code SolrQuerySupport.withPivotFields}.
     */
    private static void registerLogsView(SparkSession spark, JavaSparkContext jsc,
                                         String zkHost, String collection, String query) {
        Map<String, String> readOptions = new HashMap<>();
        readOptions.put("zkhost", zkHost);
        readOptions.put("collection", collection);
        readOptions.put("query", query);
        readOptions.put(ConfigurationConstants.SOLR_SPLIT_FIELD_PARAM(), "_version_");
        readOptions.put(ConfigurationConstants.SOLR_SPLITS_PER_SHARD_PARAM(), "4");
        readOptions.put(ConfigurationConstants.SOLR_FIELD_PARAM(), "id,_version_,clientip_s,timestamp_tdt,bytes_s,response_s,verb_s");

        Dataset<Row> events = spark.read().format("solr").options(readOptions).load();
        PivotField[] pivotFields = {
            new PivotField("verb_s", "http_method_"),
            new PivotField("response_s", "http_code_")
        };
        SolrQuerySupport.withPivotFields(events, pivotFields,
                SolrJavaRDD.get(zkHost, collection, jsc.sc()).m89rdd(), false)
            .createOrReplaceTempView("logs");
    }

    /**
     * Returns the aggregation SQL to run against view {@code sessions}.
     *
     * <ul>
     *   <li>With no file, returns {@link #DEFAULT_AGGREGATION_SQL}.</li>
     *   <li>Otherwise returns the file's contents, decoded as UTF-8.</li>
     * </ul>
     *
     * <p>A custom query must produce the long columns read by {@link SessionFeatures}.
     *
     * @param sqlFile path given via {@code -aggregationSQL}, or {@code null}
     * @throws IOException if the file cannot be read
     */
    private static String loadAggregationSql(String sqlFile) throws IOException {
        if (sqlFile == null) {
            return DEFAULT_AGGREGATION_SQL;
        }
        return new String(Files.readAllBytes(Paths.get(sqlFile)), StandardCharsets.UTF_8);
    }

    /** Overwrites {@code targetCollection} with {@code data} and commits it. */
    private static void saveToSolr(Dataset<Row> data, String zkHost, String targetCollection) throws Exception {
        Map<String, String> writeOptions = new HashMap<>();
        writeOptions.put("zkhost", zkHost);
        writeOptions.put("collection", targetCollection);
        data.write().format("solr").options(writeOptions).mode(SaveMode.Overwrite).save();
        SolrSupport.getCachedCloudClient(zkHost).commit(targetCollection);
    }

    /**
     * SQL UDF {@code ts2ms}: converts a timestamp to epoch milliseconds, with {@code null} mapped to 0.
     * It is a static nested class so the serialized UDF does not capture the enclosing
     * (non-Serializable) app instance.
     */
    private static final class TimestampToMillis implements UDF1<Timestamp, Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long call(Timestamp timestamp) {
            return timestamp != null ? timestamp.getTime() : 0L;
        }
    }

    /**
     * SQL UDF {@code asInt}: parses a decimal string, with {@code null} mapped to 0.
     * NOTE(review): a non-numeric value (e.g. a "-" byte count) throws
     * NumberFormatException and fails the job; confirm the input never contains one.
     */
    private static final class StringToInt implements UDF1<String, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(String str) {
            return str != null ? Integer.parseInt(str) : 0;
        }
    }

    /**
     * Maps an aggregated session row to the dense feature vector used for clustering:
     * {@code [session_len_ms_l, total_bytes_l, num_get_l, num_head_l]}.
     */
    private static final class SessionFeatures implements Function<Row, Vector> {
        private static final long serialVersionUID = 1L;

        @Override
        public Vector call(Row row) {
            return Vectors.dense(new double[]{
                row.getLong(row.fieldIndex("session_len_ms_l")),
                row.getLong(row.fieldIndex("total_bytes_l")),
                row.getLong(row.fieldIndex("num_get_l")),
                row.getLong(row.fieldIndex("num_head_l"))
            });
        }
    }
}
