package org.apache.kyuubi.engine.flink;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.util.JarUtils;
import org.apache.kyuubi.KyuubiException;
import org.apache.kyuubi.KyuubiException$;
import org.apache.kyuubi.Logging;
import org.apache.kyuubi.util.SemanticVersion;
import org.apache.kyuubi.util.SemanticVersion$;
import org.apache.kyuubi.util.reflect.DynConstructors;
import org.apache.kyuubi.util.reflect.ReflectUtils$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.convert.ImplicitConversions$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: FlinkEngineUtils.scala */
/* loaded from: input_file:org/apache/kyuubi/engine/flink/FlinkEngineUtils$.class */
public final class FlinkEngineUtils$ implements Logging {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static FlinkEngineUtils$ MODULE$;
    /** Option set obtained from {@code CliOptionsParser.getEmbeddedModeClientOptions(new Options())} in the constructor. */
    private final Options EMBEDDED_MODE_CLIENT_OPTIONS;
    /** Result of {@code SemanticVersion$.MODULE$.apply(EnvironmentInformation.getVersion())}, computed once in the constructor. */
    private final SemanticVersion FLINK_RUNTIME_VERSION;
    /** Backing field for the {@code Logging} log_ getter/setter pair implemented by this class; declared transient. */
    private transient Logger org$apache$kyuubi$Logging$$log_;

    // Creates the single instance when the class is initialized; the constructor stores it in MODULE$.
    static {
        new FlinkEngineUtils$();
    }

    /**
     * Returns the logger name supplied by the {@link Logging} trait.
     *
     * <p>The call is qualified with {@code Logging.super} so that it reaches the trait's own
     * implementation; an unqualified {@code loggerName()} would re-enter this override and
     * recurse until the stack overflows. This assumes {@code Logging} exposes the method as an
     * interface default method, as the Scala 2.12+ trait encoding does.
     *
     * @return the logger name computed by the trait
     */
    @Override // org.apache.kyuubi.Logging
    public String loggerName() {
        return Logging.super.loggerName();
    }

    /**
     * Returns the SLF4J logger managed by the {@link Logging} trait.
     *
     * <p>Delegates through {@code Logging.super}; calling {@code logger()} unqualified would
     * invoke this override again and never terminate. This assumes {@code Logging} provides the
     * method as an interface default method (Scala 2.12+ trait encoding).
     *
     * @return the logger obtained from the trait
     */
    @Override // org.apache.kyuubi.Logging
    public Logger logger() {
        return Logging.super.logger();
    }

    /**
     * Forwards a lazily evaluated message to the {@link Logging} trait's {@code debug}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0) {
        Logging.super.debug(function0);
    }

    /**
     * Forwards a lazily evaluated message and a throwable to the {@link Logging} trait's
     * {@code debug}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     * @param th throwable passed along with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0, Throwable th) {
        Logging.super.debug(function0, th);
    }

    /**
     * Forwards a lazily evaluated message to the {@link Logging} trait's {@code info}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0) {
        Logging.super.info(function0);
    }

    /**
     * Forwards a lazily evaluated message and a throwable to the {@link Logging} trait's
     * {@code info}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     * @param th throwable passed along with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0, Throwable th) {
        Logging.super.info(function0, th);
    }

    /**
     * Forwards a lazily evaluated message to the {@link Logging} trait's {@code warn}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0) {
        Logging.super.warn(function0);
    }

    /**
     * Forwards a lazily evaluated message and a throwable to the {@link Logging} trait's
     * {@code warn}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     * @param th throwable passed along with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0, Throwable th) {
        Logging.super.warn(function0, th);
    }

    /**
     * Forwards a lazily evaluated message and a throwable to the {@link Logging} trait's
     * {@code error}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     * @param th throwable passed along with the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0, Throwable th) {
        Logging.super.error(function0, th);
    }

    /**
     * Forwards a lazily evaluated message to the {@link Logging} trait's {@code error}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0) {
        Logging.super.error(function0);
    }

    /**
     * Forwards to the {@link Logging} trait's {@code initializeLoggerIfNecessary}.
     *
     * <p>Qualified with {@code Logging.super} to avoid re-entering this override, which would
     * recurse without end. Assumes the trait method is an interface default method.
     *
     * @param z flag passed through unchanged to the trait implementation
     */
    @Override // org.apache.kyuubi.Logging
    public void initializeLoggerIfNecessary(boolean z) {
        Logging.super.initializeLoggerIfNecessary(z);
    }

