package tech.ytsaurus.spyt.streaming;

import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.execution.StreamingUtils$;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SortedMap$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.WrappedArray;
import scala.math.Ordering$Int$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.wrapper.YtWrapper$;
import tech.ytsaurus.spyt.wrapper.client.YtClientConfigurationConverter$;
import tech.ytsaurus.spyt.wrapper.client.YtClientProvider$;

/* compiled from: YtStreamingSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\re\u0001B\f\u0019\u0001\u0005B\u0001\u0002\u0013\u0001\u0003\u0002\u0003\u0006I!\u0013\u0005\t\u001b\u0002\u0011\t\u0011)A\u0005\u001d\"A1\f\u0001B\u0001B\u0003%a\n\u0003\u0005]\u0001\t\u0015\r\u0011\"\u0001^\u0011!!\u0007A!A!\u0002\u0013q\u0006\u0002C3\u0001\u0005\u0003\u0005\u000b\u0011\u00024\t\u000b%\u0004A\u0011\u00016\t\u000fI\u0004!\u0019!C\u0006g\"1!\u0010\u0001Q\u0001\nQDqa\u001f\u0001C\u0002\u0013%A\u0010\u0003\u0004~\u0001\u0001\u0006IA\u0014\u0005\b}\u0002\u0001\r\u0011\"\u0003��\u0011%\ty\u0001\u0001a\u0001\n\u0013\t\t\u0002\u0003\u0005\u0002\u001e\u0001\u0001\u000b\u0015BA\u0001\u0011%\ty\u0002\u0001b\u0001\n\u0013\t\t\u0003\u0003\u0005\u0002*\u0001\u0001\u000b\u0011BA\u0012\u0011\u001d\tY\u0003\u0001C!\u0003[AaA \u0001\u0005B\u0005U\u0002bBA#\u0001\u0011\u0005\u0013q\t\u0005\b\u0003\u001f\u0002A\u0011IA)\u0011\u001d\tI\b\u0001C!\u0003wBq!a \u0001\t\u0003\n\tIA\tZiN#(/Z1nS:<7k\\;sG\u0016T!!\u0007\u000e\u0002\u0013M$(/Z1nS:<'BA\u000e\u001d\u0003\u0011\u0019\b/\u001f;\u000b\u0005uq\u0012\u0001C=ug\u0006,(/^:\u000b\u0003}\tA\u0001^3dQ\u000e\u00011#\u0002\u0001#Uez\u0004CA\u0012)\u001b\u0005!#BA\u0013'\u0003\u0011a\u0017M\\4\u000b\u0003\u001d\nAA[1wC&\u0011\u0011\u0006\n\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005-:T\"\u0001\u0017\u000b\u0005ei#B\u0001\u00180\u0003%)\u00070Z2vi&|gN\u0003\u00021c\u0005\u00191/\u001d7\u000b\u0005I\u001a\u0014!B:qCJ\\'B\u0001\u001b6\u0003\u0019\t\u0007/Y2iK*\ta'A\u0002pe\u001eL!\u0001\u000f\u0017\u0003\rM{WO]2f!\tQT(D\u0001<\u0015\ta\u0014'\u0001\u0005j]R,'O\\1m\u0013\tq4HA\u0004M_\u001e<\u0017N\\4\u0011\u0005\u00013U\"A!\u000b\u0005e\u0011%BA\"E\u0003\u0011\u0011X-\u00193\u000b\u0005\u0015{\u0013!C2p]:,7\r^8s\u0013\t9\u0015I\u0001\rTkB\u0004xN\u001d;t\u0003\u0012l\u0017n]:j_:\u001cuN\u001c;s_2\f!b]9m\u0007>tG/\u001a=u!\tQ5*D\u00010\u0013\tauF\u0001\u0006T#2\u001buN\u001c;fqR\fAbY8ogVlWM\u001d)bi\"\u0004\"a\u0014-\u000f\u0005A3\u0006CA)U\u001b\u0005\u0011&BA*!\u0003\u0019a$o\\8u})\tQ+
A\u0003tG\u0006d\u0017-\u0003\u0002X)\u00061\u0001K]3eK\u001aL!!\u0017.\u0003\rM#(/\u001b8h\u0015\t9F+A\u0005rk\u0016,X\rU1uQ\u000611o\u00195f[\u0006,\u0012A\u0018\t\u0003?\nl\u0011\u0001\u0019\u0006\u0003C>\nQ\u0001^=qKNL!a\u00191\u0003\u0015M#(/^2u)f\u0004X-A\u0004tG\",W.\u0019\u0011\u0002\u0015A\f'/Y7fi\u0016\u00148\u000f\u0005\u0003PO:s\u0015B\u00015[\u0005\ri\u0015\r]\u0001\u0007y%t\u0017\u000e\u001e \u0015\r-lgn\u001c9r!\ta\u0007!D\u0001\u0019\u0011\u0015Au\u00011\u0001J\u0011\u0015iu\u00011\u0001O\u0011\u0015Yv\u00011\u0001O\u0011\u0015av\u00011\u0001_\u0011\u0015)w\u00011\u0001g\u0003\tIH/F\u0001u!\t)\b0D\u0001w\u0015\t9H$\u0001\u0004dY&,g\u000e^\u0005\u0003sZ\u0014abQ8na>,h\u000eZ\"mS\u0016tG/A\u0002zi\u0002\nqa\u00197vgR,'/F\u0001O\u0003!\u0019G.^:uKJ\u0004\u0013\u0001\u00047bi\u0016\u001cHo\u00144gg\u0016$XCAA\u0001!\u0019\t\u0019!!\u0002\u0002\n5\tA+C\u0002\u0002\bQ\u0013aa\u00149uS>t\u0007c\u00017\u0002\f%\u0019\u0011Q\u0002\r\u0003\u001be#\u0018+^3vK>3gm]3u\u0003Aa\u0017\r^3ti>3gm]3u?\u0012*\u0017\u000f\u0006\u0003\u0002\u0014\u0005e\u0001\u0003BA\u0002\u0003+I1!a\u0006U\u0005\u0011)f.\u001b;\t\u0013\u0005mQ\"!AA\u0002\u0005\u0005\u0011a\u0001=%c\u0005iA.\u0019;fgR|eMZ:fi\u0002\nQ#\u001b8dYV$WmU3sm&\u001cWmQ8mk6t7/\u0006\u0002\u0002$A!\u00111AA\u0013\u0013\r\t9\u0003\u0016\u0002\b\u0005>|G.Z1o\u0003YIgn\u00197vI\u0016\u001cVM\u001d<jG\u0016\u001cu\u000e\\;n]N\u0004\u0013aE4fi\u0012+g-Y;miJ+\u0017\r\u001a'j[&$HCAA\u0018!\r\u0001\u0015\u0011G\u0005\u0004\u0003g\t%!\u0003*fC\u0012d\u0015.\\5u)\u0019\t9$!\u0010\u0002BA\u0019\u0001)!\u000f\n\u0007\u0005m\u0012I\u0001\u0004PM\u001a\u001cX\r\u001e\u0005\b\u0003\u007f\u0011\u0002\u0019AA\u001c\u0003-\u0019H/\u0019:u\u001f\u001a47/\u001a;\t\u000f\u0005\r#\u00031\u0001\u00020\u0005)A.[7ji\u0006Iq-\u001a;PM\u001a\u001cX\r^\u000b\u0003\u0003\u0013\u0002b!a\u0001\u0002\u0006\u0005-\u0003cA\u0016\u0002N%\u0019\u00111\b\u0017\u0002\u0011\u001d,GOQ1uG\"$b!a\u0015\u0002r\u0005U\u0004\u0003BA+\u0003WrA!a\u0016\u0002h9!\
u0011\u0011LA3\u001d\u0011\tY&a\u0019\u000f\t\u0005u\u0013\u0011\r\b\u0004#\u0006}\u0013\"\u0001\u001c\n\u0005Q*\u0014B\u0001\u001a4\u0013\t\u0001\u0014'C\u0002\u0002j=\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002n\u0005=$!\u0003#bi\u00064%/Y7f\u0015\r\tIg\f\u0005\b\u0003g\"\u0002\u0019AA%\u0003\u0015\u0019H/\u0019:u\u0011\u001d\t9\b\u0006a\u0001\u0003\u0017\n1!\u001a8e\u0003\u0019\u0019w.\\7jiR!\u00111CA?\u0011\u001d\t9(\u0006a\u0001\u0003\u0017\nAa\u001d;paR\u0011\u00111\u0003")
/* loaded from: input_file:tech/ytsaurus/spyt/streaming/YtStreamingSource.class */
/*
 * Streaming source that implements Spark's Source, Logging and SupportsAdmissionControl
 * interfaces on top of the YtQueueOffset / YtQueueRDD helpers.
 *
 * This is decompiler output of a Scala class, so many members are compiler-generated
 * forwarders to the static trait implementations (Logging.xxx$ / Source.xxx$). The members
 * with real logic are getDefaultReadLimit, latestOffset(Offset, ReadLimit), getOffset,
 * getBatch, commit(streaming.Offset), stop and the constructor.
 *
 * Recognised entries of the parameters map (as read by this class):
 *   "max_rows_per_partition"  - parsed as long, becomes the default max-rows read limit
 *   "include_service_columns" - parsed as boolean, passed on to YtQueueRDD
 */
