package tech.ytsaurus.spyt.streaming;

import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import tech.ytsaurus.spyt.format.YtDynamicTableWriter;
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration;
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration$;
import tech.ytsaurus.spyt.fs.path.YPathEnriched;
import tech.ytsaurus.spyt.fs.path.YPathEnriched$;
import tech.ytsaurus.spyt.wrapper.client.YtClientConfiguration;
import tech.ytsaurus.spyt.wrapper.client.YtClientConfigurationConverter$;
import tech.ytsaurus.spyt.wrapper.client.YtClientProvider$;

/* compiled from: YtStreamingSink.scala */
@ScalaSignature(bytes = "\u0006\u0001e4AAC\u0006\u0001)!A!\u0007\u0001B\u0001B\u0003%1\u0007\u0003\u00058\u0001\t\u0005\t\u0015!\u00039\u0011!)\u0005A!A!\u0002\u00131\u0005\"B%\u0001\t\u0003Q\u0005b\u0002)\u0001\u0001\u0004%I!\u0015\u0005\b-\u0002\u0001\r\u0011\"\u0003X\u0011\u0019i\u0006\u0001)Q\u0005%\")!\r\u0001C!G\")A\r\u0001C!K\ny\u0011\f^*ue\u0016\fW.\u001b8h'&t7N\u0003\u0002\r\u001b\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u001d=\tAa\u001d9zi*\u0011\u0001#E\u0001\tsR\u001c\u0018-\u001e:vg*\t!#\u0001\u0003uK\u000eD7\u0001A\n\u0005\u0001UiB\u0006\u0005\u0002\u001775\tqC\u0003\u0002\u00193\u0005!A.\u00198h\u0015\u0005Q\u0012\u0001\u00026bm\u0006L!\u0001H\f\u0003\r=\u0013'.Z2u!\tq\"&D\u0001 \u0015\ta\u0001E\u0003\u0002\"E\u0005IQ\r_3dkRLwN\u001c\u0006\u0003G\u0011\n1a]9m\u0015\t)c%A\u0003ta\u0006\u00148N\u0003\u0002(Q\u00051\u0011\r]1dQ\u0016T\u0011!K\u0001\u0004_J<\u0017BA\u0016 \u0005\u0011\u0019\u0016N\\6\u0011\u00055\u0002T\"\u0001\u0018\u000b\u0005=\"\u0013\u0001C5oi\u0016\u0014h.\u00197\n\u0005Er#a\u0002'pO\u001eLgnZ\u0001\u000bgFd7i\u001c8uKb$\bC\u0001\u001b6\u001b\u0005\u0011\u0013B\u0001\u001c#\u0005)\u0019\u0016\u000bT\"p]R,\u0007\u0010^\u0001\ncV,W/\u001a)bi\"\u0004\"!\u000f\"\u000f\u0005i\u0002\u0005CA\u001e?\u001b\u0005a$BA\u001f\u0014\u0003\u0019a$o\\8u})\tq(A\u0003tG\u0006d\u0017-\u0003\u0002B}\u00051\u0001K]3eK\u001aL!a\u0011#\u0003\rM#(/\u001b8h\u0015\t\te(\u0001\u0006qCJ\fW.\u001a;feN\u0004B!O$9q%\u0011\u0001\n\u0012\u0002\u0004\u001b\u0006\u0004\u0018A\u0002\u001fj]&$h\b\u0006\u0003L\u001b:{\u0005C\u0001'\u0001\u001b\u0005Y\u0001\"\u0002\u001a\u0005\u0001\u0004\u0019\u0004\"B\u001c\u0005\u0001\u0004A\u0004\"B#\u0005\u0001\u00041\u0015!\u00047bi\u0016\u001cHOQ1uG\"LE-F\u0001S!\t\u0019F+D\u0001?\u0013\t)fH\u0001\u0003M_:<\u0017!\u00057bi\u0016\u001cHOQ1uG\"LEm\u0018\u0013fcR\u0011\u0001l\u0017\t\u0003'fK!A\u0017 \u0003\tUs\u0017\u000e\u001e\u0005\b9\u001a\t\t\u00111\u0001S\u0003\rAH%M\u0001\u000fY\u0006$Xm\u001d;CCR\u001c\u0007.\u00133!Q\t9q\f\u0005\u0002TA&\u0011\u0011M\u0010\u0002\tm>d\u0017\r^5mK\u0006AAo\\*ue&tw\rF\u00019\u0003!\tG\r\u001a\"bi\u000eDGc\u0001-gQ\")q-\u0003a\u0001%\u00069!-\u0019;dQ&#\u0007\"B5\n\u0001\u0004Q\u0017\u0001\u00023bi\u0006\u0004\"a\u001b<\u000f\u00051$hBA7t\u001d\tq'O\u0004\u0002pc:\u00111\b]\u0005\u0002S%\u0011q\u0005K\u0005\u0003K\u0019J!a\t\u0013\n\u0005U\u0014\u0013a\u00029bG.\fw-Z\u0005\u0003ob\u0014\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0005U\u0014\u0003")
/* loaded from: input_file:tech/ytsaurus/spyt/streaming/YtStreamingSink.class */
public class YtStreamingSink implements Sink, Logging {
    private final SQLContext sqlContext;
    private final String queuePath;
    private final Map<String, String> parameters;
    private volatile long latestBatchId;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public String name() {
        return Sink.name$(this);
    }