    /**
     * Getter for the logger slot that the {@link Logging} trait stores on this instance.
     *
     * @return the current value of the backing field, possibly {@code null} if never assigned
     */
    @Override // org.apache.kyuubi.Logging
    public Logger org$apache$kyuubi$Logging$$log_() {
        final Logger current = org$apache$kyuubi$Logging$$log_;
        return current;
    }

    /**
     * Setter for the logger slot that the {@link Logging} trait stores on this instance.
     *
     * @param logger value written to the backing field
     */
    @Override // org.apache.kyuubi.Logging
    public void org$apache$kyuubi$Logging$$log__$eq(Logger logger) {
        org$apache$kyuubi$Logging$$log_ = logger;
    }

    /**
     * Accessor for the command-line option set held by this singleton.
     *
     * @return the {@link Options} instance stored in the final field of the same name
     */
    public Options EMBEDDED_MODE_CLIENT_OPTIONS() {
        final Options embeddedOptions = EMBEDDED_MODE_CLIENT_OPTIONS;
        return embeddedOptions;
    }

    /**
     * Builds the set of Flink versions this engine accepts.
     *
     * <p>A new Scala set is created on every call from the literals {@code "1.16"},
     * {@code "1.17"} and {@code "1.18"}, each converted with
     * {@code SemanticVersion$.MODULE$.apply}.
     *
     * @return the supported versions as a Scala immutable set
     */
    private Set<SemanticVersion> SUPPORTED_FLINK_VERSIONS() {
        final String[] supportedLines = {"1.16", "1.17", "1.18"};
        return (Set) Predef$.MODULE$.Set()
                .apply(Predef$.MODULE$.wrapRefArray(supportedLines))
                .map(versionText -> SemanticVersion$.MODULE$.apply(versionText), Set$.MODULE$.canBuildFrom());
    }

    /**
     * Accessor for the Flink version parsed when this singleton was constructed.
     *
     * @return the {@link SemanticVersion} stored in the final field of the same name
     */
    public SemanticVersion FLINK_RUNTIME_VERSION() {
        final SemanticVersion runtimeVersion = FLINK_RUNTIME_VERSION;
        return runtimeVersion;
    }

    /**
     * Checks that {@link #FLINK_RUNTIME_VERSION()} is a member of the supported version set and
     * reports the version string from {@code EnvironmentInformation.getVersion()} through
     * {@code info(...)} when it is.
     *
     * @throws UnsupportedOperationException if the runtime version is not in the supported set;
     *     the message lists the reported version and every supported version
     */
    public void checkFlinkVersion() {
        final String reportedVersion = EnvironmentInformation.getVersion();
        final Set<SemanticVersion> supported = SUPPORTED_FLINK_VERSIONS();
        if (supported.contains(FLINK_RUNTIME_VERSION())) {
            info(() -> "The current Flink version is " + reportedVersion);
            return;
        }
        throw new UnsupportedOperationException(
                "You are using unsupported Flink version " + reportedVersion + ", " + "only Flink "
                        + supported.mkString(", ") + " are supported now.");
    }

