package org.apache.kyuubi.engine.flink;

import java.io.File;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.kyuubi.Logging;
import org.apache.kyuubi.Utils$;
import org.apache.kyuubi.config.ConfigEntry;
import org.apache.kyuubi.config.KyuubiConf;
import org.apache.kyuubi.config.KyuubiConf$;
import org.apache.kyuubi.util.SignalRegister$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Map$;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;

/* compiled from: FlinkSQLEngine.scala */
/* loaded from: input_file:org/apache/kyuubi/engine/flink/FlinkSQLEngine$.class */
public final class FlinkSQLEngine$ implements Logging, Serializable {
    /** Singleton instance of this module; assigned by the private constructor. */
    public static FlinkSQLEngine$ MODULE$;
    /** Kyuubi configuration created in the constructor and populated by {@code main}. */
    private final KyuubiConf kyuubiConf;
    /** Engine created by {@code startEngine}; {@code None} until then. */
    private Option<FlinkSQLEngine> currentEngine;
    /** Value of {@code Utils$.MODULE$.currentUser()} captured at construction time. */
    private final String user;
    /**
     * Latch created with a count of one; {@code main} blocks on it after bootstrapping.
     * NOTE(review): the code that counts it down is not in this class.
     */
    private final CountDownLatch org$apache$kyuubi$engine$flink$FlinkSQLEngine$$countDownLatch;
    /** Backing field for the logger accessor pair required by {@code Logging}. */
    private transient Logger org$apache$kyuubi$Logging$$log_;

    static {
        // Instantiating the class once is what populates MODULE$ (the constructor assigns it).
        new FlinkSQLEngine$();
    }

    /**
     * Returns the logger name provided by the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation. The decompiled body
     * invoked {@code loggerName()} on itself, which would recurse until the stack overflows.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @return the logger name
     */
    @Override // org.apache.kyuubi.Logging
    public String loggerName() {
        return Logging.super.loggerName();
    }

    /**
     * Returns the logger provided by the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation. The decompiled body
     * invoked {@code logger()} on itself, which would recurse until the stack overflows.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @return the logger used by this module
     */
    @Override // org.apache.kyuubi.Logging
    public Logger logger() {
        return Logging.super.logger();
    }

    /**
     * Logs a lazily computed message at debug level through the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0) {
        Logging.super.debug(function0);
    }

    /**
     * Logs a lazily computed message and a throwable at debug level through the
     * {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     * @param th throwable to log alongside the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0, Throwable th) {
        Logging.super.debug(function0, th);
    }

    /**
     * Logs a lazily computed message at info level through the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0) {
        Logging.super.info(function0);
    }

    /**
     * Logs a lazily computed message and a throwable at info level through the
     * {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     * @param th throwable to log alongside the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0, Throwable th) {
        Logging.super.info(function0, th);
    }

    /**
     * Logs a lazily computed message at warn level through the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0) {
        Logging.super.warn(function0);
    }

    /**
     * Logs a lazily computed message and a throwable at warn level through the
     * {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     * @param th throwable to log alongside the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0, Throwable th) {
        Logging.super.warn(function0, th);
    }

    /**
     * Logs a lazily computed message and a throwable at error level through the
     * {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     * @param th throwable to log alongside the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0, Throwable th) {
        Logging.super.error(function0, th);
    }

    /**
     * Logs a lazily computed message at error level through the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same overload on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0) {
        Logging.super.error(function0);
    }

    /**
     * Forwards logger initialization to the {@code Logging} trait.
     *
     * <p>Delegates explicitly to the interface's default implementation; the decompiled body
     * called this same method on itself and would never terminate.
     * NOTE(review): assumes {@code Logging} exposes this as a default method — confirm.
     *
     * @param z flag passed through unchanged to the trait implementation
     */
    @Override // org.apache.kyuubi.Logging
    public void initializeLoggerIfNecessary(boolean z) {
        Logging.super.initializeLoggerIfNecessary(z);
    }

