package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroDeserializerExpression;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroDeserializerExpression$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.AvroSchemaConverters$;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.StringToByteArrayUtil$;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel$;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.SubjectStrategy$;
import it.agilelab.bigdata.wasp.models.TopicDataTypes$;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.models.configuration.Handle$;
import it.agilelab.bigdata.wasp.models.configuration.Ignore$;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode;
import it.agilelab.bigdata.wasp.models.configuration.ParsingMode$;
import it.agilelab.bigdata.wasp.models.configuration.Strict$;
import it.agilelab.bigdata.wasp.repository.core.bl.ConfigBL$;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import it.agilelab.bigdata.wasp.spark.sql.kafka011.KafkaSparkSQLSchemas$;
import it.agilelab.darwin.manager.AvroSchemaManagerFactory$;
import org.apache.avro.Schema;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.CaseWhen$;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.catalyst.expressions.Hex$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.util.Left;
import scala.util.Right;

/* compiled from: KafkaSparkStructuredStreamingReader.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaSparkStructuredStreamingReader$.class */
public final class KafkaSparkStructuredStreamingReader$ implements SparkStructuredStreamingReader, Logging {
    // Static holder for the Scala object instance.
    // NOTE(review): presumably assigned by the constructor, which is outside this view — confirm.
    public static KafkaSparkStructuredStreamingReader$ MODULE$;
    // Column/attribute names used throughout this reader; their values are set in the
    // constructor (not visible here) and exposed through the private accessors below.
    private final String KAFKA_METADATA_COL;
    private final String RAW_VALUE_ATTRIBUTE_NAME;
    private final String DATA_TYPE_ATTRIBUTE_NAME;
    // Logger instance, installed through the Logging trait setter.
    private final WaspLogger logger;

    // Instantiates the object once when the class is loaded.
    static {
        new KafkaSparkStructuredStreamingReader$();
    }

    /**
     * Returns the logger held by this object.
     *
     * @return the {@link WaspLogger} stored in the {@code logger} field
     */
    public WaspLogger logger() {
        return logger;
    }

    /**
     * Trait setter generated for the {@code Logging} mixin: stores the given logger.
     *
     * @param waspLogger the logger to store in the {@code logger} field
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        logger = waspLogger;
    }

    /**
     * Accessor for the {@code KAFKA_METADATA_COL} field.
     *
     * @return the name used to alias the Kafka metadata struct column
     */
    private String KAFKA_METADATA_COL() {
        return KAFKA_METADATA_COL;
    }

    /**
     * Accessor for the {@code RAW_VALUE_ATTRIBUTE_NAME} field.
     *
     * @return the name of the column that keeps the unparsed Kafka value
     */
    private String RAW_VALUE_ATTRIBUTE_NAME() {
        return RAW_VALUE_ATTRIBUTE_NAME;
    }

    /**
     * Accessor for the {@code DATA_TYPE_ATTRIBUTE_NAME} field.
     *
     * @return the name of the struct attribute that carries a topic's data type
     */
    private String DATA_TYPE_ATTRIBUTE_NAME() {
        return DATA_TYPE_ATTRIBUTE_NAME;
    }

