package org.apache.kyuubi.engine.flink;

import java.io.File;
import java.io.FilenameFilter;
import java.lang.ProcessBuilder;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.kyuubi.KyuubiException;
import org.apache.kyuubi.KyuubiException$;
import org.apache.kyuubi.KyuubiFunSuite;
import org.apache.kyuubi.config.KyuubiConf;
import org.apache.kyuubi.config.KyuubiConf$;
import org.apache.kyuubi.ha.HighAvailabilityConf$;
import org.apache.kyuubi.package$;
import org.apache.kyuubi.util.JavaUtils;
import org.apache.kyuubi.util.command.CommandLineUtils$;
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper;
import org.apache.kyuubi.zookeeper.ZookeeperConf$;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.LinkedHashSet;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: WithFlinkSQLEngineLocal.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]ba\u0002\f\u0018!\u0003\r\tA\t\u0005\u0006g\u0001!\t\u0001\u000e\u0005\bw\u0001\u0011\r\u0011\"\u0005=\u0011%!\u0005\u00011AA\u0002\u0013EQ\tC\u0005O\u0001\u0001\u0007\t\u0019!C\t\u001f\"I!\u000b\u0001a\u0001\u0002\u0004%\tb\u0015\u0005\n9\u0002\u0001\r\u00111A\u0005\u0012uC\u0011b\u0018\u0001A\u0002\u0003\u0007I\u0011\u00021\t\u0013\u001d\u0004\u0001\u0019!a\u0001\n\u0013A\u0007b\u00026\u0001\u0005\u0004%\tb\u001b\u0005\u0006e\u00021\tb\u001d\u0005\u0007\u007f\u00021\t!!\u0001\t\u0015\u0005%\u0001\u00011AA\u0002\u0013E1\u000fC\u0006\u0002\f\u0001\u0001\r\u00111A\u0005\u0012\u00055\u0001BBA\t\u0001\u0011\u0005C\u0007\u0003\u0004\u0002\u0014\u0001!\t\u0005\u000e\u0005\b\u0003+\u0001A\u0011AA\f\u0011\u0019\ti\u0002\u0001C\u0005i!1\u0011q\u0004\u0001\u0005\u0012MDq!!\t\u0001\t\u0003\t\u0019\u0003C\u0007\u00020\u0001\u0001\n1!A\u0001\n\u0013!\u0014\u0011\u0007\u0005\u000e\u0003g\u0001\u0001\u0013aA\u0001\u0002\u0013%A'!\u000e\u0003/]KG\u000f\u001b$mS:\\7+\u0015'F]\u001eLg.\u001a'pG\u0006d'B\u0001\r\u001a\u0003\u00151G.\u001b8l\u0015\tQ2$\u0001\u0004f]\u001eLg.\u001a\u0006\u00039u\taa[=vk\nL'B\u0001\u0010 \u0003\u0019\t\u0007/Y2iK*\t\u0001%A\u0002pe\u001e\u001c\u0001a\u0005\u0003\u0001G-z\u0003C\u0001\u0013*\u001b\u0005)#B\u0001\u0014(\u0003!1WO\\:vSR,'B\u0001\u0015 \u0003%\u00198-\u00197bi\u0016\u001cH/\u0003\u0002+K\tY\u0011I\\=Gk:\u001cV/\u001b;f!\taS&D\u0001\u001c\u0013\tq3D\u0001\bLsV,(-\u001b$v]N+\u0018\u000e^3\u0011\u0005A\nT\"A\f\n\u0005I:\"AF,ji\"4E.\u001b8l)\u0016\u001cHOU3t_V\u00148-Z:\u0002\r\u0011Jg.\u001b;%)\u0005)\u0004C\u0001\u001c:\u001b\u00059$\"\u0001\u001d\u0002\u000bM\u001c\u0017\r\\1\n\u0005i:$\u0001B+oSR\f1B\u001a7j].\u001cuN\u001c4jOV\tQ\b\u0005\u0002?\u00056\tqH\u0003\u0002A\u0003\u0006i1m\u001c8gS\u001e,(/\u0019;j_:T!\u0001G\u000f\n\u0005\r{$!D\"p]\u001aLw-\u001e:bi&|g.A\u0006nS:L7\t\\;ti\u0016\u0014X#\u0001$\u0011\u0005\u001dcU\"\u0001%\u000b\u0005%S\u0015aC7j]&\u001cG.^:uKJT!aS!\u0002\u000fI,h\u000e^5nK&\u0011Q\n\u0013\u0002\f\u001b&t\u0017n\u00117vgR,'/A\bnS:L7\t\\;ti\u0016\u0014x\fJ3r)\t)\u0004\u000bC\u0004R\t\u0005\u0005\t\u0019\u0001$\u0002\u0007a$\u0013'A\u0007f]\u001eLg.\u001a)s_\u000e,7o]\u000b\u0002)B\u0011QKW\u0007\u0002-*\u0011q\u000bW\u0001\u0005Y\u0006twMC\u0001Z\u0003\u0011Q\u0017M^1\n\u0005m3&a\u0002)s_\u000e,7o]\u0001\u0012K:<\u0017N\\3Qe>\u001cWm]:`I\u0015\fHCA\u001b_\u0011\u001d\tf!!AA\u0002Q\u000b\u0001B_6TKJ4XM]\u000b\u0002CB\u0011!-Z\u0007\u0002G*\u0011AmG\u0001\nu>|7.Z3qKJL!AZ2\u0003#\u0015k'-\u001a3eK\u0012Tvn\\6fKB,'/\u0001\u0007{WN+'O^3s?\u0012*\u0017\u000f\u0006\u00026S\"9\u0011\u000bCA\u0001\u0002\u0004\t\u0017\u0001B2p]\u001a,\u0012\u0001\u001c\t\u0003[Bl\u0011A\u001c\u0006\u0003_n\taaY8oM&<\u0017BA9o\u0005)Y\u00150^;cS\u000e{gNZ\u0001\fK:<\u0017N\\3SK\u001aLE-F\u0001u!\t)HP\u0004\u0002wuB\u0011qoN\u0007\u0002q*\u0011\u00110I\u0001\u0007yI|w\u000e\u001e \n\u0005m<\u0014A\u0002)sK\u0012,g-\u0003\u0002~}\n11\u000b\u001e:j]\u001eT!a_\u001c\u0002\u001d]LG\u000f[&zkV\u0014\u0017nQ8oMV\u0011\u00111\u0001\t\u0006k\u0006\u0015A\u000f^\u0005\u0004\u0003\u000fq(aA'ba\u0006i1m\u001c8oK\u000e$\u0018n\u001c8Ve2\f\u0011cY8o]\u0016\u001cG/[8o+Jdw\fJ3r)\r)\u0014q\u0002\u0005\b#6\t\t\u00111\u0001u\u0003%\u0011WMZ8sK\u0006cG.\u0001\u0005bMR,'/\u00117m\u0003A\u0019H/\u0019:u\r2Lgn[#oO&tW\rF\u00026\u00033Aq!a\u0007\u0011\u0001\u0004\t\u0019!\u0001\u0003f]Z\u001c\u0018\u0001E:uCJ$X*\u001b8j\u00072,8\u000f^3s\u0003)9W\r\u001e&eE\u000e,&\u000f\\\u0001\r[\u0006LgNU3t_V\u00148-\u001a\u000b\u0005\u0003K\tY\u0003\u0005\u00037\u0003O!\u0018bAA\u0015o\t1q\n\u001d;j_:Dq!!\f\u0014\u0001\u0004\t\u0019!A\u0002f]Z\fqb];qKJ$#-\u001a4pe\u0016\fE\u000e\\\u0005\u0004\u0003#i\u0013AD:va\u0016\u0014H%\u00194uKJ\fE\u000e\\\u0005\u0004\u0003'i\u0003")
/* loaded from: input_file:org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.class */
public interface WithFlinkSQLEngineLocal extends KyuubiFunSuite, WithFlinkTestResources {
    void org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$_setter_$flinkConfig_$eq(Configuration configuration);

