package org.apache.flink.client.deployment.application.executors;

import java.time.Duration;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.class */
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
    private static Collection<JobID> bootstrapJobIds;
    private static Collection<JobID> submittedJobIds;
    private static DispatcherGateway dispatcherGateway;
    private static ScheduledExecutor retryExecutor;
    private static final long BOOTSTRAP_WAIT_INTERVAL = 10000;
    private static final int BOOTSTRAP_WAIT_RETRIES = 3;
    private static final Object bootstrapLock = new Object();
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedExecutorFactory.class);

    public EmbeddedExecutorFactory() {
        LOGGER.debug("{} loaded in thread {} with classloader {}.", new Object[]{getClass().getCanonicalName(), Thread.currentThread().getName(), getClass().getClassLoader().toString()});
    }

    public EmbeddedExecutorFactory(Collection<JobID> collection, DispatcherGateway dispatcherGateway2, ScheduledExecutor scheduledExecutor) {
        LOGGER.debug("{} initiated in thread {} with classloader {}.", new Object[]{getClass().getCanonicalName(), Thread.currentThread().getName(), getClass().getClassLoader().toString()});
        Preconditions.checkState(submittedJobIds == null);
        Preconditions.checkState(dispatcherGateway == null);
        Preconditions.checkState(retryExecutor == null);
        synchronized (bootstrapLock) {
            LOGGER.debug("Bootstrapping EmbeddedExecutorFactory.");
            submittedJobIds = new ConcurrentLinkedQueue((Collection) Preconditions.checkNotNull(collection));
            bootstrapJobIds = collection;
            dispatcherGateway = (DispatcherGateway) Preconditions.checkNotNull(dispatcherGateway2);
            retryExecutor = (ScheduledExecutor) Preconditions.checkNotNull(scheduledExecutor);
            bootstrapLock.notifyAll();
        }
    }

    public String getName() {
        return "embedded";
    }

    public boolean isCompatibleWith(Configuration configuration) {
        LOGGER.debug("Matching execution target: {}", configuration.get(DeploymentOptions.TARGET));
        return ((String) configuration.get(DeploymentOptions.TARGET)).equalsIgnoreCase("yarn-application") && ((String) configuration.toMap().getOrDefault("yarn.tags", "")).toLowerCase().contains("kyuubi");
    }

    public PipelineExecutor getExecutor(Configuration configuration) {
        Collection<JobID> collection;
        Preconditions.checkNotNull(configuration);
        synchronized (bootstrapLock) {
            for (int i = 0; bootstrapJobIds == null && i < 3; i++) {
                try {
                    LOGGER.debug("Waiting for bootstrap to complete. Wait retries: {}.", Integer.valueOf(i));
                    bootstrapLock.wait(BOOTSTRAP_WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted while waiting for bootstrap.", e);
                }
            }
            if (bootstrapJobIds == null) {
                throw new RuntimeException("Bootstrap of Flink SQL engine timed out after 30000 ms. Please check the engine log for more details.");
            }
        }
        if (bootstrapJobIds.size() > 0) {
            LOGGER.info("Submitting new Kyuubi job. Job submitted: {}.", Integer.valueOf(submittedJobIds.size()));
            collection = submittedJobIds;
        } else {
            LOGGER.info("Bootstrapping Flink SQL engine with the initial SQL.");
            collection = bootstrapJobIds;
        }
        return new EmbeddedExecutor(collection, dispatcherGateway, (jobID, classLoader) -> {
            return new EmbeddedJobClient(jobID, dispatcherGateway, retryExecutor, Time.milliseconds(((Duration) configuration.get(ClientOptions.CLIENT_TIMEOUT)).toMillis()), classLoader);
        });
    }
}