public class YtStreamingSource implements Source, Logging, SupportsAdmissionControl {
    private final SQLContext sqlContext;
    private final String consumerPath;
    private final String queuePath;
    private final StructType schema;
    private final Map<String, String> parameters;
    private final CompoundClient yt;
    private final String cluster;
    // Last offset successfully obtained by getOffset(); used as a fallback when a later query fails.
    private Option<YtQueueOffset> latestOffset;
    private final boolean includeServiceColumns;
    // Backing field for the Logging trait's logger slot.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /** Forwards to the inherited default implementation. */
    public Offset reportLatestOffset() {
        return super.reportLatestOffset();
    }

    // ---- Forwarders to the Logging trait's static implementations ----

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    // ---- Forwarders to the Source trait's static implementations ----

    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    public Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    // ---- Accessors ----

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** @return the schema passed to the constructor */
    public StructType schema() {
        return this.schema;
    }

    private CompoundClient yt() {
        return this.yt;
    }

    private String cluster() {
        return this.cluster;
    }

    private Option<YtQueueOffset> latestOffset() {
        return this.latestOffset;
    }

    private void latestOffset_$eq(Option<YtQueueOffset> option) {
        this.latestOffset = option;
    }

    private boolean includeServiceColumns() {
        return this.includeServiceColumns;
    }