    /**
     * Accessor for the logger backing field required by {@code Logging}.
     *
     * @return the currently stored logger, possibly {@code null} if none has been set
     */
    @Override // org.apache.kyuubi.Logging
    public Logger org$apache$kyuubi$Logging$$log_() {
        return org$apache$kyuubi$Logging$$log_;
    }

    /**
     * Mutator for the logger backing field required by {@code Logging}.
     *
     * @param logger the logger to store
     */
    @Override // org.apache.kyuubi.Logging
    public void org$apache$kyuubi$Logging$$log__$eq(Logger logger) {
        org$apache$kyuubi$Logging$$log_ = logger;
    }

    /**
     * Returns the Kyuubi configuration held by this module.
     *
     * @return the configuration instance created in the constructor
     */
    public KyuubiConf kyuubiConf() {
        return kyuubiConf;
    }

    /**
     * Returns the engine most recently stored via {@code currentEngine_$eq}.
     *
     * @return the stored option; {@code None} until an engine has been set
     */
    public Option<FlinkSQLEngine> currentEngine() {
        return currentEngine;
    }

    /**
     * Replaces the stored engine reference.
     *
     * @param option the new value for the current engine
     */
    public void currentEngine_$eq(Option<FlinkSQLEngine> option) {
        currentEngine = option;
    }

    /**
     * Returns the user name captured when this module was constructed.
     *
     * @return the stored user name
     */
    private String user() {
        return user;
    }

    /**
     * Returns the latch that {@code main} awaits after bootstrapping.
     *
     * @return the latch created in the constructor with a count of one
     */
    public CountDownLatch org$apache$kyuubi$engine$flink$FlinkSQLEngine$$countDownLatch() {
        return org$apache$kyuubi$engine$flink$FlinkSQLEngine$$countDownLatch;
    }

