package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.elasticsearch.spark.package$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001Q4Aa\u0002\u0005\u00013!AA\u0007\u0001B\u0001B\u0003%Q\u0007\u0003\u0005?\u0001\t\u0005\t\u0015!\u0003@\u0011!Q\u0005A!A!\u0002\u0013Y\u0005\u0002\u0003,\u0001\u0005\u0003\u0005\u000b\u0011B,\t\u000b}\u0003A\u0011\u00011\t\u000b\u001d\u0004A\u0011\t5\u0003O\u0015c\u0017m\u001d;jGN,\u0017M]2i'B\f'o\u001b'fO\u0006\u001c\u0017p\u0015;sK\u0006l\u0017N\\4Xe&$XM\u001d\u0006\u0003\u0013)\tq!\u001a7bgRL7M\u0003\u0002\f\u0019\u00059\u0001\u000f\\;hS:\u001c(BA\u0007\u000f\u0003\u0015\u0019\b/\u0019:l\u0015\ty\u0001#A\u0005d_:\u001cX/\\3sg*\u0011\u0011CE\u0001\u0005o\u0006\u001c\bO\u0003\u0002\u0014)\u00059!-[4eCR\f'BA\u000b\u0017\u0003!\tw-\u001b7fY\u0006\u0014'\"A\f\u0002\u0005%$8\u0001A\n\u0006\u0001i\u0001cE\f\t\u00037yi\u0011\u0001\b\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004\b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0005\rb\u0011aB<sSR,'o]\u0005\u0003K\t\u0012!d\u00159be.dUmZ1dsN#(/Z1nS:<wK]5uKJ\u0004\"a\n\u0017\u000e\u0003!R!!\u000b\u0016\u0002\u000bU$\u0018\u000e\\:\u000b\u0005-\u0002\u0012\u0001B2pe\u0016L!!\f\u0015\u0003)\u0015c\u0017m\u001d;jG\u000e{gNZ5hkJ\fG/[8o!\ty#'D\u00011\u0015\t\t$&A\u0004m_\u001e<\u0017N\\4\n\u0005M\u0002$a\u0002'pO\u001eLgnZ\u0001\bS:$W\r\u001f\"M!\t1D(D\u00018\u0015\tA\u0014(\u0001\u0002cY*\u00111F\u000f\u0006\u0003wA\t!B]3q_NLGo\u001c:z\u0013\titGA\u0004J]\u0012,\u0007P\u0011'\u0002\u0007M\u001c8\r\u0005\u0002A\u00116\t\u0011I\u0003\u0002C\u0007\u0006I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u001b\u0011S!!\u0012$\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0015aA8sO&\u0011\u0011*\u0011\u0002\u0011'R\u0014X-Y7j]\u001e\u001cuN\u001c;fqR\fAA\\1nKB\u0011Aj\u0015\b\u0003\u001bF\u0003\"A\u0014\u000f\u000e\u0003=S!\u0001\u0015\r\u0002\rq\u0012xn\u001c;?\u0013\t\u0011F$\u0001\u0004Qe\u0016$WMZ\u0005\u0003)V\u0013aa\u0015;sS:<'B\u0001*\u001d\u0003E)G.Y:uS\u000e\fE-\\5o\u0003\u000e$xN\u001d\t\u00031vk\u0011!\u0017\u0006\u00035n\u000bQ!Y2u_JT\u0011\u0001X\u0001\u0005C.\\\u0017-\u0003\u0002_3\nA\u0011i\u0019;peJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0006C\u000e$WM\u001a\t\u0003E\u0002i\u0011\u0001\u0003\u0005\u0006i\u0015\u0001\r!\u000e\u0005\u0006}\u0015\u0001\ra\u0010\u0005\u0006\u0015\u0016\u0001\ra\u0013\u0005\u0006-\u0016\u0001\raV\u0001\u0006oJLG/\u001a\u000b\u0003S2\u0004\"a\u00076\n\u0005-d\"\u0001B+oSRDQ!\u001c\u0004A\u00029\faa\u001d;sK\u0006l\u0007cA8s\u00176\t\u0001O\u0003\u0002r\u0003\u00069Am\u001d;sK\u0006l\u0017BA:q\u0005\u001d!5\u000b\u001e:fC6\u0004")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkLegacyStreamingWriter.class */
public class ElasticsearchSparkLegacyStreamingWriter implements SparkLegacyStreamingWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final StreamingContext ssc;
    private final String name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic.ElasticsearchSparkLegacyStreamingWriter] */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.elasticConfig$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.elasticConfig;
    }

    public ElasticConfigModel elasticConfig() {
        return !this.bitmap$0 ? elasticConfig$lzycompute() : this.elasticConfig;
    }

    public void write(DStream<String> dStream) {
        Option byName = this.indexBL.getByName(this.name);
        if (!byName.isDefined()) {
            logger().warn(() -> {
                return new StringBuilder(59).append("The index '").append(this.name).append("' does not exits pay ATTENTION spark won't start").toString();
            });
            return;
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(() -> {
            return new StringBuilder(57).append("Check or create the index model: '").append(indexModel.toString()).append(" with this index name: ").append(eventuallyTimedName).toString();
        });
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            String sb = new StringBuilder(21).append("Error creating index ").append(indexModel).toString();
            logger().error(() -> {
                return sb;
            });
            throw new Exception(sb);
        }
        Broadcast broadcast = this.ssc.sparkContext().broadcast(indexModel.resource(), ClassTag$.MODULE$.apply(String.class));
        Broadcast broadcast2 = this.ssc.sparkContext().broadcast(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.nodes"), ((TraversableOnce) elasticConfig().connections().filter(connectionConfig -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$2(connectionConfig));
        })).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.input.json"), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.batch.size.entries"), "1")})).$plus$plus(Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(str -> {
            return new Tuple2("es.mapping.id", str);
        }))), ClassTag$.MODULE$.apply(Map.class));
        logger().info(() -> {
            return new StringBuilder(82).append("Write to elastic with spark streaming. Configuration passed: options: ").append(broadcast2.value()).append(", resource: ").append(broadcast.value()).toString();
        });
        dStream.foreachRDD(rdd -> {
            $anonfun$write$7(broadcast, broadcast2, rdd);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$write$2(ConnectionConfig connectionConfig) {
        Object orElse = connectionConfig.metadata().flatMap(map -> {
            return map.get("connectiontype");
        }).getOrElse(() -> {
            return "";
        });
        return orElse != null ? orElse.equals("rest") : "rest" == 0;
    }

    public static final /* synthetic */ void $anonfun$write$7(Broadcast broadcast, Broadcast broadcast2, RDD rdd) {
        package$.MODULE$.sparkRDDFunctions(rdd, ClassTag$.MODULE$.apply(String.class)).saveToEs((String) broadcast.value(), (scala.collection.Map) broadcast2.value());
    }

    public ElasticsearchSparkLegacyStreamingWriter(IndexBL indexBL, StreamingContext streamingContext, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.ssc = streamingContext;
        this.name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.$init$(this);
        Logging.$init$(this);
    }
}