    public StructType schema() {
        return Sink.schema$(this);
    }

    public Set<TableCapability> capabilities() {
        return Sink.capabilities$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private long latestBatchId() {
        return this.latestBatchId;
    }

    private void latestBatchId_$eq(long j) {
        this.latestBatchId = j;
    }

    public String toString() {
        return "YtStreamingSink";
    }

    public void addBatch(long j, Dataset<Row> dataset) {
        if (j <= latestBatchId()) {
            logInfo(() -> {
                return new StringBuilder(33).append("Skipping already committed batch ").append(j).toString();
            });
            return;
        }
        SparkContext sparkContext = this.sqlContext.sparkSession().sparkContext();
        Broadcast broadcast = sparkContext.broadcast(YtClientConfigurationConverter$.MODULE$.ytClientConfiguration(sparkContext.getConf()), ClassTag$.MODULE$.apply(YtClientConfiguration.class));
        SparkYtWriteConfiguration apply = SparkYtWriteConfiguration$.MODULE$.apply(this.sqlContext);
        Broadcast broadcast2 = sparkContext.broadcast(apply.copy(apply.copy$default$1(), apply.copy$default$2(), Integer.MAX_VALUE, apply.copy$default$4(), apply.copy$default$5()), ClassTag$.MODULE$.apply(SparkYtWriteConfiguration.class));
        Broadcast broadcast3 = sparkContext.broadcast(YPathEnriched$.MODULE$.fromPath(new Path(this.queuePath), YPathEnriched$.MODULE$.fromPath$default$2()), ClassTag$.MODULE$.apply(YPathEnriched.class));
        Broadcast broadcast4 = sparkContext.broadcast(this.parameters, ClassTag$.MODULE$.apply(Map.class));
        Broadcast broadcast5 = sparkContext.broadcast(dataset.schema(), ClassTag$.MODULE$.apply(StructType.class));
        dataset.queryExecution().toRdd().foreachPartition(iterator -> {
            $anonfun$addBatch$2(broadcast, broadcast3, broadcast5, broadcast2, broadcast4, iterator);
            return BoxedUnit.UNIT;
        });
        latestBatchId_$eq(j);
    }

    public static final /* synthetic */ void $anonfun$addBatch$2(Broadcast broadcast, Broadcast broadcast2, Broadcast broadcast3, Broadcast broadcast4, Broadcast broadcast5, Iterator iterator) {
        if (iterator.hasNext()) {
            YtDynamicTableWriter ytDynamicTableWriter = new YtDynamicTableWriter((YPathEnriched) broadcast2.value(), (StructType) broadcast3.value(), (SparkYtWriteConfiguration) broadcast4.value(), (Map) broadcast5.value(), YtClientProvider$.MODULE$.ytClient((YtClientConfiguration) broadcast.value()));
            try {
                iterator.foreach(internalRow -> {
                    ytDynamicTableWriter.write(internalRow);
                    return BoxedUnit.UNIT;
                });
            } finally {
                ytDynamicTableWriter.close();
            }
        }
    }

    public YtStreamingSink(SQLContext sQLContext, String str, Map<String, String> map) {
        this.sqlContext = sQLContext;
        this.queuePath = str;
        this.parameters = map;
        Sink.$init$(this);
        Logging.$init$(this);
        this.latestBatchId = -1L;
    }
}
