package com.lucidworks.spark.example.hadoop;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.input.PortableDataStream;
import scala.Tuple2;
import scala.collection.JavaConversions;

/* loaded from: input_file:com/lucidworks/spark/example/hadoop/Logs2SolrRDDProcessor.class */
/**
 * Spark job that reads files from a path and indexes every line as a separate Solr document.
 *
 * <p>Each document carries three fields: {@code id} ("&lt;file path&gt;:&lt;1-based line number&gt;"),
 * {@code path_s} (the file path) and {@code line_t} (the raw line text). Files ending in
 * {@code .zip}, {@code .bz2} or {@code .gz} are decompressed on the fly; any other file is read
 * as-is. All content is decoded as UTF-8.
 */
public class Logs2SolrRDDProcessor implements SparkApp.RDDProcessor {
    public static Logger log = Logger.getLogger(Logs2SolrRDDProcessor.class);

    /** Returns the name under which this processor is registered: {@code "logs2solr"}. */
    @Override
    public String getName() {
        return "logs2solr";
    }

    /**
     * Declares the command-line options specific to this processor.
     *
     * @return a single-element array holding the {@code hdfsPath} option
     */
    @Override
    public Option[] getOptions() {
        // Commons CLI's OptionBuilder collects settings in static state until create() is called,
        // so these calls must stay together and in front of create().
        OptionBuilder.withArgName("PATH");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription("HDFS path identifying the directories / files to index");
        return new Option[]{OptionBuilder.create("hdfsPath")};
    }

    /**
     * Reads every file found under the {@code hdfsPath} option and sends its lines to Solr.
     *
     * <p>Options read: {@code hdfsPath} (mandatory in practice), {@code zkHost} (default
     * {@code localhost:9983}), {@code collection} (default {@code collection1}) and
     * {@code batchSize} (default {@code 1000}).
     *
     * @param sparkConf   configuration used to create the Spark context
     * @param commandLine parsed command-line options
     * @return always {@code 0}
     * @throws IllegalArgumentException if {@code hdfsPath} was not supplied
     * @throws NumberFormatException    if {@code batchSize} is not an integer
     * @throws Exception                if the Spark job itself fails
     */
    @Override
    public int run(SparkConf sparkConf, CommandLine commandLine) throws Exception {
        // Validate all options before the Spark context is created, so bad arguments fail fast.
        final String hdfsPath = commandLine.getOptionValue("hdfsPath");
        if (hdfsPath == null) {
            throw new IllegalArgumentException("Missing required option: hdfsPath");
        }
        final String zkHost = commandLine.getOptionValue("zkHost", "localhost:9983");
        final String collection = commandLine.getOptionValue("collection", "collection1");
        final int batchSize = Integer.parseInt(commandLine.getOptionValue("batchSize", "1000"));

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.binaryFiles(hdfsPath).foreach(new VoidFunction<Tuple2<String, PortableDataStream>>() {
            /**
             * Indexes one file. A failure while reading or sending is logged and the file is
             * abandoned; it does not fail the task.
             *
             * @param tuple2 pair of (file path, stream handle for that file)
             */
            @Override
            public void call(Tuple2<String, PortableDataStream> tuple2) throws Exception {
                CloudSolrClient solrClient = SolrSupport.getCachedCloudClient(zkHost);
                ArrayList<SolrInputDocument> batch = new ArrayList<SolrInputDocument>(batchSize);
                String path = tuple2._1();
                BufferedReader reader = null;
                int lineNum = 0;
                try {
                    reader = new BufferedReader(new InputStreamReader(openPortableDataStream(tuple2._2()), "UTF-8"));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lineNum++;
                        SolrInputDocument doc = new SolrInputDocument();
                        doc.setField("id", path + ":" + lineNum);
                        doc.setField("path_s", path);
                        doc.setField("line_t", line);
                        batch.add(doc);
                        if (batch.size() >= batchSize) {
                            SolrSupport.sendBatchToSolr(solrClient, collection, JavaConversions.collectionAsScalaIterable(batch));
                            // Start a fresh batch; otherwise every later line would re-send all
                            // previously sent documents and the list would grow without bound.
                            batch.clear();
                        }
                        if (lineNum % 10000 == 0) {
                            Logs2SolrRDDProcessor.log.info("Sent " + lineNum + " docs to Solr from " + path);
                        }
                    }
                    // Flush whatever is left over from the last, partially filled batch.
                    if (!batch.isEmpty()) {
                        SolrSupport.sendBatchToSolr(solrClient, collection, JavaConversions.collectionAsScalaIterable(batch));
                        batch.clear();
                    }
                } catch (Exception e) {
                    // Best effort per file: log (with stack trace) and move on to the next file.
                    Logs2SolrRDDProcessor.log.error("Failed to read '" + path + "' due to: " + e, e);
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (Exception ignored) {
                            // Nothing useful can be done if closing a read-only stream fails.
                        }
                    }
                }
            }

            /**
             * Opens the file behind {@code portableDataStream}, adding a decompression layer
             * chosen by file extension.
             *
             * <p>For {@code .zip} archives the stream is positioned at the first entry, so only
             * that entry is read. Files with any other extension are returned undecoded.
             *
             * @param portableDataStream handle of the file to open
             * @return a non-null stream of the (decompressed) file content
             * @throws Exception if the file cannot be opened or its compression header is invalid
             */
            InputStream openPortableDataStream(PortableDataStream portableDataStream) throws Exception {
                String path = portableDataStream.getPath();
                Logs2SolrRDDProcessor.log.info("Opening InputStream to " + path);
                InputStream raw = portableDataStream.open();
                try {
                    if (path.endsWith(".zip")) {
                        ZipInputStream zip = new ZipInputStream(raw);
                        zip.getNextEntry();
                        return zip;
                    }
                    if (path.endsWith(".bz2")) {
                        return new BZip2CompressorInputStream(raw);
                    }
                    if (path.endsWith(".gz")) {
                        return new GzipCompressorInputStream(raw);
                    }
                    // Not a recognised compressed format: read the file as plain text.
                    return raw;
                } catch (Exception e) {
                    // The wrapper could not be set up; release the underlying stream before failing.
                    try {
                        raw.close();
                    } catch (Exception ignored) {
                        // The original failure is the one worth reporting.
                    }
                    throw e;
                }
            }
        });
        return 0;
    }
}