    /**
     * Builds the default read limit from the {@code "max_rows_per_partition"} parameter.
     *
     * @return a composite limit containing a single max-rows limit when the parameter is
     *         present (its value is parsed as a long), otherwise the "all available" limit
     */
    public ReadLimit getDefaultReadLimit() {
        ReadLimit allAvailable;
        Some some = this.parameters.get("max_rows_per_partition");
        if (some instanceof Some) {
            allAvailable = ReadLimit.compositeLimit(new ReadLimit[]{ReadLimit.maxRows(new StringOps(Predef$.MODULE$.augmentString((String) some.value())).toLong())});
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            allAvailable = ReadLimit.allAvailable();
        }
        return allAvailable;
    }

    /**
     * Computes the end offset of the next batch, honouring a max-rows limit if one is given.
     *
     * <p>The newest offset is obtained through {@link #getOffset()}. If no max-rows limit is
     * found among the supplied limits, that offset is returned unchanged. Otherwise, for each
     * partition of the newest offset, the result is
     * {@code min(lowerBound + maxRows, newestOffsetOfPartition)}, where {@code lowerBound} is the
     * partition's entry in the starting offset, or {@code -1} if the partition has no entry.
     *
     * @param offset    offset the batch starts from; when {@code null}, the offset returned by
     *                  {@code YtQueueOffset.getCurrentOffset} for the consumer is used instead
     * @param readLimit a single limit or a {@link CompositeReadLimit}
     * @return the offset up to which the next batch may read
     */
    public Offset latestOffset(Offset offset, ReadLimit readLimit) {
        WrappedArray wrapRefArray = readLimit instanceof CompositeReadLimit ? Predef$.MODULE$.wrapRefArray(((CompositeReadLimit) readLimit).getReadLimits()) : (Seq) new $colon.colon(readLimit, Nil$.MODULE$);
        YtQueueOffset ytQueueOffset = (YtQueueOffset) getOffset().get();
        // The partial function class is defined outside this file; its result is cast to
        // ReadMaxRows below, so it presumably selects the first ReadMaxRows limit.
        Option collectFirst = wrapRefArray.collectFirst(new YtStreamingSource$$anonfun$1(null));
        if (!collectFirst.isDefined()) {
            return ytQueueOffset;
        }
        long maxRows = ((ReadMaxRows) collectFirst.get()).maxRows();
        // Resolve the lower-bound offset once, before iterating over partitions. Previously the
        // consumer's current offset was re-queried inside the per-partition lambda, which repeated
        // the same lookup for every partition and could mix results of different queries.
        YtQueueOffset lowerBounds = offset != null ? YtQueueOffset$.MODULE$.apply(offset) : YtQueueOffset$.MODULE$.getCurrentOffset(cluster(), this.consumerPath, this.queuePath, yt());
        return new YtQueueOffset(cluster(), this.queuePath, SortedMap$.MODULE$.apply((Seq) ytQueueOffset.partitions().toSeq().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            int _1$mcI$sp = tuple2._1$mcI$sp();
            // -1 is the fallback for a partition that has no entry in the lower-bound offset.
            long lowerBound = BoxesRunTime.unboxToLong(lowerBounds.partitions().getOrElse(BoxesRunTime.boxToInteger(_1$mcI$sp), () -> {
                return -1L;
            }));
            return new Tuple2.mcIJ.sp(_1$mcI$sp, package$.MODULE$.min(lowerBound + maxRows, tuple2._2$mcJ$sp()));
        }, Seq$.MODULE$.canBuildFrom()), Ordering$Int$.MODULE$));
    }

    /**
     * Queries the maximum offset of the queue and caches it.
     *
     * <p>On success the result replaces the cached offset and is returned. On failure a warning
     * is logged and the previously cached offset is returned instead.
     *
     * @return the newest known offset, wrapped in an {@code Option}
     * @throws IllegalStateException if the query fails and no offset has been cached yet
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        Option<YtQueueOffset> orElse;
        logDebug(() -> {
            return new StringBuilder(15).append("Get offset for ").append(this.queuePath).toString();
        });
        Success maxOffset = YtQueueOffset$.MODULE$.getMaxOffset(cluster(), this.queuePath, yt());
        if (maxOffset instanceof Success) {
            latestOffset_$eq(new Some((YtQueueOffset) maxOffset.value()));
            orElse = latestOffset();
        } else {
            if (!(maxOffset instanceof Failure)) {
                throw new MatchError(maxOffset);
            }
            logWarning(() -> {
                return "Error while getting new offset";
            }, ((Failure) maxOffset).exception());
            // The alternative is only evaluated when nothing has been cached, and it always throws.
            orElse = latestOffset().orElse(() -> {
                throw new IllegalStateException("Latest and new offsets are lost");
            });
        }
        return orElse;
    }

    /**
     * Creates the streaming DataFrame for the rows between two offsets.
     *
     * <p>If {@code option} is defined and equal to {@code offset}, an empty RDD named
     * {@code "empty"} is used. Otherwise the batch start is {@code option} converted to a
     * {@code YtQueueOffset}, or the consumer's current offset when {@code option} is empty, and
     * a {@code YtQueueRDD} named {@code "yt"} is built over the ranges between start and end.
     *
     * <p>Two preconditions are enforced with Scala's {@code require}: the start must be
     * {@code >=} the consumer's current offset, and the end must be {@code >=} the start.
     *
     * @param option start offset of the batch, or empty
     * @param offset end offset of the batch
     * @return a streaming DataFrame with this source's schema
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> option, org.apache.spark.sql.execution.streaming.Offset offset) {
        RDD<InternalRow> name;
        if (option.isDefined()) {
            Object obj = option.get();
            if (obj != null ? obj.equals(offset) : offset == null) {
                name = this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty");
                return StreamingUtils$.MODULE$.createStreamingDataFrame(this.sqlContext, name, schema());
            }
        }
        YtQueueOffset currentOffset = YtQueueOffset$.MODULE$.getCurrentOffset(cluster(), this.consumerPath, this.queuePath, yt());
        YtQueueOffset ytQueueOffset = (YtQueueOffset) option.map(offset2 -> {
            return YtQueueOffset$.MODULE$.apply(offset2);
        }).getOrElse(() -> {
            return currentOffset;
        });
        Predef$.MODULE$.require(ytQueueOffset.$greater$eq(currentOffset), () -> {
            return "Committed offset was queried";
        });
        YtQueueOffset apply = YtQueueOffset$.MODULE$.apply(offset);
        Predef$.MODULE$.require(apply.$greater$eq(ytQueueOffset), () -> {
            return "Batch end is less than batch start";
        });
        name = new YtQueueRDD(this.sqlContext.sparkContext(), schema(), this.consumerPath, this.queuePath, YtQueueOffset$.MODULE$.getRanges(ytQueueOffset, apply), includeServiceColumns()).setName("yt");
        return StreamingUtils$.MODULE$.createStreamingDataFrame(this.sqlContext, name, schema());
    }

    /**
     * Advances the consumer to the given offset.
     *
     * <p>This is best-effort: any {@code Throwable} raised while advancing is logged as a
     * warning and not rethrown.
     *
     * @param offset offset to commit
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset offset) {
        try {
            YtQueueOffset$.MODULE$.advance(this.consumerPath, YtQueueOffset$.MODULE$.apply(offset), yt());
        } catch (Throwable th) {
            logWarning(() -> {
                return "Error in committing new offset";
            }, th);
        }
    }

    /** Logs a debug message; no resources are released here. */
    public void stop() {
        logDebug(() -> {
            return "Close YtStreamingSource";
        });
    }

    /** Compiler-generated helper: parses the {@code "include_service_columns"} value as a boolean. */
    public static final /* synthetic */ boolean $anonfun$includeServiceColumns$1(String str) {
        return new StringOps(Predef$.MODULE$.augmentString(str)).toBoolean();
    }

    /**
     * Creates the source, obtains a YT client from the Spark session configuration and resolves
     * the cluster name from that client.
     *
     * @param sQLContext Spark SQL context used to build RDDs and DataFrames
     * @param str        consumer path
     * @param str2       queue path
     * @param structType schema reported by {@link #schema()}
     * @param map        source options; see the class comment for the keys read here
     */
    public YtStreamingSource(SQLContext sQLContext, String str, String str2, StructType structType, Map<String, String> map) {
        this.sqlContext = sQLContext;
        this.consumerPath = str;
        this.queuePath = str2;
        this.schema = structType;
        this.parameters = map;
        Source.$init$(this);
        Logging.$init$(this);
        this.yt = YtClientProvider$.MODULE$.ytClient(YtClientConfigurationConverter$.MODULE$.ytClientConfiguration(sQLContext.sparkSession()));
        this.cluster = YtWrapper$.MODULE$.clusterName(yt());
        this.latestOffset = None$.MODULE$;
        // exists(...) on an absent key yields false, so service columns are excluded by default.
        this.includeServiceColumns = map.get("include_service_columns").exists(str3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$includeServiceColumns$1(str3));
        });
    }
}