    /**
     * Entry point of the Flink SQL engine process.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>registers the logger with the signal handler, logs the classpath and checks the
     *       Flink version;</li>
     *   <li>loads the Kyuubi configuration from file defaults and then from the command-line
     *       arguments;</li>
     *   <li>resolves the Flink configuration directory: {@code FLINK_CONF_DIR} if set, otherwise
     *       {@code <FLINK_HOME>/conf}, where {@code FLINK_HOME} falls back to the directory two
     *       levels above the location {@code GlobalConfiguration} was loaded from;</li>
     *   <li>loads the Flink configuration from that directory and overlays every Kyuubi entry
     *       whose key starts with {@code flink.}, with that prefix stripped;</li>
     *   <li>applies deployment-specific settings, defaults the thrift binary bind port to 0 when
     *       it is not configured, starts the engine, runs the bootstrap SQL and blocks on the
     *       engine latch.</li>
     * </ol>
     *
     * <p>Failures are logged rather than propagated. If an engine had already been created when
     * the failure occurred, it is stopped.
     *
     * @param strArr command-line arguments; forwarded to the Kyuubi configuration parser and to
     *     the default-context factory
     */
    public void main(String[] strArr) {
        SignalRegister$.MODULE$.registerLogger(logger());
        info(() -> "Flink SQL engine classpath: " + System.getProperty("java.class.path"));
        FlinkEngineUtils$.MODULE$.checkFlinkVersion();
        try {
            kyuubiConf().loadFileDefaults();
            Utils$.MODULE$.fromCommandLineArgs(strArr, kyuubiConf());
            String flinkConfDir = (String) package$.MODULE$.env().getOrElse("FLINK_CONF_DIR", () -> {
                return Paths.get((String) package$.MODULE$.env().getOrElse("FLINK_HOME", () -> {
                    // Fall back to the directory two levels above the Flink jar location.
                    return new File(GlobalConfiguration.class.getProtectionDomain().getCodeSource().getLocation().toURI()).getParentFile().getParent();
                }), "conf").toString();
            });
            Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
            // Overlay "flink."-prefixed Kyuubi entries, dropping the prefix from each key.
            flinkConf.addAll(Configuration.fromMap((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter((scala.collection.immutable.Map) kyuubiConf().getAll().filterKeys(key -> {
                return BoxesRunTime.boxToBoolean(key.startsWith("flink."));
            }).map(entry -> {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                String prefixedKey = (String) entry._1();
                return new Tuple2(new StringOps(Predef$.MODULE$.augmentString(prefixedKey)).stripPrefix("flink."), (String) entry._2());
            }, Map$.MODULE$.canBuildFrom())).asJava()));
            String executionTarget = flinkConf.getString(DeploymentOptions.TARGET);
            setDeploymentConf(executionTarget, flinkConf);
            kyuubiConf().setIfMissing((ConfigEntry<ConfigEntry<Object>>) KyuubiConf$.MODULE$.FRONTEND_THRIFT_BINARY_BIND_PORT(), (ConfigEntry<Object>) BoxesRunTime.boxToInteger(0));
            startEngine(FlinkEngineUtils$.MODULE$.getDefaultContext(strArr, flinkConf, flinkConfDir));
            info(() -> "Flink engine started");
            bootstrap(executionTarget);
            org$apache$kyuubi$engine$flink$FlinkSQLEngine$$countDownLatch().await();
        } catch (Throwable failure) {
            // A caught throwable is never null, so only the engine state decides the branch.
            if (currentEngine().isDefined()) {
                error(() -> "Fatal error occurs, thus stopping the engines", failure);
                currentEngine().foreach(flinkSQLEngine -> {
                    flinkSQLEngine.stop();
                    return BoxedUnit.UNIT;
                });
            } else {
                error(() -> "Failed to create FlinkSQL Engine", failure);
            }
        }
    }

    /**
     * Creates an engine for the given context, publishes it as the current engine, and then
     * initializes and starts it (which also registers a shutdown hook that stops it).
     *
     * @param defaultContext context the new engine is constructed with
     */
    public void startEngine(DefaultContext defaultContext) {
        debug(() -> "Starting Flink SQL engine with default configuration: " + defaultContext.getFlinkConfig());
        FlinkSQLEngine engine = new FlinkSQLEngine(defaultContext);
        currentEngine_$eq(new Some(engine));
        // Start whatever is now stored as the current engine.
        currentEngine().foreach(stored -> {
            $anonfun$startEngine$2(stored);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the bootstrap SQL on a freshly created table environment.
     *
     * <p>When the execution target is {@code yarn-application} (case-insensitive), a trivial
     * query named {@code kyuubi-bootstrap-sql} is executed first. Afterwards every statement
     * configured under the engine initialize-SQL entry is executed and awaited in order.
     *
     * @param str the Flink execution target
     */
    private void bootstrap(String str) {
        Configuration flinkConf = new Configuration();
        TableEnvironment tableEnv = TableEnvironment.create(flinkConf);
        boolean applicationMode = "yarn-application".equalsIgnoreCase(str);
        if (applicationMode) {
            flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql");
            debug(() -> "Running bootstrap Flink SQL in application mode with flink conf: " + flinkConf + ".");
            tableEnv.executeSql("select 'kyuubi'").await();
        }
        ((IterableLike) kyuubiConf().get(KyuubiConf$.MODULE$.ENGINE_FLINK_INITIALIZE_SQL())).foreach(sql -> {
            $anonfun$bootstrap$2(tableEnv, sql);
            return BoxedUnit.UNIT;
        });
        info(() -> "Bootstrap SQL finished.");
    }

    /**
     * Copies deployment-related settings into the Flink configuration.
     *
     * <p>Always propagates {@code flink.app.name} (as {@code kyuubi.engine.name}) and
     * {@code kyuubi.session.user} from the Kyuubi configuration when they are present. For the
     * {@code yarn-per-job} and {@code yarn-application} targets it additionally mirrors
     * {@code high-availability.cluster-id}, if set, into {@code yarn.application.id}; for any
     * other target it only logs that the step was skipped.
     *
     * @param str the Flink execution target
     * @param configuration the Flink configuration to update in place
     */
    private void setDeploymentConf(String str, Configuration configuration) {
        kyuubiConf().getOption("flink.app.name").foreach(appName -> {
            configuration.setString("kyuubi.engine.name", appName);
            return BoxedUnit.UNIT;
        });
        kyuubiConf().getOption("kyuubi.session.user").foreach(sessionUser -> {
            configuration.setString("kyuubi.session.user", sessionUser);
            return BoxedUnit.UNIT;
        });

        boolean yarnTarget = "yarn-per-job".equals(str) || "yarn-application".equals(str);
        if (!yarnTarget) {
            debug(() -> "Skip setting deployment conf for execution target " + str);
            return;
        }
        if (configuration.containsKey("high-availability.cluster-id")) {
            configuration.setString("yarn.application.id", (String) configuration.toMap().get("high-availability.cluster-id"));
        }
    }

    /**
     * Factory for engine instances.
     *
     * @param defaultContext context the engine is constructed with
     * @return a newly constructed engine
     */
    public FlinkSQLEngine apply(DefaultContext defaultContext) {
        FlinkSQLEngine engine = new FlinkSQLEngine(defaultContext);
        return engine;
    }

    /**
     * Extractor counterpart of {@code apply}.
     *
     * @param flinkSQLEngine the engine to take apart; may be {@code null}
     * @return {@code Some} wrapping the engine's context, or {@code None} for a {@code null} engine
     */
    public Option<DefaultContext> unapply(FlinkSQLEngine flinkSQLEngine) {
        return flinkSQLEngine != null ? new Some(flinkSQLEngine.engineContext()) : None$.MODULE$;
    }

    /**
     * Serialization hook that substitutes the singleton for any deserialized copy.
     *
     * @return the module singleton
     */
    private Object readResolve() {
        return FlinkSQLEngine$.MODULE$;
    }

    /**
     * Body of the {@code startEngine} callback: initializes the engine with the module's Kyuubi
     * configuration, starts it, and registers a shutdown hook that stops it.
     *
     * @param flinkSQLEngine the engine to bring up
     */
    public static final /* synthetic */ void $anonfun$startEngine$2(FlinkSQLEngine flinkSQLEngine) {
        KyuubiConf conf = MODULE$.kyuubiConf();
        flinkSQLEngine.initialize(conf);
        flinkSQLEngine.start();
        // The hook priority is one above the Flink engine shutdown priority.
        Utils$.MODULE$.addShutdownHook(() -> {
            flinkSQLEngine.stop();
        }, Utils$.MODULE$.FLINK_ENGINE_SHUTDOWN_PRIORITY() + 1);
    }

    /**
     * Body of the {@code bootstrap} callback: executes one SQL statement and waits for it to
     * finish.
     *
     * @param tableEnvironment the environment the statement is executed on
     * @param str the SQL statement to execute
     */
    public static final /* synthetic */ void $anonfun$bootstrap$2(TableEnvironment tableEnvironment, String str) {
        // await() blocks until the submitted statement has completed.
        tableEnvironment.executeSql(str).await();
    }

    /**
     * Builds the singleton. Invoked exactly once from the static initializer.
     *
     * <p>The statement order matters: {@code MODULE$} is published first, then the
     * {@code Logging} trait initializer runs, and only afterwards are the module's own fields
     * populated.
     */
    private FlinkSQLEngine$() {
        // Publish the singleton before anything else can look it up.
        MODULE$ = this;
        // Trait initializer for Logging.
        Logging.$init$(this);
        // Fresh Kyuubi configuration built with the factory's default first argument.
        this.kyuubiConf = new KyuubiConf(KyuubiConf$.MODULE$.apply$default$1());
        // No engine exists until startEngine is called.
        this.currentEngine = None$.MODULE$;
        this.user = Utils$.MODULE$.currentUser();
        // Single-count latch that main awaits.
        this.org$apache$kyuubi$engine$flink$FlinkSQLEngine$$countDownLatch = new CountDownLatch(1);
    }
}
