package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroDeserializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroDeserializerExpression$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.AvroSchemaConverters$;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel$;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.TopicDataTypes$;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.Handle$;
import it.agilelab.bigdata.wasp.models.configuration.Ignore$;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode$;
import it.agilelab.bigdata.wasp.models.configuration.Strict$;
import it.agilelab.bigdata.wasp.repository.core.bl.ConfigBL$;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.spark.sql.kafka011.KafkaSparkSQLSchemas$;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.CaseWhen$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.VolatileByteRef;
import scala.util.Left;
import scala.util.Right;

/* compiled from: KafkaSparkStructuredStreamingReader.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkStructuredStreamingReader$.class */
public final class KafkaSparkStructuredStreamingReader$ implements SparkStructuredStreamingReader, Logging {
    /** Singleton instance. Declared as {@code null} here; the private constructor assigns {@code this} to it. */
    public static final KafkaSparkStructuredStreamingReader$ MODULE$ = null;
    /** Name given to the metadata struct column; set to "kafkaMetadata" by the constructor. */
    private final String KAFKA_METADATA_COL;
    /** Name of the column holding a copy of the Kafka value column; set to "raw" by the constructor. */
    private final String it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME;
    /** Name of the data-type field inside per-topic structs; set to "dataType" by the constructor. */
    private final String DATA_TYPE_ATTRIBUTE_NAME;
    /** UDF with a Boolean result and a single {@code Any} argument, built in the constructor. */
    private final UserDefinedFunction isNull;
    /** Logger written through the {@code Logging} setter method below. */
    private final WaspLogger logger;

    // Class initializer: instantiates the singleton once; the constructor itself
    // publishes the new instance through MODULE$.
    static {
        new KafkaSparkStructuredStreamingReader$();
    }

    /**
     * Returns the logger stored in this instance.
     *
     * @return the value of the {@code logger} field
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Stores the given logger in the {@code logger} field.
     *
     * <p>NOTE(review): the field is declared {@code final} in this decompiled output, so this
     * assignment is presumably a decompilation artifact of a Scala trait setter; confirm against
     * the original Scala source.
     *
     * @param waspLogger the logger to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /**
     * Returns the name of the metadata struct column (initialised to "kafkaMetadata" in the constructor).
     *
     * @return the metadata column name
     */
    private String KAFKA_METADATA_COL() {
        return this.KAFKA_METADATA_COL;
    }

    /**
     * Returns the name of the raw-value column (initialised to "raw" in the constructor).
     *
     * @return the raw-value column name
     */
    public String it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME() {
        return this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME;
    }

    /**
     * Returns the name of the data-type field (initialised to "dataType" in the constructor).
     *
     * @return the data-type field name
     */
    private String DATA_TYPE_ATTRIBUTE_NAME() {
        return this.DATA_TYPE_ATTRIBUTE_NAME;
    }

