package org.jetbrains.kotlinx.spark.api.jupyter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import kotlin.Metadata;
import kotlin.collections.ArraysKt;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KTypeProjection;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.kotlinx.jupyter.api.FieldsHandlingKt;
import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost;
import org.jetbrains.kotlinx.jupyter.api.SubtypeThrowableRenderer;
import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration;
import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration;

/* compiled from: SparkStreamingIntegration.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��<\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0011\n\u0002\u0010\u000e\n\u0002\b\u0004\n\u0002\u0010#\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0003\n��\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b��\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\u0010\u0010\f\u001a\u00020\u00052\u0006\u0010\r\u001a\u00020\u000eH\u0002J\f\u0010\u000f\u001a\u00020\u0010*\u00020\u0011H\u0016J\f\u0010\u0012\u001a\u00020\u0010*\u00020\u0011H\u0016J\f\u0010\u0013\u001a\u00020\u0010*\u00020\u0014H\u0016R\u001c\u0010\u0003\u001a\b\u0012\u0004\u0012\u00020\u00050\u0004X\u0096\u0004¢\u0006\n\n\u0002\u0010\b\u001a\u0004\b\u0006\u0010\u0007R\u0014\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u000b0\nX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0015"}, d2 = {"Lorg/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration;", "Lorg/jetbrains/kotlinx/spark/api/jupyter/Integration;", "()V", "imports", "", "", "getImports", "()[Ljava/lang/String;", "[Ljava/lang/String;", "sscCollection", "", "Lorg/apache/spark/streaming/api/java/JavaStreamingContext;", "cleanUp", "e", "", "onInterrupt", "", "Lorg/jetbrains/kotlinx/jupyter/api/KotlinKernelHost;", "onLoaded", "onLoadedAlsoDo", "Lorg/jetbrains/kotlinx/jupyter/api/libraries/JupyterIntegration$Builder;", "kotlin-spark-api-jupyter-3.1"})
/* loaded from: input_file:org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.class */
public final class SparkStreamingIntegration extends Integration {

    @NotNull
    private final String[] imports = (String[]) ArraysKt.plus(super.getImports(), new String[]{"org.apache.spark.deploy.SparkHadoopUtil", "org.apache.hadoop.conf.Configuration"});

    @NotNull
    private final Set<JavaStreamingContext> sscCollection = new LinkedHashSet();

    @Override // org.jetbrains.kotlinx.spark.api.jupyter.Integration
    @NotNull
    public String[] getImports() {
        return this.imports;
    }

