package com.lucidworks.spark.query;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;

/* loaded from: input_file:com/lucidworks/spark/query/StreamingExpressionResultIterator.class */
public class StreamingExpressionResultIterator extends TupleStreamIterator {
    private static final Logger log = Logger.getLogger(StreamingExpressionResultIterator.class);
    protected String zkHost;
    protected String collection;
    protected String qt;
    protected CloudSolrClient cloudSolrClient;
    protected HttpSolrClient httpSolrClient;
    protected SolrClientCache solrClientCache;
    private final Random random;

    public StreamingExpressionResultIterator(CloudSolrClient cloudSolrClient, HttpSolrClient httpSolrClient, String str, SolrParams solrParams) {
        super(solrParams);
        this.random = new Random(5150L);
        this.cloudSolrClient = cloudSolrClient;
        this.httpSolrClient = httpSolrClient;
        this.collection = str;
        this.qt = solrParams.get("qt");
        if (this.qt == null) {
            this.qt = "/stream";
        }
    }

    @Override // com.lucidworks.spark.query.TupleStreamIterator
    protected TupleStream openStream() {
        ModifiableSolrParams modifiableSolrParams = new ModifiableSolrParams();
        modifiableSolrParams.set("qt", new String[]{this.qt});
        String str = this.solrParams.get("aggregationMode");
        log.info("aggregationMode=" + str + ", solrParams: " + this.solrParams);
        if (str != null) {
            modifiableSolrParams.set("aggregationMode", new String[]{str});
        } else {
            modifiableSolrParams.set("aggregationMode", new String[]{"facet"});
        }
        if ("/sql".equals(this.qt)) {
            String replaceAll = this.solrParams.get("sql").replaceAll("\\s+", " ");
            log.info("Executing SQL statement " + replaceAll + " against collection " + this.collection);
            modifiableSolrParams.set("stmt", new String[]{replaceAll});
        } else {
            String replaceAll2 = this.solrParams.get("expr").replaceAll("\\s+", " ");
            log.info("Executing streaming expression " + replaceAll2 + " against collection " + this.collection);
            modifiableSolrParams.set("expr", new String[]{replaceAll2});
        }
        try {
            String coreUrl = new ZkCoreNodeProps(getRandomReplica()).getCoreUrl();
            log.info("Sending " + this.qt + " request to replica " + coreUrl + " of " + this.collection + " with params: " + modifiableSolrParams);
            long currentTimeMillis = System.currentTimeMillis();
            SolrStream solrStream = new SolrStream(coreUrl, modifiableSolrParams);
            solrStream.setStreamContext(getStreamContext());
            solrStream.open();
            log.debug("Open stream to " + coreUrl + " took " + (System.currentTimeMillis() - currentTimeMillis) + " (ms)");
            return solrStream;
        } catch (Exception e) {
            log.error("Failed to execute request [" + this.solrParams + "] due to: " + e, e);
            if (e instanceof RuntimeException) {
                throw ((RuntimeException) e);
            }
            throw new RuntimeException(e);
        }
    }

    protected StreamContext getStreamContext() {
        StreamContext streamContext = new StreamContext();
        this.solrClientCache = new SparkSolrClientCache(this.cloudSolrClient, this.httpSolrClient);
        streamContext.setSolrClientCache(this.solrClientCache);
        return streamContext;
    }

    protected Replica getRandomReplica() {
        Collection activeSlices = this.cloudSolrClient.getZkStateReader().getClusterState().getCollection(this.collection.split(",")[0]).getActiveSlices();
        if (activeSlices == null || activeSlices.size() == 0) {
            throw new IllegalStateException("No active shards found " + this.collection);
        }
        ArrayList arrayList = new ArrayList();
        Iterator it2 = activeSlices.iterator();
        while (it2.hasNext()) {
            arrayList.addAll(((Slice) it2.next()).getReplicas());
        }
        return (Replica) arrayList.get(this.random.nextInt(arrayList.size()));
    }
}