    /**
     * Builds the streaming DataFrame that reads from the Kafka topic(s) referenced by the
     * given reader model.
     *
     * <p>Steps, in order: resolve the topic models for the reader's datastore name, resolve
     * the Kafka configuration from the cluster alias of the first topic, check/create every
     * topic, assemble the options for the "kafka" stream source, load the stream and hand it
     * to {@code parseDF} together with the parsing mode.
     *
     * @param structuredStreamingETLModel the ETL this stream belongs to (used for logging and
     *                                    for the trigger interval)
     * @param streamingReaderModel        the reader description (datastore name, rate limit, options)
     * @param sparkSession                the session used to create the stream
     * @return the parsed streaming DataFrame
     * @throws Exception if the check/create step reports failure for at least one topic
     */
    public Dataset<Row> createStructuredStream(StructuredStreamingETLModel structuredStreamingETLModel, StreamingReaderModel streamingReaderModel, SparkSession sparkSession) {
        logger().info(() -> {
            return new StringBuilder(37).append("Creating stream from input: ").append(streamingReaderModel).append(" of ETL: ").append(structuredStreamingETLModel).toString();
        });
        logger().info(() -> {
            return new StringBuilder(45).append("Retrieving topic datastore model with name \"").append(streamingReaderModel.datastoreModelName()).append("\"").toString();
        });
        Seq<TopicModel> retrieveTopicModelsRecursively = retrieveTopicModelsRecursively(ConfigBL$.MODULE$.topicBL(), streamingReaderModel.datastoreModelName());
        logger().info(() -> {
            return new StringBuilder(26).append("Retrieved topic model(s): ").append(retrieveTopicModelsRecursively).toString();
        });
        // The Kafka cluster is chosen from the first retrieved topic only.
        KafkaConfigModel resolve = ConfigManager$.MODULE$.getKafkaConfig().resolve(((TopicModel) retrieveTopicModelsRecursively.head()).clusterAlias());
        logger().info(() -> {
            return new StringBuilder(21).append("Kafka configuration: ").append(resolve).toString();
        });
        // Check/create each topic and AND the per-topic results together; any false aborts.
        if (!BoxesRunTime.unboxToBoolean(((TraversableOnce) retrieveTopicModelsRecursively.map(topicModel -> {
            return BoxesRunTime.boxToBoolean($anonfun$createStructuredStream$5(topicModel));
        }, Seq$.MODULE$.canBuildFrom())).reduce((obj, obj2) -> {
            return BoxesRunTime.boxToBoolean($anonfun$createStructuredStream$6(BoxesRunTime.unboxToBoolean(obj), BoxesRunTime.unboxToBoolean(obj2)));
        }))) {
            String sb = new StringBuilder(50).append("Unable to check/create one or more topic; topics: ").append(retrieveTopicModelsRecursively).toString();
            logger().error(() -> {
                return sb;
            });
            throw new Exception(sb);
        }
        long triggerIntervalMs = SparkUtils$.MODULE$.getTriggerIntervalMs(ConfigManager$.MODULE$.getSparkStreamingConfig(), structuredStreamingETLModel);
        // Rate limit becomes the "maxOffsetsPerTrigger" option: used as-is when the trigger
        // interval is 0, otherwise multiplied by the interval expressed in seconds.
        Option map = streamingReaderModel.rateLimit().map(i -> {
            return triggerIntervalMs == 0 ? i : (long) ((triggerIntervalMs / 1000.0d) * i);
        }).map(obj3 -> {
            return $anonfun$createStructuredStream$8(BoxesRunTime.unboxToLong(obj3));
        });
        // Options are added in this order: base options, rate limit, "others" from the Kafka
        // configuration, and finally the reader model's own options.
        Map empty = Map$.MODULE$.empty();
        empty.$plus$plus$eq(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("subscribe"), ((TraversableOnce) retrieveTopicModelsRecursively.map(topicModel2 -> {
            return topicModel2.name();
        }, Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.bootstrap.servers"), ((TraversableOnce) resolve.connections().map(connectionConfig -> {
            return connectionConfig.toString();
        }, Seq$.MODULE$.canBuildFrom())).mkString(",")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafkaConsumer.pollTimeoutMs"), BoxesRunTime.boxToInteger(resolve.ingestRateToMills()).toString())})));
        empty.$plus$plus$eq(Option$.MODULE$.option2Iterable(map));
        empty.$plus$plus$eq(((TraversableOnce) resolve.others().map(kafkaEntryConfig -> {
            return kafkaEntryConfig.toTupla();
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        empty.$plus$plus$eq(streamingReaderModel.options());
        logger().info(() -> {
            return new StringBuilder(48).append("Final options to be pushed to DataStreamReader: ").append(empty).toString();
        });
        // The Kafka value column is duplicated into the raw-value column before parsing.
        // Parsing mode comes from the "parsingMode" option ("strict" when absent) and falls
        // back to Strict when ParsingMode.fromString yields no value.
        return parseDF(retrieveTopicModelsRecursively, sparkSession.readStream().format("kafka").options(empty).load().withColumn(RAW_VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())), (ParsingMode) ParsingMode$.MODULE$.fromString((String) streamingReaderModel.options().getOrElse("parsingMode", () -> {
            return "strict";
        })).getOrElse(() -> {
            return Strict$.MODULE$;
        }));
    }

    /**
     * Builds the Kafka metadata struct column.
     *
     * <p>The struct contains the given column first, followed by one column for each field
     * name of {@code KafkaSparkSQLSchemas.INPUT_SCHEMA} that passes
     * {@code $anonfun$selectMetadata$2} (i.e. is not the value, key or raw-value attribute).
     * The result is aliased with {@code KAFKA_METADATA_COL()}.
     *
     * @param column the column to place at the head of the struct (the key column)
     * @return the aliased metadata struct column
     */
    private Column selectMetadata(Column column) {
        return functions$.MODULE$.struct(((List) ((TraversableOnce) ((TraversableLike) KafkaSparkSQLSchemas$.MODULE$.INPUT_SCHEMA().map(structField -> {
            return structField.name();
        }, Seq$.MODULE$.canBuildFrom())).filter(str -> {
            return BoxesRunTime.boxToBoolean($anonfun$selectMetadata$2(str));
        })).toList().map(str2 -> {
            return functions$.MODULE$.col(str2);
        }, List$.MODULE$.canBuildFrom())).$colon$colon(column)).as(KAFKA_METADATA_COL());
    }

    /**
     * Default argument of {@code selectMetadata}: the Kafka key column, unparsed.
     *
     * @return a column referencing {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}
     */
    private Column selectMetadata$default$1() {
        String keyColumnName = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(keyColumnName);
    }

    /**
     * Validates the topic models and picks the parsing strategy for the raw Kafka DataFrame.
     *
     * <p>If the health check fails an {@link IllegalArgumentException} is thrown. If the
     * topics are equal for reading, the single-schema path is used with the first topic.
     * Otherwise the topics must at least share the key schema, in which case the
     * multiple-schema path is used; if they do not, an {@link IllegalArgumentException} is thrown.
     *
     * @param seq         the topic models being read
     * @param dataset     the raw Kafka DataFrame
     * @param parsingMode how parsing failures must be treated
     * @return the parsed DataFrame
     */
    private Dataset<Row> parseDF(Seq<TopicModel> seq, Dataset<Row> dataset, ParsingMode parsingMode) {
        Left healthCheck = MultiTopicModel$.MODULE$.areTopicsHealthy(seq);
        if (healthCheck instanceof Left) {
            throw new IllegalArgumentException((String) healthCheck.value());
        }
        if (!(healthCheck instanceof Right)) {
            throw new MatchError(healthCheck);
        }

        Left readCompatibility = TopicModelUtils$.MODULE$.areTopicsEqualForReading(seq);
        if (!(readCompatibility instanceof Left)) {
            // Not a Left: it must be a Right, meaning a single schema fits every topic.
            if (!(readCompatibility instanceof Right)) {
                throw new MatchError(readCompatibility);
            }
            return selectForOneSchema((TopicModel) seq.head(), dataset, parsingMode);
        }

        // Topics differ: the multiple-schema strategy still requires a shared key schema.
        String suppressedError = (String) readCompatibility.value();
        Left keySchemaCheck = TopicModelUtils$.MODULE$.topicsShareKeySchema(seq);
        if (keySchemaCheck instanceof Left) {
            throw new IllegalArgumentException((String) keySchemaCheck.value());
        }
        if (!(keySchemaCheck instanceof Right)) {
            throw new MatchError(keySchemaCheck);
        }
        logger().debug(() -> {
            return new StringBuilder(61).append("Suppressing error: '").append(suppressedError).append("' and trying with multipleSchema strategy").toString();
        });
        return selectForMultipleSchema(seq, dataset, parsingMode);
    }

    /**
     * Parses the raw Kafka DataFrame according to the data type of a single topic model.
     *
     * <p>AVRO and JSON values are decoded and then passed through {@code checkParsingMode};
     * PLAINTEXT values are cast to string; BINARY values are selected untouched. A warning is
     * logged when the Handle parsing mode is requested for a type other than JSON or AVRO.
     *
     * @param topicModel  the topic model describing data type and schema
     * @param dataset     the raw Kafka DataFrame
     * @param parsingMode how parsing failures must be treated
     * @return the parsed DataFrame
     * @throws UnsupportedOperationException if the topic data type is none of AVRO, JSON,
     *                                       PLAINTEXT or BINARY
     */
    public Dataset<Row> selectForOneSchema(TopicModel topicModel, Dataset<Row> dataset, ParsingMode parsingMode) {
        final String topicType = topicModel.topicDataType();
        final Handle$ handleMode = Handle$.MODULE$;
        final boolean handleRequested = parsingMode != null ? parsingMode.equals(handleMode) : handleMode == null;
        if (handleRequested && !Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()})).contains(topicType)) {
            logger().warn(() -> {
                return new StringBuilder(293).append("Handle parsing mode is not supported for ").append(topicType).append(" topic type, it will be managed as in ").append("Strict mode (so no raw column will be produced). To remove this warning please set Strict as parsing mode ").append("of your topic. Look at KafkaSparkStructuredStreamingReader#createStructuredStream java doc for more details.").toString();
            });
        }

        final Dataset<Row> parsed;
        final String avroType = TopicDataTypes$.MODULE$.AVRO();
        if (avroType != null ? avroType.equals(topicType) : topicType == null) {
            Dataset<Row> decoded = dataset.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), parseAvroValue(topicModel));
            parsed = checkParsingMode(decoded, parsingMode, TopicDataTypes$.MODULE$.AVRO(), parseKey(topicModel), checkParsingMode$default$5());
        } else {
            final String jsonType = TopicDataTypes$.MODULE$.JSON();
            if (jsonType != null ? jsonType.equals(topicType) : topicType == null) {
                DataType jsonSchema = getDataType(topicModel.getJsonSchema());
                Dataset<Row> decoded = dataset.withColumn(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME(), functions$.MODULE$.from_json(parseString(), jsonSchema));
                parsed = checkParsingMode(decoded, parsingMode, TopicDataTypes$.MODULE$.JSON(), checkParsingMode$default$4(), new Some(jsonSchema));
            } else {
                final String plaintextType = TopicDataTypes$.MODULE$.PLAINTEXT();
                if (plaintextType != null ? plaintextType.equals(topicType) : topicType == null) {
                    parsed = dataset.withColumn("value_string", parseString()).select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(selectMetadata$default$1()), functions$.MODULE$.expr("value_string AS value")}));
                } else {
                    final String binaryType = TopicDataTypes$.MODULE$.BINARY();
                    if (!(binaryType != null ? binaryType.equals(topicType) : topicType == null)) {
                        throw new UnsupportedOperationException(new StringBuilder(30).append("Unsupported topic data type \"").append(topicModel.topicDataType()).append("\"").toString());
                    }
                    parsed = dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(selectMetadata$default$1()), functions$.MODULE$.expr(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}));
                }
            }
        }

        logger().debug(() -> {
            return new StringBuilder(18).append("DataFrame schema: ").append(parsed.schema().treeString()).toString();
        });
        return parsed;
    }

    /**
     * Applies the parsing mode to a DataFrame whose value column has already been decoded.
     *
     * <p>A row counts as failed when the value column is null or when the {@code isNull(option)}
     * UDF returns true for it.
     * <ul>
     *   <li>Strict: failed rows invoke {@code strictExceptionLauncherUdf} with the raw value and
     *       the data type; the output is the metadata struct plus the expanded value fields.</li>
     *   <li>Ignore: the output is the metadata struct plus the expanded value fields, filtered to
     *       rows that did not fail.</li>
     *   <li>Handle: the output is the metadata struct, the raw value (null unless the row
     *       failed) and the value column, not expanded.</li>
     * </ul>
     *
     * @param dataset     DataFrame with decoded value and raw-value columns
     * @param parsingMode the parsing mode to apply
     * @param str         the topic data type, passed to the strict-mode exception UDF
     * @param column      the key column to place in the metadata struct
     * @param option      optional data type handed to {@code isNull}
     * @return the DataFrame shaped according to the parsing mode
     * @throws MatchError if the parsing mode is none of Strict, Ignore or Handle
     */
    public Dataset<Row> checkParsingMode(Dataset<Row> dataset, ParsingMode parsingMode, String str, Column column, Option<DataType> option) {
        Dataset<Row> select;
        if (Strict$.MODULE$.equals(parsingMode)) {
            select = dataset.withColumn("computedValue", functions$.MODULE$.when(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull().$bar$bar(isNull(option).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}))), strictExceptionLauncherUdf().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.lit(str)}))).otherwise(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()))).select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(column), functions$.MODULE$.col(new StringBuilder(2).append("computedValue").append(".*").toString())}));
        } else if (Ignore$.MODULE$.equals(parsingMode)) {
            select = dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(column), functions$.MODULE$.col(new StringBuilder(2).append(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).append(".*").toString())})).where(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNotNull().$amp$amp(isNull(option).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())})).unary_$bang()));
        } else {
            if (!Handle$.MODULE$.equals(parsingMode)) {
                throw new MatchError(parsingMode);
            }
            select = dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{selectMetadata(column), functions$.MODULE$.when(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()).isNull().$bar$bar(isNull(option).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}))), functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME())).otherwise((Object) null).as(RAW_VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME())}));
        }
        return select;
    }

    /**
     * Default for the fourth argument of {@code checkParsingMode}: the unparsed Kafka key column.
     *
     * @return a column referencing {@code KafkaSparkSQLSchemas.KEY_ATTRIBUTE_NAME}
     */
    public Column checkParsingMode$default$4() {
        String keyColumnName = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(keyColumnName);
    }

    /**
     * Default for the fifth argument of {@code checkParsingMode}: no data type.
     *
     * @return {@code None}
     */
    public Option<DataType> checkParsingMode$default$5() {
        Option<DataType> noDataType = None$.MODULE$;
        return noDataType;
    }

    /**
     * Creates the boolean UDF that checks a decoded row for nulls in non-nullable fields.
     *
     * <p>When a data type is supplied, the UDF rebuilds the row with that type as its schema
     * before checking; otherwise the row's own schema is used.
     *
     * @param option optional data type to check the row against
     * @return the UDF to apply to the decoded value column
     * @throws MatchError if {@code option} is neither {@code Some} nor {@code None}
     */
    public UserDefinedFunction isNull(Option<DataType> option) {
        if (option instanceof Some) {
            DataType expectedType = (DataType) ((Some) option).value();
            return functions$.MODULE$.udf(row -> {
                return BoxesRunTime.boxToBoolean($anonfun$isNull$2(expectedType, row));
            }, package$.MODULE$.universe().TypeTag().Boolean(), package$.MODULE$.universe().TypeTag().Any());
        }
        if (!None$.MODULE$.equals(option)) {
            throw new MatchError(option);
        }
        return functions$.MODULE$.udf(row -> {
            return BoxesRunTime.boxToBoolean($anonfun$isNull$3(row));
        }, package$.MODULE$.universe().TypeTag().Boolean(), package$.MODULE$.universe().TypeTag().Any());
    }

    /**
     * Creates the UDF used in Strict mode to fail on an unparsable record.
     *
     * <p>The UDF takes the raw value bytes and the topic data type and never returns normally:
     * for the JSON type it throws a {@link SparkException} with the raw value rendered through
     * {@code StringToByteArrayUtil.byteArrayToString}; for the AVRO type it throws a
     * {@link SparkException} with the raw value rendered through {@code Hex.hex}; for any
     * other data type it throws a {@link MatchError}.
     *
     * @return the two-argument (byte array, string) UDF
     */
    public UserDefinedFunction strictExceptionLauncherUdf() {
        functions$ functions_ = functions$.MODULE$;
        Function2 function2 = (bArr, str) -> {
            String JSON = TopicDataTypes$.MODULE$.JSON();
            if (JSON != null ? JSON.equals(str) : str == null) {
                throw new SparkException(new StringBuilder(32).append("Unable to parse raw value [").append(StringToByteArrayUtil$.MODULE$.byteArrayToString(bArr)).append("] to ").append(str).toString());
            }
            String AVRO = TopicDataTypes$.MODULE$.AVRO();
            if (AVRO != null ? !AVRO.equals(str) : str != null) {
                throw new MatchError(str);
            }
            throw new SparkException(new StringBuilder(32).append("Unable to parse raw value [").append(Hex$.MODULE$.hex(bArr)).append("] to ").append(str).toString());
        };
        // Type tags for the UDF: result type Nothing, first argument scala.Array[scala.Byte],
        // second argument scala.Predef.String.
        TypeTags.TypeTag Nothing = package$.MODULE$.universe().TypeTag().Nothing();
        TypeTags universe = package$.MODULE$.universe();
        TypeTags.TypeTag apply = universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingReader$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), new $colon.colon(mirror.staticClass("scala.Byte").asType().toTypeConstructor(), Nil$.MODULE$));
            }
        });
        TypeTags universe2 = package$.MODULE$.universe();
        return functions_.udf(function2, Nothing, apply, universe2.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaSparkStructuredStreamingReader$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe3 = mirror.universe();
                return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe3.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }));
    }

    /**
     * Parses a raw Kafka DataFrame that carries records from topics with different schemas.
     *
     * <p>For every topic model one column is produced (in the same order as {@code seq}) via
     * {@code parseIfMyTopicOrNull}, using the decoder matching the topic data type. The
     * metadata struct uses the parsed key of the first topic accepted by the
     * {@code collectFirst} partial function, or the plain key column when none is accepted.
     * The resulting DataFrame is then passed through {@code checkParsingModeMultipleTopics}.
     *
     * @param seq         the topic models being read
     * @param dataset     the raw Kafka DataFrame
     * @param parsingMode how parsing failures must be treated
     * @return the parsed DataFrame after the parsing mode has been applied
     * @throws UnsupportedOperationException if a topic data type is none of AVRO, JSON,
     *                                       PLAINTEXT or BINARY
     */
    public Dataset<Row> selectForMultipleSchema(Seq<TopicModel> seq, Dataset<Row> dataset, ParsingMode parsingMode) {
        Column selectMetadata;
        // Columns are prepended while folding, so the list is reversed to restore topic order.
        List reverse = ((List) seq.foldLeft(List$.MODULE$.empty(), (list, topicModel) -> {
            List $colon$colon;
            String str = topicModel.topicDataType();
            String AVRO = TopicDataTypes$.MODULE$.AVRO();
            if (AVRO != null ? !AVRO.equals(str) : str != null) {
                String JSON = TopicDataTypes$.MODULE$.JSON();
                if (JSON != null ? !JSON.equals(str) : str != null) {
                    String PLAINTEXT = TopicDataTypes$.MODULE$.PLAINTEXT();
                    if (PLAINTEXT != null ? !PLAINTEXT.equals(str) : str != null) {
                        String BINARY = TopicDataTypes$.MODULE$.BINARY();
                        if (BINARY != null ? !BINARY.equals(str) : str != null) {
                            throw new UnsupportedOperationException(new StringBuilder(30).append("Unsupported topic data type \"").append(topicModel.topicDataType()).append("\"").toString());
                        }
                        $colon$colon = list.$colon$colon(MODULE$.parseIfMyTopicOrNull(topicModel.name(), MODULE$.parseBinary(), str));
                    } else {
                        $colon$colon = list.$colon$colon(MODULE$.parseIfMyTopicOrNull(topicModel.name(), MODULE$.parseString(), str));
                    }
                } else {
                    $colon$colon = list.$colon$colon(MODULE$.parseIfMyTopicOrNull(topicModel.name(), MODULE$.parseJson(topicModel), str));
                }
            } else {
                $colon$colon = list.$colon$colon(MODULE$.parseIfMyTopicOrNull(topicModel.name(), MODULE$.parseAvroValue(topicModel), str));
            }
            return $colon$colon;
        })).reverse();
        logger().info(() -> {
            return new StringBuilder(33).append("Selecting the following columns:\n").append(reverse.mkString("\t- ", "\n\t- ", "")).toString();
        });
        Some collectFirst = seq.collectFirst(new KafkaSparkStructuredStreamingReader$$anonfun$1());
        if (collectFirst instanceof Some) {
            selectMetadata = selectMetadata(parseKey((TopicModel) collectFirst.value()));
        } else {
            if (!None$.MODULE$.equals(collectFirst)) {
                throw new MatchError(collectFirst);
            }
            selectMetadata = selectMetadata(selectMetadata$default$1());
        }
        // Final column order: metadata struct, raw value, then one column per topic.
        Dataset<Row> select = dataset.select(reverse.$colon$colon(functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME())).$colon$colon(selectMetadata));
        logger().debug(() -> {
            return new StringBuilder(44).append("DataFrame schema before parsing mode check: ").append(select.schema().treeString()).toString();
        });
        Dataset<Row> checked = checkParsingModeMultipleTopics(select, parsingMode, ((TraversableOnce) seq.map(topicModel2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(TopicModelUtils$.MODULE$.topicNameToColumnName(topicModel2.name())), MODULE$.getDataType(topicModel2.getJsonSchema()));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        // Log the schema of the DataFrame produced by the check, not of the one passed into it.
        logger().debug(() -> {
            return new StringBuilder(43).append("DataFrame schema after parsing mode check: ").append(checked.schema().treeString()).toString();
        });
        return checked;
    }

    /**
     * Applies the parsing mode to a multi-topic DataFrame that has one struct column per topic.
     *
     * <p>The topic columns are all columns of {@code dataset} except the metadata and the
     * raw-value columns. For each of them the output column is the nested parsed value
     * ({@code parsedValueCol$1}) when the topic column is not null, otherwise null.
     * <ul>
     *   <li>Strict: adds a "workingColumn" per topic column that invokes
     *       {@code strictExceptionLauncherUdf} when parsing failed, then selects metadata plus
     *       the output columns.</li>
     *   <li>Ignore: keeps only rows for which the validity condition holds, then selects
     *       metadata plus the output columns.</li>
     *   <li>Handle: selects metadata, the raw value (null when the row is valid) and the
     *       output columns.</li>
     * </ul>
     *
     * @param dataset     DataFrame with metadata, raw value and one column per topic
     * @param parsingMode the parsing mode to apply
     * @param map         topic column name to data type, looked up when building {@code isNull}
     * @return the DataFrame shaped according to the parsing mode
     * @throws MatchError if the parsing mode is none of Strict, Ignore or Handle
     */
    public Dataset<Row> checkParsingModeMultipleTopics(Dataset<Row> dataset, ParsingMode parsingMode, scala.collection.immutable.Map<String, DataType> map) {
        Dataset<Row> select;
        // Names of the per-topic columns.
        String[] strArr = (String[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).diff(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{KAFKA_METADATA_COL(), RAW_VALUE_ATTRIBUTE_NAME()})));
        // Output columns: the nested parsed value, or null when the topic column is null.
        Column[] columnArr = (Column[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).map(str -> {
            return functions$.MODULE$.when(functions$.MODULE$.col(str).isNotNull(), parsedValueCol$1(str)).otherwise((Object) null).as(str);
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
        // Row validity: for every topic column, either it is null, or its data type is not
        // subject to checking, or its parsed value is non-null and passes the isNull UDF.
        Column column = (Column) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).map(str2 -> {
            return functions$.MODULE$.col(str2).isNull().$bar$bar(this.dataTypeToCheckCondition$1(str2).unary_$bang()).$bar$bar(this.dataTypeToCheckCondition$1(str2).$amp$amp(parsedValueCol$1(str2).isNotNull().$amp$amp(MODULE$.isNull(map.get(str2)).apply(Predef$.MODULE$.wrapRefArray(new Column[]{parsedValueCol$1(str2)})).unary_$bang())));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class))))).reduce((column2, column3) -> {
            return column2.and(column3);
        });
        if (Strict$.MODULE$.equals(parsingMode)) {
            // NOTE(review): "workingColumn" is re-declared on every fold step and is not part of
            // the final select; whether Spark still evaluates it cannot be told from here — confirm.
            select = ((Dataset) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).foldLeft(dataset, (dataset2, str3) -> {
                return dataset2.withColumn("workingColumn", functions$.MODULE$.when(this.dataTypeToCheckCondition$1(str3).$amp$amp(functions$.MODULE$.col(str3).isNotNull()).$amp$amp(parsedValueCol$1(str3).isNull().$bar$bar(MODULE$.isNull(map.get(str3)).apply(Predef$.MODULE$.wrapRefArray(new Column[]{parsedValueCol$1(str3)})))), MODULE$.strictExceptionLauncherUdf().apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(MODULE$.RAW_VALUE_ATTRIBUTE_NAME()), this.parsedDataTypeCol$1(str3)}))).otherwise((Object) null));
            })).select(Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(columnArr)).$plus$colon(functions$.MODULE$.col(KAFKA_METADATA_COL()), ClassTag$.MODULE$.apply(Column.class))));
        } else if (Ignore$.MODULE$.equals(parsingMode)) {
            select = dataset.where(column).select(Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(columnArr)).$plus$colon(functions$.MODULE$.col(KAFKA_METADATA_COL()), ClassTag$.MODULE$.apply(Column.class))));
        } else {
            if (!Handle$.MODULE$.equals(parsingMode)) {
                throw new MatchError(parsingMode);
            }
            select = dataset.select((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(KAFKA_METADATA_COL()), functions$.MODULE$.when(column.unary_$bang(), functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME())).otherwise((Object) null).as(RAW_VALUE_ATTRIBUTE_NAME())})).$plus$plus(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(columnArr)), Seq$.MODULE$.canBuildFrom()));
        }
        return select;
    }

    /**
     * Decoder for BINARY topics: the raw value column, untouched.
     *
     * @return a column referencing the raw value attribute
     */
    private Column parseBinary() {
        String rawColumnName = RAW_VALUE_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(rawColumnName);
    }

    /**
     * Decoder for PLAINTEXT topics: the raw value column cast to a string.
     *
     * @return the raw value column cast to {@code StringType}
     */
    private Column parseString() {
        Column rawValue = functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME());
        return rawValue.cast(StringType$.MODULE$);
    }

    /**
     * Decoder for JSON topics: parses the string form of the raw value with the SQL type
     * derived from the topic's schema.
     *
     * @param topicModel the topic model providing the schema
     * @return the {@code from_json} column
     */
    private Column parseJson(TopicModel topicModel) {
        functions$ sqlFunctions = functions$.MODULE$;
        Column jsonText = parseString();
        DataType targetType = getDataType(topicModel.getJsonSchema());
        return sqlFunctions.from_json(jsonText, targetType);
    }

    /**
     * Builds the column used for the Kafka key of the given topic.
     *
     * <p>When the topic declares a key schema, the key column is wrapped in an
     * {@code AvroDeserializerExpression}. The reader schema is the declared key schema when it
     * is non-empty; otherwise it is the latest schema fetched from the Avro schema manager for
     * the subject computed by {@code SubjectStrategy.subjectFor}. When the topic has no key
     * schema, the plain key column is returned.
     *
     * @param topicModel the topic model describing the key schema
     * @return the key column, Avro-decoded when a key schema is defined
     * @throws RuntimeException if the latest schema lookup for the subject yields nothing
     * @throws MatchError       if {@code keySchema} is neither {@code Some} nor {@code None}
     */
    private Column parseKey(TopicModel topicModel) {
        Column col;
        Some keySchema = topicModel.keySchema();
        if (keySchema instanceof Some) {
            String str = (String) keySchema.value();
            LazyRef lazyRef = new LazyRef();
            logger().debug(() -> {
                return new StringBuilder(17).append("AVRO key schema: ").append(new Schema.Parser().parse(str).toString(true)).toString();
            });
            // Schema manager configuration is passed on only when the topic opts in.
            Some some = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
            // The trailing .get() fails when no schema could be determined (empty key schema
            // and no schema manager / no subject).
            col = new Column(new AvroDeserializerExpression(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME()).expr(), (String) (str.isEmpty() ? avroSchemaManager$1(lazyRef, some).flatMap(avroSchemaManager -> {
                return SubjectStrategy$.MODULE$.subjectFor(topicModel.getJsonSchema(), topicModel, true).map(str2 -> {
                    return ((Schema) ((Tuple2) avroSchemaManager.retrieveLatestSchema(str2).getOrElse(() -> {
                        throw new RuntimeException(new StringBuilder(78).append("Reader schema not specified and fetching latest schema with subject '").append(str2).append("' failed.").toString());
                    }))._2()).toString();
                });
            }) : new Some(str)).get(), some, true, AvroDeserializerExpression$.MODULE$.apply$default$5()));
        } else {
            if (!None$.MODULE$.equals(keySchema)) {
                throw new MatchError(keySchema);
            }
            col = functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME());
        }
        return col;
    }

    /**
     * Decoder for AVRO topics: wraps the raw value column in an {@code AvroDeserializerExpression}.
     *
     * <p>The reader schema is the topic's JSON schema unless {@code topicModel.schema()} is
     * empty, in which case the latest schema is fetched from the Avro schema manager for the
     * subject computed by {@code SubjectStrategy.subjectFor}.
     *
     * @param topicModel the topic model describing the value schema
     * @return the Avro-decoded value column
     * @throws RuntimeException if the latest schema lookup for the subject yields nothing
     */
    private Column parseAvroValue(TopicModel topicModel) {
        LazyRef lazyRef = new LazyRef();
        logger().debug(() -> {
            return new StringBuilder(19).append("AVRO value schema: ").append(new Schema.Parser().parse(topicModel.getJsonSchema()).toString(true)).toString();
        });
        // Schema manager configuration is passed on only when the topic opts in.
        Some some = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        // The trailing .get() fails when no schema could be determined.
        return new Column(new AvroDeserializerExpression(functions$.MODULE$.col(RAW_VALUE_ATTRIBUTE_NAME()).expr(), (String) (topicModel.schema().isEmpty() ? avroSchemaManager$2(lazyRef, some).flatMap(avroSchemaManager -> {
            return SubjectStrategy$.MODULE$.subjectFor(topicModel.getJsonSchema(), topicModel, false).map(str -> {
                return ((Schema) ((Tuple2) avroSchemaManager.retrieveLatestSchema(str).getOrElse(() -> {
                    throw new RuntimeException(new StringBuilder(78).append("Reader schema not specified and fetching latest schema with subject '").append(str).append("' failed.").toString());
                }))._2()).toString();
            });
        }) : new Some(topicModel.getJsonSchema())).get(), some, true, AvroDeserializerExpression$.MODULE$.apply$default$5()));
    }

    /**
     * Resolves the topic models behind a datastore name by delegating to
     * {@code innerRetrieveTopicModelsRecursively$1}.
     *
     * @param topicBL the topic repository used for the lookup
     * @param str     the datastore model name to resolve
     * @return the resolved topic models
     */
    private Seq<TopicModel> retrieveTopicModelsRecursively(TopicBL topicBL, String str) {
        Seq<TopicModel> resolvedModels = innerRetrieveTopicModelsRecursively$1(str, topicBL);
        return resolvedModels;
    }

    /**
     * Converts an Avro schema given as JSON text into the corresponding Spark SQL data type.
     *
     * @param str the Avro schema as a JSON string
     * @return the Spark SQL data type produced by {@code AvroSchemaConverters.toSqlType}
     */
    private DataType getDataType(String str) {
        AvroSchemaConverters$ converters = AvroSchemaConverters$.MODULE$;
        Schema avroSchema = new Schema.Parser().parse(str);
        return converters.toSqlType(avroSchema).dataType();
    }

    /**
     * Builds the per-topic column used by the multiple-schema strategy.
     *
     * <p>When the Kafka topic attribute equals {@code str}, the column is a struct holding the
     * decoded value (aliased with the value attribute name) and the data type literal (aliased
     * with {@code DATA_TYPE_ATTRIBUTE_NAME()}); otherwise it is null. The column is aliased
     * with {@code TopicModelUtils.topicNameToColumnName(str)}.
     *
     * @param str    the topic name
     * @param column the decoder column for this topic's values
     * @param str2   the topic data type
     * @return the conditional struct column for the topic
     */
    private Column parseIfMyTopicOrNull(String str, Column column, String str2) {
        return when$1(functions$.MODULE$.col(KafkaSparkSQLSchemas$.MODULE$.TOPIC_ATTRIBUTE_NAME()).equalTo(str), functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(new Column[]{column.as(KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME()), functions$.MODULE$.lit(str2).as(DATA_TYPE_ATTRIBUTE_NAME())}))).otherwise((Object) null).as(TopicModelUtils$.MODULE$.topicNameToColumnName(str));
    }

    /**
     * Lambda body used by {@code createStructuredStream}: sends a {@code CheckOrCreateTopic}
     * message (name, partitions, replicas) to the Kafka admin actor of the topic's cluster via
     * {@code WaspSystem.??} and returns the boolean reply.
     *
     * @param topicModel the topic to check or create
     * @return the unboxed boolean result of the request
     */
    public static final /* synthetic */ boolean $anonfun$createStructuredStream$5(TopicModel topicModel) {
        return BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(topicModel.clusterAlias()), new CheckOrCreateTopic(topicModel.name(), topicModel.partitions(), topicModel.replicas()), WaspSystem$.MODULE$.$qmark$qmark$default$3()));
    }

    /**
     * Lambda body used to combine per-topic check results: logical AND of the two flags.
     *
     * @param z  the accumulated result
     * @param z2 the next result
     * @return {@code true} only when both arguments are {@code true}
     */
    public static final /* synthetic */ boolean $anonfun$createStructuredStream$6(boolean z, boolean z2) {
        if (!z) {
            return false;
        }
        return z2;
    }

    /**
     * Lambda body that turns a computed rate limit into the Kafka source option entry.
     *
     * @param j the maximum number of offsets per trigger
     * @return the pair {@code ("maxOffsetsPerTrigger", j as string)}
     */
    public static final /* synthetic */ Tuple2 $anonfun$createStructuredStream$8(long j) {
        String renderedLimit = BoxesRunTime.boxToLong(j).toString();
        return new Tuple2("maxOffsetsPerTrigger", renderedLimit);
    }

    /**
     * Tells whether a column name is none of the value, key or raw-value
     * attribute names.
     *
     * <p>Comparisons are null-safe: a {@code null} name only equals a
     * {@code null} attribute name.
     *
     * @param str column name to test
     * @return {@code false} if {@code str} equals the value, key or raw-value
     *         attribute name; {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$selectMetadata$2(String str) {
        String valueName = KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME();
        if (str == null ? valueName == null : str.equals(valueName)) {
            return false;
        }

        String keyName = KafkaSparkSQLSchemas$.MODULE$.KEY_ATTRIBUTE_NAME();
        if (str == null ? keyName == null : str.equals(keyName)) {
            return false;
        }

        String rawValueName = MODULE$.RAW_VALUE_ATTRIBUTE_NAME();
        boolean isRawValue = str == null ? rawValueName == null : str.equals(rawValueName);
        return !isRawValue;
    }

    /**
     * Checks a (value, field) pair for a null value in a non-nullable field.
     *
     * @param tuple2 pair whose first element is the value and whose second
     *               element is the {@code StructField} describing it
     * @return {@code true} when the field is not nullable and the value is
     *         {@code null}
     * @throws MatchError if {@code tuple2} itself is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$isNull$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        StructField field = (StructField) tuple2._2();
        if (field.nullable()) {
            // Nullable fields may legitimately hold null.
            return false;
        }
        return tuple2._1() == null;
    }

    /**
     * Tells whether any value of the row is {@code null} while its schema
     * field is declared non-nullable.
     *
     * @param genericRowWithSchema row whose values are paired positionally
     *                             with the fields of its own schema
     * @return {@code true} if at least one pair satisfies
     *         {@code $anonfun$isNull$1}
     */
    private static final boolean isNullImpl$1(GenericRowWithSchema genericRowWithSchema) {
        // Pair every value with the schema field at the same position.
        IterableLike valuesWithFields = (IterableLike) genericRowWithSchema.toSeq().zip(genericRowWithSchema.schema(), Seq$.MODULE$.canBuildFrom());
        return valuesWithFields.exists(pair -> BoxesRunTime.boxToBoolean($anonfun$isNull$1((Tuple2) pair)));
    }

    /**
     * Re-wraps the values of the given row with the supplied schema and runs
     * the non-nullable-null check on the result.
     *
     * @param dataType schema to check against; cast to {@code StructType}
     * @param obj      row to check; cast to {@code GenericRowWithSchema}
     * @return the outcome of {@code isNullImpl$1} on the re-wrapped row
     */
    public static final /* synthetic */ boolean $anonfun$isNull$2(DataType dataType, Object obj) {
        GenericRowWithSchema sourceRow = (GenericRowWithSchema) obj;
        Object[] values = (Object[]) sourceRow.toSeq().toArray(ClassTag$.MODULE$.Any());
        StructType targetSchema = (StructType) dataType;
        GenericRowWithSchema rewrapped = new GenericRowWithSchema(values, targetSchema);
        return isNullImpl$1(rewrapped);
    }

    /**
     * Runs the non-nullable-null check on a row using the row's own schema.
     *
     * @param obj row to check; cast to {@code GenericRowWithSchema}
     * @return the outcome of {@code isNullImpl$1} on the row
     */
    public static final /* synthetic */ boolean $anonfun$isNull$3(Object obj) {
        GenericRowWithSchema row = (GenericRowWithSchema) obj;
        return isNullImpl$1(row);
    }

    /**
     * References the value attribute nested inside the named struct column.
     *
     * @param str name of the enclosing column
     * @return the column {@code <str>.<value attribute name>}
     */
    private static final Column parsedValueCol$1(String str) {
        String nestedPath = str + "." + KafkaSparkSQLSchemas$.MODULE$.VALUE_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * References the data type attribute nested inside the named struct column.
     *
     * @param str name of the enclosing column
     * @return the column {@code <str>.<data type attribute name>}
     */
    private final Column parsedDataTypeCol$1(String str) {
        String nestedPath = str + "." + DATA_TYPE_ATTRIBUTE_NAME();
        return functions$.MODULE$.col(nestedPath);
    }

    /**
     * Builds the condition that holds when the named column is null or its
     * lower-cased data type attribute is one of the JSON / AVRO topic data
     * types.
     *
     * @param str name of the struct column to inspect
     * @return {@code col(str) IS NULL OR lower(dataType) IN (JSON, AVRO)}
     */
    private final Column dataTypeToCheckCondition$1(String str) {
        Column columnIsNull = functions$.MODULE$.col(str).isNull();
        Column loweredDataType = functions$.MODULE$.lower(parsedDataTypeCol$1(str));
        Column isCheckedType = loweredDataType.isin(Predef$.MODULE$.genericWrapArray(new Object[]{TopicDataTypes$.MODULE$.JSON(), TopicDataTypes$.MODULE$.AVRO()}));
        return columnIsNull.$bar$bar(isCheckedType);
    }

    /**
     * Slow path of the lazily computed Avro schema manager.
     *
     * <p>While holding the monitor of {@code lazyRef}, returns the stored
     * value if the holder is already initialized; otherwise stores and returns
     * {@code option} mapped through {@code AvroSchemaManagerFactory.initialize}.
     *
     * @param lazyRef holder of the lazily computed value
     * @param option  optional configuration used to initialize the manager
     * @return the value held by {@code lazyRef} after this call
     */
    private static final /* synthetic */ Option avroSchemaManager$lzycompute$1(LazyRef lazyRef, Option option) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return (Option) lazyRef.value();
            }
            return (Option) lazyRef.initialize(option.map(config -> AvroSchemaManagerFactory$.MODULE$.initialize(config)));
        }
    }

    /**
     * Returns the lazily computed Avro schema manager, computing it on first use.
     *
     * @param lazyRef holder of the lazily computed value
     * @param option  optional configuration passed on to the slow path
     * @return the held value if already initialized, otherwise the result of
     *         {@code avroSchemaManager$lzycompute$1}
     */
    private static final Option avroSchemaManager$1(LazyRef lazyRef, Option option) {
        if (lazyRef.initialized()) {
            return (Option) lazyRef.value();
        }
        return avroSchemaManager$lzycompute$1(lazyRef, option);
    }

    /**
     * Slow path of the second lazily computed Avro schema manager.
     *
     * <p>While holding the monitor of {@code lazyRef}, returns the stored
     * value if the holder is already initialized; otherwise stores and returns
     * {@code option} mapped through {@code AvroSchemaManagerFactory.initialize}.
     *
     * @param lazyRef holder of the lazily computed value
     * @param option  optional configuration used to initialize the manager
     * @return the value held by {@code lazyRef} after this call
     */
    private static final /* synthetic */ Option avroSchemaManager$lzycompute$2(LazyRef lazyRef, Option option) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return (Option) lazyRef.value();
            }
            return (Option) lazyRef.initialize(option.map(config -> AvroSchemaManagerFactory$.MODULE$.initialize(config)));
        }
    }

    /**
     * Returns the second lazily computed Avro schema manager, computing it on
     * first use.
     *
     * @param lazyRef holder of the lazily computed value
     * @param option  optional configuration passed on to the slow path
     * @return the held value if already initialized, otherwise the result of
     *         {@code avroSchemaManager$lzycompute$2}
     */
    private static final Option avroSchemaManager$2(LazyRef lazyRef, Option option) {
        if (lazyRef.initialized()) {
            return (Option) lazyRef.value();
        }
        return avroSchemaManager$lzycompute$2(lazyRef, option);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a model name into the flat sequence of {@code TopicModel}s it
     * denotes.
     *
     * <p>A plain {@code TopicModel} yields a one-element sequence. A
     * {@code MultiTopicModel} is expanded by resolving each of its
     * {@code topicModelNames()} recursively and concatenating the results.
     *
     * @param str     name of the model to look up
     * @param topicBL lookup used to fetch models by name
     * @return the resolved topic models
     * @throws IllegalArgumentException if no model is registered under {@code str}
     * @throws MatchError if the model is neither a {@code TopicModel} nor a
     *                    {@code MultiTopicModel}
     */
    public static final Seq innerRetrieveTopicModelsRecursively$1(String str, TopicBL topicBL) {
        // The lookup yields a generic datastore model; keep it at that type so
        // both instanceof checks below are well-typed, and narrow per branch.
        DatastoreModel model = (DatastoreModel) topicBL.getByName(str).getOrElse(() -> {
            throw new IllegalArgumentException(new StringBuilder(29).append("Cannot find topic with name: ").append(str).toString());
        });
        if (model instanceof TopicModel) {
            return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicModel[]{(TopicModel) model}));
        }
        if (model instanceof MultiTopicModel) {
            // Expand every referenced name and flatten the results.
            return (Seq) ((MultiTopicModel) model).topicModelNames().flatMap(str2 -> {
                return innerRetrieveTopicModelsRecursively$1(str2, topicBL);
            }, Seq$.MODULE$.canBuildFrom());
        }
        throw new MatchError(model);
    }

    /**
     * Builds a single-branch {@code CaseWhen} column: when {@code column}
     * holds, the result is {@code column2}; the else branch is left at
     * {@code CaseWhen}'s default.
     *
     * @param column  branch condition
     * @param column2 branch value
     * @return a new column wrapping the {@code CaseWhen} expression
     */
    private static final Column when$1(Column column, Column column2) {
        Tuple2[] branches = {new Tuple2(column.expr(), column2.expr())};
        return new Column(new CaseWhen(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(branches)), CaseWhen$.MODULE$.apply$default$2()));
    }

    /**
     * Private constructor: publishes this instance through {@code MODULE$},
     * runs the {@code Logging} initializer on it, and sets the three name
     * constants exposed by this object.
     */
    private KafkaSparkStructuredStreamingReader$() {
        // Publish the instance first so later code can reach it via MODULE$.
        MODULE$ = this;
        // NOTE(review): presumably the Scala trait initializer for Logging — confirm.
        Logging.$init$(this);
        this.KAFKA_METADATA_COL = "kafkaMetadata";
        this.RAW_VALUE_ATTRIBUTE_NAME = "raw";
        this.DATA_TYPE_ATTRIBUTE_NAME = "dataType";
    }
}