    @Override // org.jetbrains.kotlinx.spark.api.jupyter.Integration
    public void onLoaded(@NotNull KotlinKernelHost kotlinKernelHost) {
        Intrinsics.checkNotNullParameter(kotlinKernelHost, "<this>");
        FieldsHandlingKt.declare(kotlinKernelHost, new VariableDeclaration[]{new VariableDeclaration("sscCollection", this.sscCollection, Reflection.mutableCollectionType(Reflection.typeOf(Set.class, KTypeProjection.Companion.invariant(Reflection.typeOf(JavaStreamingContext.class)))), false)});
        kotlinKernelHost.execute("%dumpClassesForSpark");
        List listOf = CollectionsKt.listOf(new String[]{"@JvmOverloads\nfun withSparkStreaming(\n    batchDuration: Duration = Durations.seconds(1L),\n    checkpointPath: String? = null,\n    hadoopConf: Configuration = SparkHadoopUtil.get().conf(),\n    createOnError: Boolean = false,\n    props: Map<String, Any> = emptyMap(),\n    master: String = SparkConf().get(\"spark.master\", \"local[*]\"),\n    appName: String = \"Kotlin Spark Sample\",\n    timeout: Long = -1L,\n    startStreamingContext: Boolean = true,\n    func: KSparkStreamingSession.() -> Unit,\n) {\n\n    // will only be set when a new context is created\n    var kSparkStreamingSession: KSparkStreamingSession? = null\n\n    val creatingFunc = {\n        val sc = SparkConf()\n            .setAppName(appName)\n            .setMaster(master)\n            .setAll(\n                props\n                    .map { (key, value) -> key X value.toString() }\n                    .asScalaIterable()\n            )\n\n        val ssc = JavaStreamingContext(sc, batchDuration)\n        ssc.checkpoint(checkpointPath)\n\n        kSparkStreamingSession = KSparkStreamingSession(ssc)\n        func(kSparkStreamingSession!!)\n\n        ssc\n    }\n\n    val ssc = when {\n        checkpointPath != null ->\n            JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)\n\n        else -> creatingFunc()\n    }\n    sscCollection += ssc\n\n    if (startStreamingContext) {\n        ssc.start()\n        kSparkStreamingSession?.invokeRunAfterStart()\n    }\n    ssc.awaitTerminationOrTimeout(timeout)\n    ssc.stop()\n}", "println(\"To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.\")"});
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(listOf, 10));
        Iterator it = listOf.iterator();
        while (it.hasNext()) {
            arrayList.add(kotlinKernelHost.execute((String) it.next()));
        }
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:10:0x0031
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    /* JADX INFO: Access modifiers changed from: private */
    public final java.lang.String cleanUp(java.lang.Throwable r5) {
        /*
            r4 = this;
        L0:
            r0 = r4
            java.util.Set<org.apache.spark.streaming.api.java.JavaStreamingContext> r0 = r0.sscCollection
            java.util.Collection r0 = (java.util.Collection) r0
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L13
            r0 = 1
            goto L14
        L13:
            r0 = 0
        L14:
            if (r0 == 0) goto L4e
            r0 = r4
            java.util.Set<org.apache.spark.streaming.api.java.JavaStreamingContext> r0 = r0.sscCollection
            java.lang.Iterable r0 = (java.lang.Iterable) r0
            java.lang.Object r0 = kotlin.collections.CollectionsKt.first(r0)
            org.apache.spark.streaming.api.java.JavaStreamingContext r0 = (org.apache.spark.streaming.api.java.JavaStreamingContext) r0
            r6 = r0
            r0 = 0
            r7 = r0
        L27:
            r0 = r6
            org.apache.spark.streaming.StreamingContextState r0 = r0.getState()
            org.apache.spark.streaming.StreamingContextState r1 = org.apache.spark.streaming.StreamingContextState.STOPPED
            if (r0 == r1) goto L40
        L32:
            r0 = r6
            r1 = 1
            r2 = 1
            r0.stop(r1, r2)     // Catch: java.lang.Exception -> L3b
            goto L27
        L3b:
            r8 = move-exception
            goto L27
        L40:
            r0 = r4
            java.util.Set<org.apache.spark.streaming.api.java.JavaStreamingContext> r0 = r0.sscCollection
            r1 = r6
            boolean r0 = r0.remove(r1)
            goto L0
        L4e:
            java.lang.StringBuilder r0 = new java.lang.StringBuilder
            r1 = r0
            r1.<init>()
            java.lang.String r1 = "Spark streams cleaned up. Cause: "
            java.lang.StringBuilder r0 = r0.append(r1)
            r1 = r5
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r0 = r0.toString()
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.jetbrains.kotlinx.spark.api.jupyter.SparkStreamingIntegration.cleanUp(java.lang.Throwable):java.lang.String");
    }

    @Override // org.jetbrains.kotlinx.spark.api.jupyter.Integration
    public void onLoadedAlsoDo(@NotNull JupyterIntegration.Builder builder) {
        Intrinsics.checkNotNullParameter(builder, "<this>");
        builder.addThrowableRenderer(new SubtypeThrowableRenderer(Reflection.getOrCreateKotlinClass(IllegalMonitorStateException.class), new Function1<IllegalMonitorStateException, Object>() { // from class: org.jetbrains.kotlinx.spark.api.jupyter.SparkStreamingIntegration$onLoadedAlsoDo$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            @NotNull
            public final Object invoke(@NotNull IllegalMonitorStateException illegalMonitorStateException) {
                String cleanUp;
                Intrinsics.checkNotNullParameter(illegalMonitorStateException, "it");
                cleanUp = SparkStreamingIntegration.this.cleanUp(illegalMonitorStateException);
                return cleanUp;
            }
        }));
    }

    @Override // org.jetbrains.kotlinx.spark.api.jupyter.Integration
    public void onInterrupt(@NotNull KotlinKernelHost kotlinKernelHost) {
        Intrinsics.checkNotNullParameter(kotlinKernelHost, "<this>");
        System.out.println((Object) cleanUp(new InterruptedException("Kernel was interrupted.")));
    }
}