    /**
     * Collects the URLs of all JAR dependencies into a single list.
     *
     * <p>Every entry of {@code list} is validated and appended as-is; every entry of
     * {@code list2} is treated as a directory whose {@code .jar} files are validated and
     * appended. The per-element work is done by the {@code $anonfun$discoverDependencies$1} and
     * {@code $anonfun$discoverDependencies$2} helpers.
     *
     * @param list URLs of individual JAR files
     * @param list2 URLs of directories to scan for JAR files
     * @return a new mutable list holding the discovered JAR URLs, in encounter order
     * @throws SqlClientException wrapping any {@link Exception} raised while collecting
     */
    private List<URL> discoverDependencies(List<URL> list, List<URL> list2) {
        final List<URL> collected = new ArrayList<>();
        try {
            for (URL jarUrl : list) {
                $anonfun$discoverDependencies$1(collected, jarUrl);
            }
            for (URL libraryDirUrl : list2) {
                $anonfun$discoverDependencies$2(collected, libraryDirUrl);
            }
        } catch (Exception e) {
            throw new SqlClientException("Could not load all required JAR files.", e);
        }
        return collected;
    }

    /**
     * Creates the Flink SQL gateway {@link DefaultContext} for the running Flink version.
     *
     * <p>The arguments are parsed against {@link #EMBEDDED_MODE_CLIENT_OPTIONS()}, the JAR and
     * library options are resolved to URLs, and the dependency list is discovered from them.
     * For version 1.16 the context is built reflectively through its
     * {@code (Configuration, List)} constructor with a {@link GenericCLI} and a
     * {@link DefaultCLI}; for 1.17 and later the static {@code load} method is invoked
     * reflectively with the configuration, the dependencies, and the flags {@code true} and
     * {@code false}.
     *
     * @param strArr raw command-line arguments
     * @param configuration Flink configuration handed to the context
     * @param str value passed as the second argument of the {@link GenericCLI} constructor
     * @return the constructed context
     * @throws KyuubiException if the runtime version matches neither branch
     */
    public DefaultContext getDefaultContext(String[] strArr, Configuration configuration, String str) {
        final CommandLine commandLine = new DefaultParser().parse(EMBEDDED_MODE_CLIENT_OPTIONS(), strArr, true);

        // checkUrls yields null when the option is absent; substitute an empty list in that case.
        List<URL> jarUrls = checkUrls(commandLine, CliOptionsParser.OPTION_JAR);
        if (jarUrls == null) {
            jarUrls = Collections.emptyList();
        }
        List<URL> libraryUrls = checkUrls(commandLine, CliOptionsParser.OPTION_LIBRARY);
        if (libraryUrls == null) {
            libraryUrls = Collections.emptyList();
        }
        final List<URL> dependencies = discoverDependencies(jarUrls, libraryUrls);

        final SemanticVersion runtimeVersion = FLINK_RUNTIME_VERSION();
        if (runtimeVersion.$eq$eq$eq("1.16")) {
            return (DefaultContext) DynConstructors.builder()
                    .impl(DefaultContext.class, Configuration.class, List.class)
                    .build()
                    .newInstance(configuration, (List) JavaConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new GenericCLI(configuration, str), new $colon.colon(new DefaultCLI(), Nil$.MODULE$))).asJava());
        }
        if (runtimeVersion.$greater$eq("1.17")) {
            final Tuple2[] loadArguments = new Tuple2[]{
                new Tuple2(Configuration.class, configuration),
                new Tuple2(List.class, dependencies),
                new Tuple2(Boolean.TYPE, Boolean.TRUE),
                new Tuple2(Boolean.TYPE, Boolean.FALSE)
            };
            return (DefaultContext) ReflectUtils$.MODULE$.invokeAs(DefaultContext.class, "load", Predef$.MODULE$.wrapRefArray(loadArguments));
        }
        throw new KyuubiException(
                "Flink version " + EnvironmentInformation.getVersion() + " are not supported currently.",
                KyuubiException$.MODULE$.$lessinit$greater$default$2());
    }

    /**
     * Reads the {@code sessionContext} field of a gateway session through reflection.
     *
     * @param session session whose field is read
     * @return the field value cast to {@link SessionContext}
     */
    public SessionContext getSessionContext(Session session) {
        final Object contextField = ReflectUtils$.MODULE$.getField(session, "sessionContext");
        return (SessionContext) contextField;
    }

    /**
     * Extracts the Flink job id from a result fetcher by reading its {@code jobID} field
     * reflectively.
     *
     * <p>No lookup is attempted when the runtime version is 1.16 or lower. A
     * {@link NullPointerException} raised during the lookup is treated as "no job id".
     *
     * @param resultFetcher fetcher to inspect
     * @return the job id wrapped in a Scala {@code Option}, or {@code None} when unavailable
     * @throws IllegalStateException wrapping any other {@link Throwable} raised by the lookup
     */
    public Option<JobID> getResultJobId(ResultFetcher resultFetcher) {
        final boolean lookupSkipped = FLINK_RUNTIME_VERSION().$less$eq("1.16");
        if (!lookupSkipped) {
            try {
                return Option$.MODULE$.apply(ReflectUtils$.MODULE$.getField(resultFetcher, "jobID"));
            } catch (NullPointerException ignored) {
                // Fall through to the shared "no job id" result below.
            } catch (Throwable th) {
                throw new IllegalStateException("Unexpected error occurred while fetching query ID", th);
            }
        }
        return None$.MODULE$;
    }

    /**
     * Reads the session option from the command line and validates its character set.
     *
     * @param commandLine parsed command line
     * @return the session identifier, or {@code null} when the option has no value
     * @throws SqlClientException if the identifier contains characters other than letters,
     *     digits, {@code _}, {@code -} and {@code .}
     */
    public String checkSessionId(CommandLine commandLine) {
        final String sessionId = commandLine.getOptionValue(CliOptionsParser.OPTION_SESSION.getOpt());
        final boolean malformed = sessionId != null && !sessionId.matches("[a-zA-Z0-9_\\-.]+");
        if (malformed) {
            throw new SqlClientException("Session identifier must only consists of 'a-zA-Z0-9_-.'.");
        }
        return sessionId;
    }

    /**
     * Resolves an option to a single URL by taking the first element that
     * {@link #checkUrls(CommandLine, org.apache.commons.cli.Option)} produces.
     *
     * @param commandLine parsed command line
     * @param option option whose values are resolved
     * @return the first resolved URL, or {@code null} when the option is absent or yields no URLs
     */
    public URL checkUrl(CommandLine commandLine, org.apache.commons.cli.Option option) {
        final List<URL> candidates = checkUrls(commandLine, option);
        final boolean hasCandidate = candidates != null && !candidates.isEmpty();
        return hasCandidate ? candidates.get(0) : null;
    }

    /**
     * Resolves every value of a command-line option to a URL.
     *
     * <p>When the option is present, its values are de-duplicated, each one is passed to
     * {@code CliOptionsParser.checkFilePath}, and the absolute form of the path is converted to a
     * URL. The result is a Java list view over a Scala list.
     *
     * @param commandLine parsed command line
     * @param option option whose values are resolved
     * @return the resolved URLs, or {@code null} when the option is not present
     * @throws SqlClientException if converting a value to a URL fails; the message names the
     *     option's long name and the offending value
     */
    public List<URL> checkUrls(CommandLine commandLine, org.apache.commons.cli.Option option) {
        if (commandLine.hasOption(option.getOpt())) {
            // Pipeline: option values -> distinct -> map(path -> URL) -> Scala List -> java.util.List view.
            return ImplicitConversions$.MODULE$.seq$u0020AsJavaList(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(commandLine.getOptionValues(option.getOpt()))).distinct())).map(str -> {
                CliOptionsParser.checkFilePath(str);
                try {
                    // Made absolute before conversion to a URL.
                    return Path.fromLocalFile(new File(str).getAbsoluteFile()).toUri().toURL();
                } catch (Exception e) {
                    throw new SqlClientException(new StringBuilder(28).append("Invalid path for option '").append(option.getLongOpt()).append("': ").append(str).toString(), e);
                }
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(URL.class))))).toList());
        }
        // Callers in this class rely on null (not an empty list) to signal an absent option.
        return null;
    }

    /**
     * Per-element step for explicitly listed JARs: validates the URL with
     * {@code JarUtils.checkJarFile} and appends it to the accumulator.
     *
     * @param list accumulator receiving the URL
     * @param url JAR URL to validate and add
     * @return the result of {@code list.add(url)}
     */
    public static final /* synthetic */ boolean $anonfun$discoverDependencies$1(List list, URL url) {
        JarUtils.checkJarFile(url);
        final boolean appended = list.add(url);
        return appended;
    }

    /**
     * Filter predicate for directory entries: accepts regular files whose absolute path ends
     * with {@code .jar}, compared case-insensitively via {@code toLowerCase()}.
     *
     * @param file directory entry to test
     * @return {@code true} if the entry is a regular file with a {@code .jar} suffix
     */
    public static final /* synthetic */ boolean $anonfun$discoverDependencies$3(File file) {
        if (!file.isFile()) {
            return false;
        }
        final String loweredPath = file.getAbsolutePath().toLowerCase();
        return loweredPath.endsWith(".jar");
    }

    /**
     * Per-element step for JARs found in a library directory: converts the file to a URL,
     * validates it with {@code JarUtils.checkJarFile}, and appends it to the accumulator.
     *
     * @param list accumulator receiving the URL
     * @param file JAR file to convert, validate and add
     * @return the result of {@code list.add(...)} for the converted URL
     */
    public static final /* synthetic */ boolean $anonfun$discoverDependencies$4(List list, File file) {
        final URL jarUrl = file.toURI().toURL();
        JarUtils.checkJarFile(jarUrl);
        final boolean appended = list.add(jarUrl);
        return appended;
    }

    /**
     * Per-element step for library directories: checks that the URL points at a readable
     * directory, then validates and appends every {@code .jar} file directly inside it.
     *
     * @param list accumulator receiving the discovered JAR URLs
     * @param url URL of the directory to scan
     * @throws SqlClientException if the URL is not a directory, or the directory cannot be read
     *     or listed
     */
    public static final /* synthetic */ void $anonfun$discoverDependencies$2(List list, URL url) {
        final File libraryDir = new File(url.toURI());
        if (!libraryDir.isDirectory()) {
            throw new SqlClientException("Directory expected: " + libraryDir);
        }
        // An unreadable directory and a failed listing are reported with the same message.
        final File[] entries = libraryDir.canRead() ? libraryDir.listFiles() : null;
        if (entries == null) {
            throw new SqlClientException("Directory cannot be read: " + libraryDir);
        }
        for (File entry : entries) {
            if ($anonfun$discoverDependencies$3(entry)) {
                $anonfun$discoverDependencies$4(list, entry);
            }
        }
    }

    /**
     * Builds the singleton. The statements run in a fixed order: the instance is published
     * through {@code MODULE$}, the {@code Logging} trait initializer is run, and then the two
     * final fields are populated.
     */
    private FlinkEngineUtils$() {
        // Published first, before any other initialization.
        MODULE$ = this;
        Logging.$init$(this);
        // Option set comes from CliOptionsParser, starting from an empty Options instance.
        this.EMBEDDED_MODE_CLIENT_OPTIONS = CliOptionsParser.getEmbeddedModeClientOptions(new Options());
        // Version string reported by Flink's EnvironmentInformation, parsed into a SemanticVersion.
        this.FLINK_RUNTIME_VERSION = SemanticVersion$.MODULE$.apply(EnvironmentInformation.getVersion());
    }
}