    /**
     * Creates the streaming {@link Dataset} that reads from the Kafka topic(s) named by the reader model.
     *
     * <p>In order: resolves the datastore name to topic models, resolves the Kafka configuration
     * using the first topic's cluster alias, reduces the per-topic results to one boolean (failing
     * when it is false), assembles the Kafka source options, loads the stream, copies the Kafka
     * value column into the raw-value column and delegates to {@code parseDF}.
     *
     * @param structuredStreamingETLModel ETL model, passed on when computing the trigger interval
     * @param streamingReaderModel        reader model supplying datastore name, rate limit and options
     * @param sparkSession                session used to open the stream
     * @return the dataset produced by {@code parseDF}
     */
    public Dataset<Row> createStructuredStream(StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel, SparkSession sparkSession) {
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$1(structuredStreamingETLModel, streamingReaderModel));
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$2(streamingReaderModel));

        // Expand the datastore name into the concrete topic models.
        Seq<TopicModel> topics = retrieveTopicModelsRecursively(ConfigBL$.MODULE$.topicBL(), streamingReaderModel.datastoreModelName());
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$3(topics));

        // The first topic's cluster alias selects the Kafka configuration for the whole stream.
        KafkaConfigModel kafkaConfig = ConfigManager$.MODULE$.getKafkaConfig().resolve(((TopicModel) topics.head()).clusterAlias());
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$4(kafkaConfig));

        // Map every topic to a result and reduce them to one boolean; false aborts the stream creation.
        Object reducedTopicCheck = ((TraversableOnce) topics.map(new KafkaSparkStructuredStreamingReader$$anonfun$3(), Seq$.MODULE$.canBuildFrom())).reduce(new KafkaSparkStructuredStreamingReader$$anonfun$4());
        boolean allTopicsOk = BoxesRunTime.unboxToBoolean(reducedTopicCheck);
        if (!allTopicsOk) {
            String message = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unable to check/create one or more topic; topics: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topics}));
            logger().error(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$9(message));
            throw new Exception(message);
        }

        // Optional entry derived from the reader's rate limit and the trigger interval.
        Option scaledRateLimit = streamingReaderModel.rateLimit().map(new KafkaSparkStructuredStreamingReader$$anonfun$1(SparkUtils$.MODULE$.getTriggerIntervalMs(ConfigManager$.MODULE$.getSparkStreamingConfig(), structuredStreamingETLModel)));
        Option rateLimitEntry = scaledRateLimit.map(new KafkaSparkStructuredStreamingReader$$anonfun$5());

        // Options are added in this order: fixed entries, rate limit, Kafka config "others", reader options.
        Map kafkaOptions = Map$.MODULE$.empty();
        Tuple2 subscribeEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("subscribe"), ((TraversableOnce) topics.map(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$5(), Seq$.MODULE$.canBuildFrom())).mkString(","));
        Tuple2 bootstrapEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.bootstrap.servers"), ((TraversableOnce) kafkaConfig.connections().map(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$6(), Seq$.MODULE$.canBuildFrom())).mkString(","));
        Tuple2 pollTimeoutEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafkaConsumer.pollTimeoutMs"), BoxesRunTime.boxToInteger(kafkaConfig.ingestRateToMills()).toString());
        kafkaOptions.$plus$plus$eq(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{subscribeEntry, bootstrapEntry, pollTimeoutEntry})));
        kafkaOptions.$plus$plus$eq(Option$.MODULE$.option2Iterable(rateLimitEntry));
        kafkaOptions.$plus$plus$eq(((TraversableOnce) kafkaConfig.others().map(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$7(), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        kafkaOptions.$plus$plus$eq(streamingReaderModel.options());
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$createStructuredStream$8(kafkaOptions));

        // Keep an untouched copy of the Kafka value under the raw-value column before any parsing.
        Dataset<Row> rawStream = sparkSession.readStream().format("kafka").options(kafkaOptions).load().withColumn(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()));

        // Parsing mode comes from the "parsingMode" reader option; both fallbacks are supplied by anonymous functions.
        ParsingMode parsingMode = (ParsingMode) ParsingMode$.MODULE$.fromString((String) streamingReaderModel.options().getOrElse("parsingMode", new KafkaSparkStructuredStreamingReader$$anonfun$6())).getOrElse(new KafkaSparkStructuredStreamingReader$$anonfun$7());
        return parseDF(topics, rawStream, parsingMode);
    }

    /**
     * Builds the metadata struct column, aliased with the metadata column name.
     *
     * <p>The struct starts with {@code column}, followed by the columns obtained by mapping,
     * filtering and re-mapping the Kafka input schema through three anonymous functions
     * (their exact logic is not visible in this class).
     *
     * @param column the column placed first in the struct
     * @return the aliased struct column
     */
    private Column selectMetadata(Column column) {
        TraversableLike mappedSchema = (TraversableLike) KafkaSparkSQLSchemas$.MODULE$.INPUT_SCHEMA().map(new KafkaSparkStructuredStreamingReader$$anonfun$8(), Seq$.MODULE$.canBuildFrom());
        TraversableOnce keptEntries = (TraversableOnce) mappedSchema.filter(new KafkaSparkStructuredStreamingReader$$anonfun$9());
        List schemaColumns = (List) keptEntries.toList().map(new KafkaSparkStructuredStreamingReader$$anonfun$10(), List$.MODULE$.canBuildFrom());
        // Prepend the caller-supplied column so it becomes the first struct field.
        return functions$.MODULE$.struct(schemaColumns.$colon$colon(column)).as(KAFKA_METADATA_COL());
    }

    /**
     * Default value for the parameter of {@code selectMetadata}: the column named by
     * {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}.
     *
     * @return the Kafka key column
     */
    private Column selectMetadata$default$1() {
        return functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());
    }

    /**
     * Validates the topic models and parses the raw dataset with the single- or multi-schema path.
     *
     * <p>A {@code Left} from the health check, or from the key-schema check on the multi-schema
     * path, is turned into an {@link IllegalArgumentException} carrying the contained message.
     * When the topics are reported equal for reading ({@code Right}) only the first topic is used.
     *
     * @param seq         topic models being read
     * @param dataset     dataset loaded from Kafka
     * @param parsingMode parsing mode to apply
     * @return the parsed dataset
     */
    private Dataset<Row> parseDF(Seq<TopicModel> seq, Dataset<Row> dataset, ParsingMode parsingMode) {
        Left healthCheck = MultiTopicModel$.MODULE$.areTopicsHealthy(seq);
        if (healthCheck instanceof Left) {
            throw new IllegalArgumentException((String) healthCheck.a());
        }
        if (!(healthCheck instanceof Right)) {
            throw new MatchError(healthCheck);
        }

        Left readEquality = TopicModelUtils$.MODULE$.areTopicsEqualForReading(seq);
        if (readEquality instanceof Right) {
            // All topics can be read the same way: parse using the first topic only.
            return selectForOneSchema((TopicModel) seq.head(), dataset, parsingMode);
        }
        if (!(readEquality instanceof Left)) {
            throw new MatchError(readEquality);
        }

        // Topics differ: they must still share the key schema before the multi-schema path is taken.
        String difference = (String) readEquality.a();
        Left keySchemaCheck = TopicModelUtils$.MODULE$.topicsShareKeySchema(seq);
        if (keySchemaCheck instanceof Left) {
            throw new IllegalArgumentException((String) keySchemaCheck.a());
        }
        if (!(keySchemaCheck instanceof Right)) {
            throw new MatchError(keySchemaCheck);
        }
        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$parseDF$1(difference));
        return selectForMultipleSchema(seq, dataset, parsingMode);
    }

    /**
     * Parses the dataset according to the data type of a single topic.
     *
     * <p>AVRO and JSON values are parsed and then passed through {@code checkParsingMode};
     * PLAINTEXT values are cast to string; BINARY values are selected as they are. Any other
     * data type raises {@link UnsupportedOperationException}. In handle mode a warning is logged
     * when the data type is neither JSON nor AVRO.
     *
     * @param topicModel  topic whose data type and schema drive the parsing
     * @param dataset     dataset loaded from Kafka
     * @param parsingMode parsing mode to apply
     * @return the parsed dataset
     */
    public Dataset<Row> selectForOneSchema(TopicModel topicModel, Dataset<Row> dataset, ParsingMode parsingMode) {
        String dataType = topicModel.topicDataType();

        Handle$ handleMode = Handle$.MODULE$;
        boolean inHandleMode = parsingMode == null ? handleMode == null : parsingMode.equals(handleMode);
        if (inHandleMode && !Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()})).contains(dataType)) {
            logger().warn(new KafkaSparkStructuredStreamingReader$$anonfun$selectForOneSchema$1(dataType));
        }

        Dataset<Row> parsed;
        if (sameDataType(TopicDataTypes$.MODULE$.AVRO(), dataType)) {
            parsed = checkParsingMode(dataset.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue(topicModel)), parsingMode, TopicDataTypes$.MODULE$.AVRO(), parseKey(topicModel));
        } else if (sameDataType(TopicDataTypes$.MODULE$.JSON(), dataType)) {
            parsed = checkParsingMode(dataset.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.from_json(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString(), getDataType(topicModel.getJsonSchema()))), parsingMode, TopicDataTypes$.MODULE$.JSON(), checkParsingMode$default$4());
        } else if (sameDataType(TopicDataTypes$.MODULE$.PLAINTEXT(), dataType)) {
            parsed = dataset.withColumn("value_string", it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString()).select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(selectMetadata$default$1()), functions$.MODULE$.expr("value_string AS value")}));
        } else if (sameDataType(TopicDataTypes$.MODULE$.BINARY(), dataType)) {
            parsed = dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(selectMetadata$default$1()), functions$.MODULE$.expr(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}));
        } else {
            throw new UnsupportedOperationException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unsupported topic data type \"", "\""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topicModel.topicDataType()})));
        }

        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$selectForOneSchema$2(parsed));
        return parsed;
    }

    /** Null-safe string equality: two nulls are equal, otherwise {@code expected.equals(actual)} decides. */
    private static boolean sameDataType(String expected, String actual) {
        return expected == null ? actual == null : expected.equals(actual);
    }

    /**
     * Applies the parsing mode to a dataset whose value column has already been parsed.
     *
     * <ul>
     *   <li>Strict: rows whose value is null (or flagged by the {@code isNull} UDF) are routed to
     *       the strict exception UDF; the resulting struct is then expanded.</li>
     *   <li>Ignore: the value struct is expanded and a filter on non-null, non-flagged values is applied.</li>
     *   <li>Handle: the raw value is kept only for null/flagged rows (null otherwise), next to the value column.</li>
     * </ul>
     *
     * @param dataset     dataset with the parsed value column
     * @param parsingMode one of Strict, Ignore or Handle; anything else raises {@link MatchError}
     * @param str         data type label passed as a literal to the strict exception UDF
     * @param column      key column forwarded to {@code selectMetadata}
     * @return the dataset with the parsing mode applied
     */
    public Dataset<Row> checkParsingMode(Dataset<Row> dataset, ParsingMode parsingMode, String str, Column column) {
        if (Strict$.MODULE$.equals(parsingMode)) {
            Column valueMissing = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull().$bar$bar(isNull().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())})));
            Column strictFailure = strictExceptionLauncherUdf().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.lit(str)}));
            Column computedValue = functions$.MODULE$.when(valueMissing, strictFailure).otherwise(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()));
            Dataset<Row> withComputedValue = dataset.withColumn("computedValue", computedValue);
            return withComputedValue.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(column), functions$.MODULE$.col(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".*"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{"computedValue"})))}));
        }

        if (Ignore$.MODULE$.equals(parsingMode)) {
            Dataset<Row> expanded = dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(column), functions$.MODULE$.col(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".*"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()})))}));
            Column valuePresent = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNotNull().$amp$amp(isNull().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())})).unary_$bang());
            return expanded.where(valuePresent);
        }

        if (Handle$.MODULE$.equals(parsingMode)) {
            Column metadata = selectMetadata(column);
            Column valueMissing = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull().$bar$bar(isNull().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())})));
            // Expose the raw bytes only for rows that could not be parsed.
            Column rawWhenMissing = functions$.MODULE$.when(valueMissing, functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME())).otherwise((Object) null).as(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
            return dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{metadata, rawWhenMissing, functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}));
        }

        throw new MatchError(parsingMode);
    }

    /**
     * Default value for the fourth parameter of {@code checkParsingMode}: the column named by
     * {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}.
     *
     * @return the Kafka key column
     */
    public Column checkParsingMode$default$4() {
        return functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());
    }

    /**
     * Returns the UDF stored in the {@code isNull} field, which the constructor builds with a
     * Boolean result type and a single {@code Any} argument.
     *
     * @return the {@code isNull} UDF
     */
    public UserDefinedFunction isNull() {
        return this.isNull;
    }

    /**
     * Builds a two-argument Spark UDF from the strict-exception anonymous function.
     *
     * <p>The type tags passed to {@code udf} describe a result of type {@code Nothing}, a first
     * argument of type {@code scala.Array[scala.Byte]} and a second argument of type
     * {@code scala.Predef.String}. A new UDF is created on every call.
     * NOTE(review): given the {@code Nothing} result type, the function presumably always throws;
     * its body is not visible here.
     *
     * @return the newly created UDF
     */
    public UserDefinedFunction strictExceptionLauncherUdf() {
        functions$ functions_ = functions$.MODULE$;
        KafkaSparkStructuredStreamingReader$$anonfun$strictExceptionLauncherUdf$1 kafkaSparkStructuredStreamingReader$$anonfun$strictExceptionLauncherUdf$1 = new KafkaSparkStructuredStreamingReader$$anonfun$strictExceptionLauncherUdf$1();
        // Result type tag: Nothing.
        TypeTags.TypeTag Nothing = package$.MODULE$.universe().TypeTag().Nothing();
        TypeTags universe = package$.MODULE$.universe();
        // First argument type tag: reified scala.Array[scala.Byte].
        TypeTags.TypeTag apply = universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingReader$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
            }
        });
        TypeTags universe2 = package$.MODULE$.universe();
        // Second argument type tag: reified scala.Predef.String.
        return functions_.udf(kafkaSparkStructuredStreamingReader$$anonfun$strictExceptionLauncherUdf$1, Nothing, apply, universe2.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingReader$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe3 = mirror.universe();
                return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().SingleType(universe3.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe3.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }));
    }

    /**
     * Parses a dataset fed by several topics that cannot be read with a single schema.
     *
     * <p>One column per topic is produced by folding over the topic models (the fold prepends, so
     * the list is reversed to restore topic order). The result selects the metadata struct, the
     * raw value and the per-topic columns, and is then passed through
     * {@code checkParsingModeMultipleTopics}.
     *
     * <p>The metadata key is parsed with the first topic matched by the partial function given to
     * {@code collectFirst}; when none matches, the default Kafka key column is used.
     *
     * @param seq         topic models being read
     * @param dataset     dataset loaded from Kafka
     * @param parsingMode parsing mode to apply
     * @return the dataset after the parsing-mode check
     */
    public Dataset<Row> selectForMultipleSchema(Seq<TopicModel> seq, Dataset<Row> dataset, ParsingMode parsingMode) {
        Column selectMetadata;
        List reverse = ((List) seq.foldLeft(List$.MODULE$.empty(), new KafkaSparkStructuredStreamingReader$$anonfun$12())).reverse();
        logger().info(new KafkaSparkStructuredStreamingReader$$anonfun$selectForMultipleSchema$1(reverse));
        Some collectFirst = seq.collectFirst(new KafkaSparkStructuredStreamingReader$$anonfun$2());
        if (collectFirst instanceof Some) {
            selectMetadata = selectMetadata(parseKey((TopicModel) collectFirst.x()));
        } else {
            if (!None$.MODULE$.equals(collectFirst)) {
                throw new MatchError(collectFirst);
            }
            selectMetadata = selectMetadata(selectMetadata$default$1());
        }
        // Column order: metadata struct, raw value, then one column per topic.
        Dataset<Row> select = dataset.select(reverse.$colon$colon(functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME())).$colon$colon(selectMetadata));
        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$selectForMultipleSchema$2(select));
        Dataset<Row> checkParsingModeMultipleTopics = checkParsingModeMultipleTopics(select, parsingMode);
        // Log the dataset produced by the parsing-mode check; the previous code logged the
        // pre-check dataset a second time, hiding the effect of the check from the debug output.
        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$selectForMultipleSchema$3(checkParsingModeMultipleTopics));
        return checkParsingModeMultipleTopics;
    }

    /**
     * Applies the parsing mode to a multi-topic dataset.
     *
     * <p>Every column other than the metadata and raw-value columns is treated as a per-topic
     * column. A single boolean column is obtained by reducing one expression per topic column;
     * it is the row filter in Ignore mode and, negated, decides whether the raw value is kept in
     * Handle mode. Strict mode instead folds a per-column transformation over the dataset.
     *
     * @param dataset     dataset holding metadata, raw value and per-topic columns
     * @param parsingMode one of Strict, Ignore or Handle; anything else raises {@link MatchError}
     * @return the dataset with the parsing mode applied
     */
    public Dataset<Row> checkParsingModeMultipleTopics(Dataset<Row> dataset, ParsingMode parsingMode) {
        String[] topicColumnNames = (String[]) Predef$.MODULE$.refArrayOps(dataset.columns()).diff(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{KAFKA_METADATA_COL(), it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME()})));
        Column[] topicColumns = (Column[]) Predef$.MODULE$.refArrayOps(topicColumnNames).map(new KafkaSparkStructuredStreamingReader$$anonfun$13(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
        Object[] perColumnChecks = (Object[]) Predef$.MODULE$.refArrayOps(topicColumnNames).map(new KafkaSparkStructuredStreamingReader$$anonfun$14(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
        Column combinedCheck = (Column) Predef$.MODULE$.refArrayOps(perColumnChecks).reduce(new KafkaSparkStructuredStreamingReader$$anonfun$15());

        if (Strict$.MODULE$.equals(parsingMode)) {
            Dataset strictChecked = (Dataset) Predef$.MODULE$.refArrayOps(topicColumnNames).foldLeft(dataset, new KafkaSparkStructuredStreamingReader$$anonfun$checkParsingModeMultipleTopics$1());
            return strictChecked.select(Predef$.MODULE$.wrapRefArray((Object[]) Predef$.MODULE$.refArrayOps(topicColumns).$plus$colon(functions$.MODULE$.col(KAFKA_METADATA_COL()), ClassTag$.MODULE$.apply(Column.class))));
        }

        if (Ignore$.MODULE$.equals(parsingMode)) {
            Dataset<Row> filtered = dataset.where(combinedCheck);
            return filtered.select(Predef$.MODULE$.wrapRefArray((Object[]) Predef$.MODULE$.refArrayOps(topicColumns).$plus$colon(functions$.MODULE$.col(KAFKA_METADATA_COL()), ClassTag$.MODULE$.apply(Column.class))));
        }

        if (Handle$.MODULE$.equals(parsingMode)) {
            Column metadata = functions$.MODULE$.col(KAFKA_METADATA_COL());
            // The raw value survives only when the combined check is false.
            Column rawWhenCheckFails = functions$.MODULE$.when(combinedCheck.unary_$bang(), functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME())).otherwise((Object) null).as(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
            return dataset.select((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{metadata, rawWhenCheckFails})).$plus$plus(Predef$.MODULE$.refArrayOps(topicColumns), Seq$.MODULE$.canBuildFrom()));
        }

        throw new MatchError(parsingMode);
    }

    /**
     * Returns the raw-value column unchanged.
     *
     * @return a reference to the column named by the raw-value attribute name
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseBinary() {
        return functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
    }

    /**
     * Returns the raw-value column cast to Spark's string type.
     *
     * @return the raw-value column as a string column
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString() {
        Column rawValue = functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());
        return rawValue.cast(StringType$.MODULE$);
    }

    /**
     * Parses the raw value as JSON using the SQL type derived from the topic's schema.
     *
     * @param topicModel topic whose {@code getJsonSchema()} describes the value
     * @return the {@code from_json} column over the string-cast raw value
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseJson(TopicModel topicModel) {
        Column valueAsString = it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseString();
        DataType valueType = getDataType(topicModel.getJsonSchema());
        return functions$.MODULE$.from_json(valueAsString, valueType);
    }

    /**
     * Builds the column used to read the Kafka key of a topic.
     *
     * <p>Without a key schema the Kafka key column is returned as it is. With a key schema the key
     * is wrapped in an {@link AvroDeserializerExpression}; when the schema string is empty, the
     * schema is obtained through the lazily created schema manager instead. If neither source
     * yields a schema, the {@code get()} on the empty option fails.
     *
     * @param topicModel topic describing the key schema and schema-manager usage
     * @return the key column, Avro-deserialized when a key schema is declared
     */
    private Column parseKey(TopicModel topicModel) {
        // Bit 0 of this flag records whether the lazy schema manager has been initialised.
        VolatileByteRef lazyFlags = VolatileByteRef.create((byte) 0);
        Some declaredKeySchema = topicModel.keySchema();

        if (!(declaredKeySchema instanceof Some)) {
            if (None$.MODULE$.equals(declaredKeySchema)) {
                return functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());
            }
            throw new MatchError(declaredKeySchema);
        }

        String keySchemaJson = (String) declaredKeySchema.x();
        ObjectRef managerHolder = ObjectRef.zero();
        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$parseKey$1(keySchemaJson));
        Some managerConfig = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        Column keyColumn = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());

        Option effectiveSchema;
        if (keySchemaJson.isEmpty()) {
            effectiveSchema = avroSchemaManager$1(managerConfig, managerHolder, lazyFlags).flatMap(new KafkaSparkStructuredStreamingReader$$anonfun$16(topicModel));
        } else {
            effectiveSchema = new Some(keySchemaJson);
        }
        return new Column(new AvroDeserializerExpression(keyColumn.expr(), (String) effectiveSchema.get(), managerConfig, true, AvroDeserializerExpression$.MODULE$.apply$default$5()));
    }

    /**
     * Builds the column that Avro-deserializes the raw value of a topic.
     *
     * <p>The schema is the topic's JSON schema unless {@code topicModel.schema()} is empty, in
     * which case it is obtained through the lazily created schema manager. If neither source
     * yields a schema, the {@code get()} on the empty option fails.
     *
     * @param topicModel topic describing the value schema and schema-manager usage
     * @return a column wrapping an {@link AvroDeserializerExpression} over the raw value
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue(TopicModel topicModel) {
        ObjectRef managerHolder = ObjectRef.zero();
        // Bit 0 of this flag records whether the lazy schema manager has been initialised.
        VolatileByteRef lazyFlags = VolatileByteRef.create((byte) 0);
        logger().debug(new KafkaSparkStructuredStreamingReader$$anonfun$it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseAvroValue$1(topicModel));
        Some managerConfig = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        Column rawValue = functions$.MODULE$.col(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME());

        Option effectiveSchema;
        if (topicModel.schema().isEmpty()) {
            effectiveSchema = avroSchemaManager$2(managerConfig, managerHolder, lazyFlags).flatMap(new KafkaSparkStructuredStreamingReader$$anonfun$18(topicModel));
        } else {
            effectiveSchema = new Some(topicModel.getJsonSchema());
        }
        return new Column(new AvroDeserializerExpression(rawValue.expr(), (String) effectiveSchema.get(), managerConfig, true, AvroDeserializerExpression$.MODULE$.apply$default$5()));
    }

    /**
     * Resolves a datastore name to the topic models it stands for by delegating to the inner
     * recursive helper (note the swapped argument order in the delegated call).
     *
     * @param topicBL repository used to look models up by name
     * @param str     name of the datastore model to resolve
     * @return the resolved topic models
     */
    private Seq<TopicModel> retrieveTopicModelsRecursively(TopicBL topicBL, String str) {
        return it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(str, topicBL);
    }

    /**
     * Converts an Avro schema given as text into the corresponding Spark SQL data type.
     *
     * @param str Avro schema definition to parse
     * @return the data type reported by {@code AvroSchemaConverters.toSqlType}
     */
    private DataType getDataType(String str) {
        Schema avroSchema = new Schema.Parser().parse(str);
        return AvroSchemaConverters$.MODULE$.toSqlType(avroSchema).dataType();
    }

    /**
     * Builds the per-topic column: a struct of parsed value and data type when the row's topic
     * equals {@code str}, null otherwise. The column is aliased with the column name derived
     * from the topic name.
     *
     * @param str    topic name the row must come from
     * @param column expression producing the parsed value
     * @param str2   data type label stored next to the value
     * @return the aliased conditional column
     */
    public Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parseIfMyTopicOrNull(String str, Column column, String str2) {
        Column fromThisTopic = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.TOPIC_ATTRIBUTE_NAME()).equalTo(str);
        Column valueWithType = functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(new Column[]{column.as(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.lit(str2).as(DATA_TYPE_ATTRIBUTE_NAME())}));
        Column structOrNull = when$1(fromThisTopic, valueWithType).otherwise((Object) null);
        return structOrNull.as(TopicModelUtils$.MODULE$.topicNameToColumnName(str));
    }

    /**
     * References the value field nested inside the given struct column.
     *
     * @param str name of the struct column
     * @return the column at path {@code <str>.<value attribute name>}
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedValueCol$1(String str) {
        String nestedPath = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()}));
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * References the data-type field nested inside the given struct column.
     *
     * @param str name of the struct column
     * @return the column at path {@code <str>.<data type attribute name>}
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedDataTypeCol$1(String str) {
        String nestedPath = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, DATA_TYPE_ATTRIBUTE_NAME()}));
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * Builds a boolean column that is true when the given struct column is null or when its
     * lower-cased data-type field is one of the JSON / AVRO data type names.
     *
     * @param str name of the struct column
     * @return the boolean condition column
     */
    public final Column it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$dataTypeToCheckCondition$1(String str) {
        Column structIsNull = functions$.MODULE$.col(str).isNull();
        Column loweredDataType = functions$.MODULE$.lower(it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$parsedDataTypeCol$1(str));
        Column isJsonOrAvro = loweredDataType.isin(Predef$.MODULE$.genericWrapArray(new Object[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()}));
        return structIsNull.$bar$bar(isJsonOrAvro);
    }

    /**
     * Slow path of the lazy schema-manager value used by {@code parseKey}.
     *
     * <p>While holding the monitor of this instance: if bit 0 of {@code volatileByteRef.elem} is
     * clear, stores {@code option.map(...)} into {@code objectRef.elem} and sets the bit. The
     * content of {@code objectRef.elem} is returned in either case.
     *
     * @param option          optional configuration that is mapped into the stored value
     * @param objectRef       holder of the computed value
     * @param volatileByteRef flag byte; bit 0 marks the value as computed
     * @return the value held by {@code objectRef}
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final Option avroSchemaManager$lzycompute$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        // Decompiler artifact: r0 is simply `this`, used as the lock object.
        ?? r0 = this;
        synchronized (r0) {
            // Compute only if the flag bit is still clear once the lock is held.
            if (((byte) (volatileByteRef.elem & 1)) == 0) {
                objectRef.elem = option.map(new KafkaSparkStructuredStreamingReader$$anonfun$avroSchemaManager$lzycompute$1$1());
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (Option) objectRef.elem;
        }
    }

    /**
     * Accessor of the lazy schema-manager value used by {@code parseKey}: returns the cached
     * value when bit 0 of the flag byte is set, otherwise delegates to the compute method.
     *
     * @param option          optional configuration forwarded to the compute method
     * @param objectRef       holder of the computed value
     * @param volatileByteRef flag byte; bit 0 marks the value as computed
     * @return the schema-manager option
     */
    private final Option avroSchemaManager$1(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        boolean alreadyComputed = ((byte) (volatileByteRef.elem & 1)) != 0;
        if (alreadyComputed) {
            return (Option) objectRef.elem;
        }
        return avroSchemaManager$lzycompute$1(option, objectRef, volatileByteRef);
    }

    /**
     * Slow path of the lazy schema-manager value used by the Avro value parser.
     *
     * <p>While holding the monitor of this instance: if bit 0 of {@code volatileByteRef.elem} is
     * clear, stores {@code option.map(...)} into {@code objectRef.elem} and sets the bit. The
     * content of {@code objectRef.elem} is returned in either case.
     *
     * @param option          optional configuration that is mapped into the stored value
     * @param objectRef       holder of the computed value
     * @param volatileByteRef flag byte; bit 0 marks the value as computed
     * @return the value held by {@code objectRef}
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private final Option avroSchemaManager$lzycompute$2(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        // Decompiler artifact: r0 is simply `this`, used as the lock object.
        ?? r0 = this;
        synchronized (r0) {
            // Compute only if the flag bit is still clear once the lock is held.
            if (((byte) (volatileByteRef.elem & 1)) == 0) {
                objectRef.elem = option.map(new KafkaSparkStructuredStreamingReader$$anonfun$avroSchemaManager$lzycompute$2$1());
                volatileByteRef.elem = (byte) (volatileByteRef.elem | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (Option) objectRef.elem;
        }
    }

    /**
     * Accessor of the lazy schema-manager value used by the Avro value parser: returns the cached
     * value when bit 0 of the flag byte is set, otherwise delegates to the compute method.
     *
     * @param option          optional configuration forwarded to the compute method
     * @param objectRef       holder of the computed value
     * @param volatileByteRef flag byte; bit 0 marks the value as computed
     * @return the schema-manager option
     */
    private final Option avroSchemaManager$2(Option option, ObjectRef objectRef, VolatileByteRef volatileByteRef) {
        boolean alreadyComputed = ((byte) (volatileByteRef.elem & 1)) != 0;
        if (alreadyComputed) {
            return (Option) objectRef.elem;
        }
        return avroSchemaManager$lzycompute$2(option, objectRef, volatileByteRef);
    }

    /**
     * Looks a datastore model up by name and expands it into topic models.
     *
     * <p>A {@code TopicModel} yields a one-element sequence; a {@code MultiTopicModel} yields the
     * flat-mapped expansion of its topic model names; any other model raises {@link MatchError}.
     * When the name is unknown, the fallback function passed to {@code getOrElse} decides the outcome.
     *
     * @param str     name of the model to resolve
     * @param topicBL repository used for the lookup
     * @return the topic models the name stands for
     */
    public final Seq it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1(String str, TopicBL topicBL) {
        TopicModel model = (DatastoreModel) topicBL.getByName(str).getOrElse(new KafkaSparkStructuredStreamingReader$$anonfun$20(str));

        if (model instanceof TopicModel) {
            // Plain topic: wrap it in a single-element sequence.
            return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicModel[]{model}));
        }
        if (model instanceof MultiTopicModel) {
            // Multi-topic: expand every referenced topic model name.
            return (Seq) ((MultiTopicModel) model).topicModelNames().flatMap(new KafkaSparkStructuredStreamingReader$$anonfun$it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$innerRetrieveTopicModelsRecursively$1$1(topicBL), Seq$.MODULE$.canBuildFrom());
        }
        throw new MatchError(model);
    }

    /**
     * Builds a {@code CASE WHEN column THEN column2} column directly from a Catalyst
     * {@link CaseWhen} expression with a single branch and the default else-value.
     *
     * @param column  branch condition
     * @param column2 branch value
     * @return the column wrapping the {@code CaseWhen} expression
     */
    private final Column when$1(Column column, Column column2) {
        Tuple2 branch = new Tuple2(column.expr(), column2.expr());
        CaseWhen caseWhen = new CaseWhen(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{branch})), CaseWhen$.MODULE$.apply$default$2());
        return new Column(caseWhen);
    }

    /**
     * Private constructor, invoked once from the static initializer. Publishes the instance
     * through {@code MODULE$}, runs the {@code Logging} trait initializer, sets the three column
     * name constants and creates the {@code isNull} UDF.
     */
    private KafkaSparkStructuredStreamingReader$() {
        // Publish the singleton before anything else runs.
        MODULE$ = this;
        // Trait initializer of Logging (decompiled form of the Scala mixin initialization).
        Logging.class.$init$(this);
        this.KAFKA_METADATA_COL = "kafkaMetadata";
        this.it$agilelab$bigdata$wasp$consumers$spark$plugins$kafka$KafkaSparkStructuredStreamingReader$$RAW_VALUE_ATTRIBUTE_NAME = "raw";
        this.DATA_TYPE_ATTRIBUTE_NAME = "dataType";
        // UDF with a Boolean result type tag and an Any argument type tag.
        this.isNull = functions$.MODULE$.udf(new KafkaSparkStructuredStreamingReader$$anonfun$11(), package$.MODULE$.universe().TypeTag().Boolean(), package$.MODULE$.universe().TypeTag().Any());
    }
}