    void org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$_setter_$conf_$eq(KyuubiConf kyuubiConf);

    /* synthetic */ void org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$super$beforeAll();

    /* synthetic */ void org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$super$afterAll();

    Configuration flinkConfig();

    MiniCluster miniCluster();

    void miniCluster_$eq(MiniCluster miniCluster);

    Process engineProcess();

    void engineProcess_$eq(Process process);

    EmbeddedZookeeper org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer();

    void org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer_$eq(EmbeddedZookeeper embeddedZookeeper);

    KyuubiConf conf();

    String engineRefId();

    Map<String, String> withKyuubiConf();

    String connectionUrl();

    void connectionUrl_$eq(String str);

    default void beforeAll() {
        withKyuubiConf().foreach(tuple2 -> {
            $anonfun$beforeAll$1(this, tuple2);
            return BoxedUnit.UNIT;
        });
        withKyuubiConf().foreach(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            return this.conf().set((String) tuple22._1(), (String) tuple22._2());
        });
        org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer_$eq(new EmbeddedZookeeper());
        conf().set(ZookeeperConf$.MODULE$.ZK_CLIENT_PORT(), BoxesRunTime.boxToInteger(0)).set(ZookeeperConf$.MODULE$.ZK_CLIENT_PORT_ADDRESS(), "localhost");
        org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer().initialize(conf());
        org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer().start();
        conf().set(HighAvailabilityConf$.MODULE$.HA_ADDRESSES(), org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer().getConnectString());
        scala.collection.mutable.Map apply = Map$.MODULE$.apply(Nil$.MODULE$);
        File[] listFiles = Paths.get((String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(JavaUtils.getCodeSourceLocation(getClass()).split("externals"))).head(), "externals", "kyuubi-download", "target").toFile().listFiles(file -> {
            return file.getName().contains("flink");
        });
        None$ headOption = listFiles == null ? None$.MODULE$ : new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(listFiles)).map(file2 -> {
            return file2.toPath();
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Path.class))))).headOption();
        if (headOption.isDefined()) {
            apply.update("FLINK_HOME", headOption.get().toString());
            apply.update("FLINK_CONF_DIR", Paths.get(headOption.get().toString(), "conf").toString());
        }
        apply.update("JAVA_HOME", System.getProperty("java.home"));
        apply.update("JAVA_EXEC", Paths.get((String) apply.apply("JAVA_HOME"), "bin", "java").toString());
        startMiniCluster();
        startFlinkEngine(apply.toMap(Predef$.MODULE$.$conforms()));
        org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$super$beforeAll();
    }

    default void afterAll() {
        org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$super$afterAll();
        if (engineProcess() != null) {
            engineProcess().destroy();
            engineProcess_$eq(null);
        }
        if (miniCluster() != null) {
            miniCluster().close();
            miniCluster_$eq(null);
        }
        if (org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer() != null) {
            org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer().stop();
            org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$$zkServer_$eq(null);
        }
    }

    default void startFlinkEngine(Map<String, String> map) {
        String str = (String) map.apply("FLINK_HOME");
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        processBuilder.environment().putAll((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava());
        conf().set(KyuubiConf$.MODULE$.ENGINE_FLINK_EXTRA_CLASSPATH(), udfJar().getAbsolutePath());
        ListBuffer listBuffer = new ListBuffer();
        listBuffer.$plus$eq(map.apply("JAVA_EXEC"));
        listBuffer.$plus$eq(new StringBuilder(4).append("-Xmx").append((String) conf().get(KyuubiConf$.MODULE$.ENGINE_FLINK_MEMORY())).toString());
        Option filter = ((Option) conf().get(KyuubiConf$.MODULE$.ENGINE_FLINK_JAVA_OPTIONS())).filter(str2 -> {
            return BoxesRunTime.boxToBoolean(StringUtils.isNotBlank(str2));
        });
        if (filter.isDefined()) {
            listBuffer.$plus$eq(filter.get());
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        mainResource(map).foreach(str3 -> {
            return BoxesRunTime.boxToBoolean(linkedHashSet.add(str3));
        });
        final WithFlinkSQLEngineLocal withFlinkSQLEngineLocal = null;
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Paths.get(str, new String[0]).resolve("opt").toFile().listFiles(new FilenameFilter(withFlinkSQLEngineLocal) { // from class: org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal$$anon$1
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str4) {
                return str4.toLowerCase().startsWith("flink-sql-client") || str4.toLowerCase().startsWith("flink-sql-gateway");
            }
        }))).foreach(file -> {
            return BoxesRunTime.boxToBoolean($anonfun$startFlinkEngine$3(linkedHashSet, file));
        });
        linkedHashSet.add(new StringBuilder(4).append(str).append(File.separator).append("lib").append(File.separator).append("*").toString());
        linkedHashSet.add(map.getOrElse("FLINK_CONF_DIR", () -> {
            return "";
        }));
        String[] strArr = (String[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(System.getProperty("java.class.path").split(":"))).filter(str4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$startFlinkEngine$5(str4));
        });
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).foreach(str5 -> {
            return BoxesRunTime.boxToBoolean(linkedHashSet.add(str5));
        });
        Option option = (Option) conf().get(KyuubiConf$.MODULE$.ENGINE_FLINK_EXTRA_CLASSPATH());
        option.foreach(str6 -> {
            return BoxesRunTime.boxToBoolean(linkedHashSet.add(str6));
        });
        if (new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).isEmpty() && option.isEmpty()) {
            mainResource(map).foreach(str7 -> {
                return BoxesRunTime.boxToBoolean($anonfun$startFlinkEngine$8(linkedHashSet, str7));
            });
        }
        listBuffer.$plus$plus$eq(CommandLineUtils$.MODULE$.genClasspathOption(linkedHashSet));
        listBuffer.$plus$eq("org.apache.kyuubi.engine.flink.FlinkSQLEngine");
        listBuffer.$plus$plus$eq(CommandLineUtils$.MODULE$.confKeyValues(conf().getAll()));
        processBuilder.command((List<String>) JavaConverters$.MODULE$.seqAsJavaListConverter(listBuffer.toList()).asJava());
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
        info(() -> {
            return "staring flink local engine...";
        });
        engineProcess_$eq(processBuilder.start());
    }

    private default void startMiniCluster() {
        miniCluster_$eq(new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(flinkConfig()).setNumSlotsPerTaskManager(1).setNumTaskManagers(2).build()));
        miniCluster().start();
        flinkConfig().setString(RestOptions.ADDRESS, ((URI) miniCluster().getRestAddress().get()).getHost());
        flinkConfig().setInteger(RestOptions.PORT, ((URI) miniCluster().getRestAddress().get()).getPort());
    }

    default String getJdbcUrl() {
        return new StringBuilder(15).append("jdbc:hive2://").append(connectionUrl()).append("/;").toString();
    }

    default Option<String> mainResource(Map<String, String> map) {
        String str = "kyuubi-flink-sql-engine";
        String str2 = "flink";
        String sb = new StringBuilder(6).append("kyuubi-flink-sql-engine").append("_").append(package$.MODULE$.SCALA_COMPILE_VERSION()).append("-").append(package$.MODULE$.KYUUBI_VERSION()).append(".jar").toString();
        return conf().getOption(new StringBuilder(36).append("kyuubi.session.engine.").append("flink").append(".main.resource").toString()).filter(str3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$mainResource$1(str3));
        }).orElse(() -> {
            return ((IterableLike) Option$.MODULE$.option2Iterable(map.get("KYUUBI_HOME")).toSeq().flatMap(str4 -> {
                return new $colon.colon(Paths.get(str4, "externals", "engines", str2, sb), new $colon.colon(Paths.get(str4, "externals", str, "target", sb), Nil$.MODULE$));
            }, Seq$.MODULE$.canBuildFrom())).find(path -> {
                return BoxesRunTime.boxToBoolean($anonfun$mainResource$4(path));
            }).map(path2 -> {
                return path2.toAbsolutePath().toFile().getCanonicalPath();
            });
        }).orElse(() -> {
            String[] split = JavaUtils.getCodeSourceLocation(this.getClass()).split("externals");
            int length = split.length;
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToInteger(length), ">", BoxesRunTime.boxToInteger(1), length > 1, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("WithFlinkSQLEngineLocal.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 222));
            return Option$.MODULE$.apply(Paths.get((String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(split)).head(), "externals", str, "target", sb)).map(path -> {
                return path.toAbsolutePath().toFile().getCanonicalPath();
            });
        });
    }

    static /* synthetic */ void $anonfun$beforeAll$1(WithFlinkSQLEngineLocal withFlinkSQLEngineLocal, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String str = (String) tuple2._1();
        String str2 = (String) tuple2._2();
        if (!str.startsWith("flink.")) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            withFlinkSQLEngineLocal.flinkConfig().setString(new StringOps(Predef$.MODULE$.augmentString(str)).stripPrefix("flink."), str2);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    static /* synthetic */ boolean $anonfun$startFlinkEngine$3(LinkedHashSet linkedHashSet, File file) {
        return linkedHashSet.add(file.getAbsolutePath());
    }

    static /* synthetic */ boolean $anonfun$startFlinkEngine$5(String str) {
        return !str.contains("flink");
    }

    static /* synthetic */ boolean $anonfun$startFlinkEngine$8(LinkedHashSet linkedHashSet, String str) {
        Path resolve = Paths.get(str, new String[0]).getParent().resolve(new StringBuilder(6).append("scala-").append(package$.MODULE$.SCALA_COMPILE_VERSION()).toString()).resolve("jars");
        if (Files.exists(resolve, new LinkOption[0])) {
            return linkedHashSet.add(new StringBuilder(1).append(resolve).append(File.separator).append("*").toString());
        }
        throw new KyuubiException(new StringBuilder(117).append("The path ").append(resolve).append(" does not exists. ").append("Please set FLINK_HADOOP_CLASSPATH or ").append(KyuubiConf$.MODULE$.ENGINE_FLINK_EXTRA_CLASSPATH().key()).append(" for configuring location of hadoop client jars, etc.").toString(), KyuubiException$.MODULE$.$lessinit$greater$default$2());
    }

    static /* synthetic */ boolean $anonfun$mainResource$1(String str) {
        URI uri = new URI(str);
        if ("file".equals(uri.getScheme() != null ? uri.getScheme() : "file")) {
            return Files.exists(Paths.get(str, new String[0]), new LinkOption[0]);
        }
        return true;
    }

    static /* synthetic */ boolean $anonfun$mainResource$4(Path path) {
        return Files.exists(path, new LinkOption[0]);
    }

    static void $init$(WithFlinkSQLEngineLocal withFlinkSQLEngineLocal) {
        withFlinkSQLEngineLocal.org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$_setter_$flinkConfig_$eq(new Configuration());
        withFlinkSQLEngineLocal.org$apache$kyuubi$engine$flink$WithFlinkSQLEngineLocal$_setter_$conf_$eq(new KyuubiConf(false));
    }
}
