package org.apache.pulsar.functions;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.IStringConverter;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander;
import org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.Parameter;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.Gson;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.GsonBuilder;
import org.apache.pulsar.functions.runtime.shaded.com.google.gson.JsonParser;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.functions.runtime.shaded.io.prometheus.client.exporter.HTTPServer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.DeliveryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/LocalRunner.class */
public class LocalRunner implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(LocalRunner.class);

    // Directory NAR archives are extracted into: either supplied by the caller or a temp directory
    // created by the constructor.
    private final String narExtractionDirectory;
    // Non-null only when the constructor created the extraction directory itself; close() deletes it.
    private final File narExtractionDirectoryCreated;
    // Resolved in the constructor: explicit value, or the "connectors" directory from getPulsarDirectory.
    private final String connectorsDir;
    // Resolved in the constructor: explicit value, or the "functions" directory from getPulsarDirectory.
    private final String functionsDir;
    // Registered in start() and removed again in stop(); runs close() on JVM shutdown.
    private final Thread shutdownHook;
    // Set by the constructor to 0 when exitOnError is true and to 30000 otherwise.
    private final int instanceLivenessCheck;
    // Class loader for the function/source/sink code; assigned in start(), released in stop().
    private UserCodeClassLoader userCodeClassLoader;
    // Class loader for a sink's transform function; assigned in start(), released in stop().
    private UserCodeClassLoader transformFunctionCodeClassLoader;
    // Closed and reset to null by stop().
    private RuntimeFactory runtimeFactory;
    // Stopped by stop() when non-null.
    private HTTPServer metricsServer;

    @Parameter(names = {"--functionConfig"}, description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
    protected FunctionConfig functionConfig;

    @Parameter(names = {"--sourceConfig"}, description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
    protected SourceConfig sourceConfig;

    @Parameter(names = {"--sinkConfig"}, description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
    protected SinkConfig sinkConfig;

    @Parameter(names = {"--stateStorageImplClass"}, description = "The implementation class for the state storage service (by default Apache BookKeeper)", hidden = true, required = false)
    protected String stateStorageImplClass;

    @Parameter(names = {"--stateStorageServiceUrl"}, description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
    protected String stateStorageServiceUrl;

    @Parameter(names = {"--brokerServiceUrl"}, description = "The URL for the Pulsar broker", hidden = true)
    protected String brokerServiceUrl;

    @Parameter(names = {"--clientAuthPlugin"}, description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
    protected String clientAuthPlugin;

    @Parameter(names = {"--clientAuthParams"}, description = "Client authentication param", hidden = true)
    protected String clientAuthParams;

    @Parameter(names = {"--useTls"}, description = "Use tls connection\n", hidden = true, arity = 1)
    protected boolean useTls;

    @Parameter(names = {"--tlsAllowInsecureConnection"}, description = "Allow insecure tls connection\n", hidden = true, arity = 1)
    protected boolean tlsAllowInsecureConnection;

    @Parameter(names = {"--tlsHostNameVerificationEnabled"}, description = "Enable hostname verification", hidden = true, arity = 1)
    protected boolean tlsHostNameVerificationEnabled;

    @Parameter(names = {"--tlsTrustCertFilePath"}, description = "tls trust cert file path", hidden = true)
    protected String tlsTrustCertFilePath;

    @Parameter(names = {"--instanceIdOffset"}, description = "Start the instanceIds from this offset", hidden = true)
    protected int instanceIdOffset;

    @Parameter(names = {"--runtime"}, description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
    protected RuntimeEnv runtimeEnv;

    @Parameter(names = {"--secretsProviderClassName"}, description = "Whats the classname of secrets provider", hidden = true)
    protected String secretsProviderClassName;

    @Parameter(names = {"--secretsProviderConfig"}, description = "Whats the config for the secrets provider", hidden = true)
    protected String secretsProviderConfig;

    @Parameter(names = {"--metricsPortStart"}, description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
    protected Integer metricsPortStart;

    // The description used to be a verbatim copy of the --metricsPortStart text. What the flag
    // does in this class: a blocking start() joins the instance spawners and then closes the runner.
    @Parameter(names = {"--exitOnError"}, description = "Exit the local runner once the function instances have terminated (for example after a failure) instead of keeping it running until it is stopped.", hidden = true)
    protected boolean exitOnError;
    // Broker URL used by start() when no --brokerServiceUrl is given.
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
    // Web service URL used by start() when no --webServiceUrl is given.
    private static final String DEFAULT_WEB_SERVICE_URL = "http://localhost:8080";
    // True between a successful start() and the following stop().
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Spawners of the running instances; closed and cleared by stop().
    private final List<RuntimeSpawner> spawners = new LinkedList<>();

    @Parameter(names = {"--webServiceUrl"}, description = "The URL for the Pulsar web service", hidden = true)
    protected String webServiceUrl = null;

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$FunctionConfigConverter.class */
    /**
     * JCommander converter used by the {@code --functionConfig} option: turns the JSON text
     * given on the command line into a {@link FunctionConfig}.
     */
    public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
        /**
         * Deserializes the given JSON text.
         *
         * @param str JSON representation of a {@link FunctionConfig}
         * @return the parsed configuration
         * @throws RuntimeException wrapping the {@link IOException} raised while reading the JSON
         */
        @Override
        public FunctionConfig convert(String str) {
            final FunctionConfig parsed;
            try {
                parsed = (FunctionConfig) ObjectMapperFactory.getMapper()
                        .reader()
                        .readValue(str, FunctionConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse function config:", parseFailure);
            }
            return parsed;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$LocalRunnerBuilder.class */
    /**
     * Fluent builder for {@link LocalRunner}. Every setter stores its argument and returns the
     * builder itself; {@link #build()} hands all stored values to the {@link LocalRunner}
     * constructor. Values that were never set keep their Java defaults ({@code null},
     * {@code false} or {@code 0}).
     */
    public static class LocalRunnerBuilder {
        private FunctionConfig functionConfig;
        private SourceConfig sourceConfig;
        private SinkConfig sinkConfig;
        private String stateStorageImplClass;
        private String stateStorageServiceUrl;
        private String brokerServiceUrl;
        private String clientAuthPlugin;
        private String clientAuthParams;
        private boolean useTls;
        private boolean tlsAllowInsecureConnection;
        private boolean tlsHostNameVerificationEnabled;
        private String tlsTrustCertFilePath;
        private int instanceIdOffset;
        private RuntimeEnv runtimeEnv;
        private String secretsProviderClassName;
        private String secretsProviderConfig;
        private String narExtractionDirectory;
        private String connectorsDirectory;
        private String functionsDirectory;
        private Integer metricsPortStart;
        private boolean exitOnError;

        LocalRunnerBuilder() {
        }

        /** Sets the function configuration to run. */
        public LocalRunnerBuilder functionConfig(FunctionConfig functionConfig) {
            this.functionConfig = functionConfig;
            return this;
        }

        /** Sets the source configuration to run. */
        public LocalRunnerBuilder sourceConfig(SourceConfig sourceConfig) {
            this.sourceConfig = sourceConfig;
            return this;
        }

        /** Sets the sink configuration to run. */
        public LocalRunnerBuilder sinkConfig(SinkConfig sinkConfig) {
            this.sinkConfig = sinkConfig;
            return this;
        }

        /** Sets the implementation class of the state storage service. */
        public LocalRunnerBuilder stateStorageImplClass(String str) {
            stateStorageImplClass = str;
            return this;
        }

        /** Sets the URL of the state storage service. */
        public LocalRunnerBuilder stateStorageServiceUrl(String str) {
            stateStorageServiceUrl = str;
            return this;
        }

        /** Sets the Pulsar broker service URL. */
        public LocalRunnerBuilder brokerServiceUrl(String str) {
            brokerServiceUrl = str;
            return this;
        }

        /** Sets the client authentication plugin. */
        public LocalRunnerBuilder clientAuthPlugin(String str) {
            clientAuthPlugin = str;
            return this;
        }

        /** Sets the client authentication parameters. */
        public LocalRunnerBuilder clientAuthParams(String str) {
            clientAuthParams = str;
            return this;
        }

        /** Enables or disables TLS. */
        public LocalRunnerBuilder useTls(boolean z) {
            useTls = z;
            return this;
        }

        /** Allows or forbids insecure TLS connections. */
        public LocalRunnerBuilder tlsAllowInsecureConnection(boolean z) {
            tlsAllowInsecureConnection = z;
            return this;
        }

        /** Enables or disables TLS host name verification. */
        public LocalRunnerBuilder tlsHostNameVerificationEnabled(boolean z) {
            tlsHostNameVerificationEnabled = z;
            return this;
        }

        /** Sets the path of the TLS trust certificate file. */
        public LocalRunnerBuilder tlsTrustCertFilePath(String str) {
            tlsTrustCertFilePath = str;
            return this;
        }

        /** Sets the offset added to every instance id. */
        public LocalRunnerBuilder instanceIdOffset(int i) {
            instanceIdOffset = i;
            return this;
        }

        /** Selects the runtime (thread or process) used to run the instances. */
        public LocalRunnerBuilder runtimeEnv(RuntimeEnv runtimeEnv) {
            this.runtimeEnv = runtimeEnv;
            return this;
        }

        /** Sets the class name of the secrets provider. */
        public LocalRunnerBuilder secretsProviderClassName(String str) {
            secretsProviderClassName = str;
            return this;
        }

        /** Sets the configuration of the secrets provider. */
        public LocalRunnerBuilder secretsProviderConfig(String str) {
            secretsProviderConfig = str;
            return this;
        }

        /**
         * Sets the directory NAR archives are extracted into. When left {@code null} the
         * {@link LocalRunner} constructor creates a temporary directory instead.
         */
        public LocalRunnerBuilder narExtractionDirectory(String str) {
            narExtractionDirectory = str;
            return this;
        }

        /** Sets the connectors directory; when {@code null} the runner resolves a default. */
        public LocalRunnerBuilder connectorsDirectory(String str) {
            connectorsDirectory = str;
            return this;
        }

        /** Sets the functions directory; when {@code null} the runner resolves a default. */
        public LocalRunnerBuilder functionsDirectory(String str) {
            functionsDirectory = str;
            return this;
        }

        /** Sets the first port of the metrics port range. */
        public LocalRunnerBuilder metricsPortStart(Integer num) {
            metricsPortStart = num;
            return this;
        }

        /** Sets the exit-on-error flag passed to the runner. */
        public LocalRunnerBuilder exitOnError(boolean z) {
            exitOnError = z;
            return this;
        }

        /**
         * Creates the runner from the values collected so far.
         *
         * @return a new {@link LocalRunner}
         */
        public LocalRunner build() {
            return new LocalRunner(
                    functionConfig,
                    sourceConfig,
                    sinkConfig,
                    stateStorageImplClass,
                    stateStorageServiceUrl,
                    brokerServiceUrl,
                    clientAuthPlugin,
                    clientAuthParams,
                    useTls,
                    tlsAllowInsecureConnection,
                    tlsHostNameVerificationEnabled,
                    tlsTrustCertFilePath,
                    instanceIdOffset,
                    runtimeEnv,
                    secretsProviderClassName,
                    secretsProviderConfig,
                    narExtractionDirectory,
                    connectorsDirectory,
                    functionsDirectory,
                    metricsPortStart,
                    exitOnError);
        }

        /** Lists every builder field as {@code name=value}, in declaration order. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("LocalRunner.LocalRunnerBuilder(");
            text.append("functionConfig=").append(functionConfig);
            text.append(", sourceConfig=").append(sourceConfig);
            text.append(", sinkConfig=").append(sinkConfig);
            text.append(", stateStorageImplClass=").append(stateStorageImplClass);
            text.append(", stateStorageServiceUrl=").append(stateStorageServiceUrl);
            text.append(", brokerServiceUrl=").append(brokerServiceUrl);
            text.append(", clientAuthPlugin=").append(clientAuthPlugin);
            text.append(", clientAuthParams=").append(clientAuthParams);
            text.append(", useTls=").append(useTls);
            text.append(", tlsAllowInsecureConnection=").append(tlsAllowInsecureConnection);
            text.append(", tlsHostNameVerificationEnabled=").append(tlsHostNameVerificationEnabled);
            text.append(", tlsTrustCertFilePath=").append(tlsTrustCertFilePath);
            text.append(", instanceIdOffset=").append(instanceIdOffset);
            text.append(", runtimeEnv=").append(runtimeEnv);
            text.append(", secretsProviderClassName=").append(secretsProviderClassName);
            text.append(", secretsProviderConfig=").append(secretsProviderConfig);
            text.append(", narExtractionDirectory=").append(narExtractionDirectory);
            text.append(", connectorsDirectory=").append(connectorsDirectory);
            text.append(", functionsDirectory=").append(functionsDirectory);
            text.append(", metricsPortStart=").append(metricsPortStart);
            text.append(", exitOnError=").append(exitOnError);
            return text.append(")").toString();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$RuntimeConverter.class */
    /**
     * JCommander converter used by the {@code --runtime} option: maps the option text onto a
     * {@link RuntimeEnv} constant.
     */
    public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
        /**
         * Looks up the constant whose name equals {@code str} exactly (case-sensitive).
         *
         * @param str name of a {@link RuntimeEnv} constant
         * @return the matching constant
         * @throws IllegalArgumentException if no constant has that name
         */
        @Override
        public RuntimeEnv convert(String str) {
            final RuntimeEnv selected = Enum.valueOf(RuntimeEnv.class, str);
            return selected;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$RuntimeEnv.class */
    /**
     * Runtime used to execute the instances. {@code start} runs the threaded mode for
     * {@link #THREAD} (or when no runtime is set) if the component is a source, a sink or a
     * Java function; in every other case it runs the process mode.
     */
    public enum RuntimeEnv {
        /** Selects the threaded mode in {@code start} for sources, sinks and Java functions. */
        THREAD,
        /** Selects the process mode in {@code start}. */
        PROCESS
    }

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$SinkConfigConverter.class */
    /**
     * JCommander converter used by the {@code --sinkConfig} option: turns the JSON text given
     * on the command line into a {@link SinkConfig}.
     */
    public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
        /**
         * Deserializes the given JSON text.
         *
         * @param str JSON representation of a {@link SinkConfig}
         * @return the parsed configuration
         * @throws RuntimeException wrapping the {@link IOException} raised while reading the JSON
         */
        @Override
        public SinkConfig convert(String str) {
            final SinkConfig parsed;
            try {
                parsed = (SinkConfig) ObjectMapperFactory.getMapper()
                        .reader()
                        .readValue(str, SinkConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse sink config:", parseFailure);
            }
            return parsed;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$SourceConfigConverter.class */
    /**
     * JCommander converter used by the {@code --sourceConfig} option: turns the JSON text given
     * on the command line into a {@link SourceConfig}.
     */
    public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
        /**
         * Deserializes the given JSON text.
         *
         * @param str JSON representation of a {@link SourceConfig}
         * @return the parsed configuration
         * @throws RuntimeException wrapping the {@link IOException} raised while reading the JSON
         */
        @Override
        public SourceConfig convert(String str) {
            final SourceConfig parsed;
            try {
                parsed = (SourceConfig) ObjectMapperFactory.getMapper()
                        .reader()
                        .readValue(str, SourceConfig.class);
            } catch (IOException parseFailure) {
                throw new RuntimeException("Failed to parse source config:", parseFailure);
            }
            return parsed;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/LocalRunner$UserCodeClassLoader.class */
    /**
     * Immutable pair of a class loader and a flag telling whether the runner created that
     * loader itself. Only loaders flagged as created are closed by the runner on stop.
     */
    public static final class UserCodeClassLoader {
        private final ClassLoader classLoader;
        private final boolean classLoaderCreated;

        /**
         * @param classLoader the loader for the user code, may be {@code null}
         * @param z {@code true} when the loader was created by the runner
         */
        public UserCodeClassLoader(ClassLoader classLoader, boolean z) {
            this.classLoader = classLoader;
            this.classLoaderCreated = z;
        }

        /** @return the wrapped class loader, possibly {@code null} */
        public ClassLoader getClassLoader() {
            return classLoader;
        }

        /** @return {@code true} when the wrapped loader was created by the runner */
        public boolean isClassLoaderCreated() {
            return classLoaderCreated;
        }

        /** Two instances are equal when both the flag and the class loader are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof UserCodeClassLoader)) {
                return false;
            }
            final UserCodeClassLoader other = (UserCodeClassLoader) obj;
            return classLoaderCreated == other.classLoaderCreated
                    && Objects.equals(classLoader, other.classLoader);
        }

        /** Hash combining the flag and the class loader, consistent with {@link #equals}. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = prime + (classLoaderCreated ? 79 : 97);
            result = result * prime + (classLoader == null ? 43 : classLoader.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return new StringBuilder("LocalRunner.UserCodeClassLoader(classLoader=")
                    .append(classLoader)
                    .append(", classLoaderCreated=")
                    .append(classLoaderCreated)
                    .append(")")
                    .toString();
        }
    }

    /**
     * Command line entry point: builds a runner with default settings, lets JCommander fill
     * its annotated fields from the arguments and starts it in blocking mode. When starting
     * fails the error is logged and the runner is closed.
     *
     * @param strArr command line arguments
     * @throws Exception if argument parsing or closing the runner fails
     */
    public static void main(String[] strArr) throws Exception {
        final LocalRunner runner = builder().build();

        final JCommander commandLine = new JCommander(runner);
        commandLine.setProgramName("LocalRunner");
        commandLine.parse(strArr);

        try {
            runner.start(true);
        } catch (Exception startFailure) {
            log.error("Encountered error starting localrunner", startFailure);
            runner.close();
        }
    }

    public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig, String str, String str2, String str3, String str4, String str5, boolean z, boolean z2, boolean z3, String str6, int i, RuntimeEnv runtimeEnv, String str7, String str8, String str9, String str10, String str11, Integer num, boolean z4) {
        this.instanceIdOffset = 0;
        this.functionConfig = functionConfig;
        this.sourceConfig = sourceConfig;
        this.sinkConfig = sinkConfig;
        this.stateStorageImplClass = str;
        this.stateStorageServiceUrl = str2;
        this.brokerServiceUrl = str3;
        this.clientAuthPlugin = str4;
        this.clientAuthParams = str5;
        this.useTls = z;
        this.tlsAllowInsecureConnection = z2;
        this.tlsHostNameVerificationEnabled = z3;
        this.tlsTrustCertFilePath = str6;
        this.instanceIdOffset = i;
        this.runtimeEnv = runtimeEnv;
        this.secretsProviderClassName = str7;
        this.secretsProviderConfig = str8;
        if (str9 != null) {
            this.narExtractionDirectoryCreated = null;
            this.narExtractionDirectory = str9;
        } else {
            this.narExtractionDirectoryCreated = createNarExtractionTempDirectory();
            this.narExtractionDirectory = this.narExtractionDirectoryCreated.getAbsolutePath();
        }
        this.connectorsDir = str10 != null ? str10 : getPulsarDirectory("connectors");
        this.functionsDir = str11 != null ? str11 : getPulsarDirectory("functions");
        this.metricsPortStart = num;
        this.exitOnError = z4;
        this.instanceLivenessCheck = z4 ? 0 : 30000;
        this.shutdownHook = new Thread(() -> {
            try {
                close();
            } catch (Exception e) {
                log.warn("Encountered exception when closing localrunner", e);
            }
        });
    }

    /**
     * Resolves a directory name against the {@code PULSAR_HOME} environment variable when it
     * is set, otherwise against the current working directory.
     *
     * @param str directory name, e.g. {@code "connectors"}
     * @return the absolute, normalized path as a string
     */
    private static String getPulsarDirectory(String str) {
        final String pulsarHome = System.getenv("PULSAR_HOME");
        final Path location;
        if (pulsarHome == null) {
            location = Path.of(str);
        } else {
            location = Path.of(pulsarHome, str);
        }
        return location.toAbsolutePath().normalize().toString();
    }

    /**
     * Creates a fresh temporary directory whose name starts with
     * {@code pulsar_localrunner_nars_}.
     *
     * @return the created directory
     * @throws UncheckedIOException if the directory cannot be created
     */
    private static File createNarExtractionTempDirectory() {
        final Path created;
        try {
            created = Files.createTempDirectory("pulsar_localrunner_nars_");
        } catch (IOException creationFailure) {
            throw new UncheckedIOException("Cannot create temp directory", creationFailure);
        }
        return created.toFile();
    }

    /**
     * Stops the runner and then deletes the NAR extraction directory, but only when that
     * directory was created by this runner. The deletion is attempted even if stopping throws.
     *
     * @throws Exception if stopping or deleting the directory fails
     */
    @Override
    public void close() throws Exception {
        try {
            stop();
        } finally {
            final File ownedDirectory = this.narExtractionDirectoryCreated;
            final boolean mustDelete = ownedDirectory != null && ownedDirectory.exists();
            if (mustDelete) {
                FileUtils.deleteFile(ownedDirectory, true);
            }
        }
    }

    /**
     * Stops the runner if it is running; otherwise does nothing. Wakes a thread blocked in
     * {@code start}, removes the shutdown hook, stops the metrics server, closes every
     * spawner and the runtime factory, and closes the class loaders this runner created.
     */
    public synchronized void stop() {
        final boolean wasRunning = this.running.compareAndSet(true, false);
        if (!wasRunning) {
            return;
        }

        // Wake the thread waiting on this monitor in start().
        notify();

        try {
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        } catch (IllegalStateException ignored) {
            // Deliberately ignored: removeShutdownHook throws this when the JVM is already
            // shutting down, in which case the hook cannot (and need not) be removed.
        }

        if (this.metricsServer != null) {
            this.metricsServer.stop();
        }

        for (RuntimeSpawner spawner : this.spawners) {
            spawner.close();
        }
        this.spawners.clear();

        if (this.runtimeFactory != null) {
            this.runtimeFactory.close();
            this.runtimeFactory = null;
        }

        closeClassLoaderIfneeded(this.userCodeClassLoader);
        this.userCodeClassLoader = null;

        closeClassLoaderIfneeded(this.transformFunctionCodeClassLoader);
        this.transformFunctionCodeClassLoader = null;
    }

    /**
     * Closes the wrapped class loader when it was created by the runner and implements
     * {@link Closeable}. A failure to close is logged as a warning and not propagated.
     *
     * @param userCodeClassLoader the wrapper to release, may be {@code null}
     */
    private static void closeClassLoaderIfneeded(UserCodeClassLoader userCodeClassLoader) {
        if (userCodeClassLoader == null || !userCodeClassLoader.isClassLoaderCreated()) {
            return;
        }
        if (!(userCodeClassLoader.getClassLoader() instanceof Closeable)) {
            return;
        }
        try {
            ((Closeable) userCodeClassLoader.getClassLoader()).close();
        } catch (IOException closeFailure) {
            log.warn("Error closing classloader", closeFailure);
        }
    }

    /**
     * Starts the configured function, source or sink.
     *
     * <p>While holding this object's monitor the method marks the runner as running, registers
     * the shutdown hook, derives the function details from whichever config is set
     * (function first, then source, then sink) and launches the instances in threaded or
     * process mode.
     *
     * @param z when {@code true} the call blocks after start-up: without {@code exitOnError}
     *          it waits until the runner is no longer running; with {@code exitOnError} it
     *          joins every spawner, logs its death exception and then closes the runner
     * @throws IllegalArgumentException if the runner is already running, or if none of the
     *         function, source and sink configs is set
     * @throws UnsupportedOperationException if the function runtime is not JAVA, GO or PYTHON
     * @throws Exception any other failure raised while preparing or launching the instances
     */
    public void start(boolean z) throws Exception {
        String archive;
        int intValue;
        // Snapshot of the spawners, filled under the lock and joined outside of it.
        LinkedList<RuntimeSpawner> linkedList = new LinkedList();
        synchronized (this) {
            if (!this.running.compareAndSet(false, true)) {
                throw new IllegalArgumentException("Pulsar Function local run already started!");
            }
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
            Function.FunctionDetails functionDetails = null;
            // Transform function archive; only set for sinks.
            String str = null;
            if (this.functionConfig != null) {
                FunctionConfigUtils.inferMissingArguments(this.functionConfig, true);
                intValue = this.functionConfig.getParallelism().intValue();
                if (this.functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                    archive = this.functionConfig.getJar();
                    // The class loader must be in place before validation, which loads the user class through it.
                    this.userCodeClassLoader = extractClassLoader(archive, Function.FunctionDetails.ComponentType.FUNCTION, this.functionConfig.getClassName());
                    functionDetails = FunctionConfigUtils.convert(this.functionConfig, FunctionConfigUtils.validateJavaFunction(this.functionConfig, new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), FunctionDefinition.class)));
                } else if (this.functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
                    archive = this.functionConfig.getGo();
                } else {
                    if (this.functionConfig.getRuntime() != FunctionConfig.Runtime.PYTHON) {
                        throw new UnsupportedOperationException();
                    }
                    archive = this.functionConfig.getPy();
                }
                // GO and PYTHON functions are converted without the Java validation step.
                if (functionDetails == null) {
                    functionDetails = FunctionConfigUtils.convert(this.functionConfig, new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), FunctionDefinition.class));
                }
            } else if (this.sourceConfig != null) {
                Utils.inferMissingArguments(this.sourceConfig);
                archive = this.sourceConfig.getArchive();
                intValue = this.sourceConfig.getParallelism().intValue();
                this.userCodeClassLoader = extractClassLoader(archive, Function.FunctionDetails.ComponentType.SOURCE, this.sourceConfig.getClassName());
                functionDetails = SourceConfigUtils.convert(this.sourceConfig, SourceConfigUtils.validateAndExtractDetails(this.sourceConfig, new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class), true));
            } else {
                if (this.sinkConfig == null) {
                    throw new IllegalArgumentException("Must specify Function, Source or Sink config");
                }
                Utils.inferMissingArguments(this.sinkConfig);
                archive = this.sinkConfig.getArchive();
                str = this.sinkConfig.getTransformFunction();
                intValue = this.sinkConfig.getParallelism().intValue();
                this.userCodeClassLoader = extractClassLoader(archive, Function.FunctionDetails.ComponentType.SINK, this.sinkConfig.getClassName());
                LoadedFunctionPackage loadedFunctionPackage = new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
                // A sink may carry a transform function, which gets its own class loader.
                if (StringUtils.isNotEmpty(this.sinkConfig.getTransformFunction())) {
                    this.transformFunctionCodeClassLoader = extractClassLoader(this.sinkConfig.getTransformFunction(), Function.FunctionDetails.ComponentType.FUNCTION, this.sinkConfig.getTransformFunctionClassName());
                }
                // The transform package falls back to the thread context class loader when the extracted loader is null.
                functionDetails = SinkConfigUtils.convert(this.sinkConfig, SinkConfigUtils.validateAndExtractDetails(this.sinkConfig, loadedFunctionPackage, this.transformFunctionCodeClassLoader != null ? new LoadedFunctionPackage(this.transformFunctionCodeClassLoader.getClassLoader() == null ? Thread.currentThread().getContextClassLoader() : this.transformFunctionCodeClassLoader.getClassLoader(), FunctionDefinition.class) : null, true));
            }
            // Default the instance jar property to the location this class was loaded from, unless already set.
            if (System.getProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY) == null) {
                System.setProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, LocalRunner.class.getProtectionDomain().getCodeSource().getLocation().getFile());
            }
            AuthenticationConfig build = AuthenticationConfig.builder().clientAuthenticationPlugin(this.clientAuthPlugin).clientAuthenticationParameters(this.clientAuthParams).useTls(this.useTls).tlsAllowInsecureConnection(this.tlsAllowInsecureConnection).tlsHostnameVerificationEnable(this.tlsHostNameVerificationEnabled).tlsTrustCertsFilePath(this.tlsTrustCertFilePath).build();
            // Broker URL: the configured value or the default.
            String str2 = DEFAULT_SERVICE_URL;
            if (this.brokerServiceUrl != null) {
                str2 = this.brokerServiceUrl;
            }
            if (this.webServiceUrl == null) {
                this.webServiceUrl = DEFAULT_WEB_SERVICE_URL;
            }
            // Threaded mode requires a source, a sink or a Java function AND a runtime that is
            // unset or THREAD; everything else (non-Java functions, PROCESS) uses process mode.
            if (!(this.sourceConfig == null && this.sinkConfig == null && this.functionConfig.getRuntime() != FunctionConfig.Runtime.JAVA) && (this.runtimeEnv == null || this.runtimeEnv == RuntimeEnv.THREAD)) {
                startThreadedMode(functionDetails, intValue, this.instanceIdOffset, str2, this.stateStorageServiceUrl, build, archive, str);
            } else {
                startProcessMode(functionDetails, intValue, this.instanceIdOffset, str2, this.stateStorageServiceUrl, build, archive, str);
            }
            linkedList.addAll(this.spawners);
        }
        if (z) {
            if (!this.exitOnError) {
                // Block until stop() clears the running flag and notifies this monitor.
                synchronized (this) {
                    while (this.running.get()) {
                        wait();
                    }
                }
            } else {
                // Wait for every instance to finish, then release all resources.
                for (RuntimeSpawner runtimeSpawner : linkedList) {
                    runtimeSpawner.join();
                    log.info("RuntimeSpawner quit because of", runtimeSpawner.getRuntime().getDeathException());
                }
                close();
            }
        }
    }

    /**
     * Returns the user code class loader when one is available, otherwise the context class
     * loader of the calling thread.
     *
     * @return the class loader to load user code with
     */
    private ClassLoader getCurrentOrUserCodeClassLoader() {
        final UserCodeClassLoader holder = this.userCodeClassLoader;
        if (holder != null && holder.getClassLoader() != null) {
            return holder.getClassLoader();
        }
        return Thread.currentThread().getContextClassLoader();
    }

    /**
     * Determines the class loader for a component archive.
     *
     * <p>Resolution order: the result of {@code isBuiltIn} (defined outside this method;
     * presumably a lookup of built-in connectors/functions); then a package URL supported by
     * {@code Utils.isFunctionPackageUrlSupported}; then a local file. Loaders built from a
     * package URL or a local file are flagged as created by the runner; a built-in loader
     * and the "no archive" case are not.
     *
     * @param str archive reference (built-in name, package URL or file path), may be {@code null}
     * @param componentType kind of component the archive belongs to
     * @param str2 class name passed on to the package class loader factory
     * @return the class loader wrapper; its loader is {@code null} when no archive was given
     *         and the runtime is unset or {@code THREAD}
     * @throws RuntimeException if {@code str} names a local file that does not exist
     * @throws IllegalStateException if no archive was given for a non-thread runtime, or the
     *         component type is not FUNCTION, SOURCE or SINK where a message is needed
     * @throws IOException declared for the package extraction and loading calls
     * @throws URISyntaxException declared for the package extraction and loading calls
     */
    private UserCodeClassLoader extractClassLoader(String str, Function.FunctionDetails.ComponentType componentType, String str2) throws IOException, URISyntaxException {
        if (str == null) {
            // Without an archive only the thread runtime (or an unset runtime) can proceed.
            if (this.runtimeEnv != null && this.runtimeEnv != RuntimeEnv.THREAD) {
                final String message;
                switch (componentType) {
                    case FUNCTION:
                        message = "The jar property must be specified in FunctionConfig.";
                        break;
                    case SOURCE:
                        message = "The archive property must be specified in SourceConfig.";
                        break;
                    case SINK:
                        message = "The archive property must be specified in SinkConfig.";
                        break;
                    default:
                        throw new IllegalStateException("Unexpected ComponentType: " + componentType);
                }
                throw new IllegalStateException(message);
            }
            return new UserCodeClassLoader(null, false);
        }

        final ClassLoader builtInLoader = isBuiltIn(str, componentType);
        if (builtInLoader != null) {
            // Not created here, so the runner must not close it.
            return new UserCodeClassLoader(builtInLoader, false);
        }

        final File packageFile;
        if (Utils.isFunctionPackageUrlSupported(str)) {
            packageFile = FunctionCommon.extractFileFromPkgURL(str);
        } else {
            packageFile = new File(str);
            if (!packageFile.exists()) {
                final String label;
                switch (componentType) {
                    case FUNCTION:
                        label = "User jar";
                        break;
                    case SOURCE:
                        label = "Source archive";
                        break;
                    case SINK:
                        label = "Sink archive";
                        break;
                    default:
                        throw new IllegalStateException("Unexpected value: " + componentType);
                }
                throw new RuntimeException(label + " (" + str + ") does not exist");
            }
        }

        final ClassLoader packageLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(componentType, str2, packageFile, this.narExtractionDirectory);
        return new UserCodeClassLoader(packageLoader, true);
    }

    /**
     * Starts {@code i} function instances through a {@link ProcessRuntimeFactory} and schedules a
     * timer that periodically logs the JSON status of every spawner.
     *
     * <p>Each instance gets fresh random function/version ids, instance id {@code index + i2}, and
     * a port from {@code FunctionCommon.findAvailablePort()}. Its metrics port is
     * {@code metricsPortStart + index} when {@code metricsPortStart} is set, otherwise another
     * available port.
     *
     * @param functionDetails      details applied to every instance
     * @param i                    number of instances to start
     * @param i2                   offset added to the loop index to form the instance id
     * @param str                  first argument of the {@code ProcessRuntimeFactory} constructor
     *                             (presumably the broker service URL - TODO confirm)
     * @param str2                 third argument of the {@code ProcessRuntimeFactory} constructor
     *                             (presumably the state storage URL - TODO confirm)
     * @param authenticationConfig authentication settings handed to the runtime factory
     * @param str3                 second argument of the {@code RuntimeSpawner} constructor
     *                             (presumably the user code file - TODO confirm)
     * @param str4                 fourth argument of the {@code RuntimeSpawner} constructor
     *                             (presumably the transform function file - TODO confirm)
     * @throws IllegalArgumentException if any metrics port derived from {@code metricsPortStart}
     *                                  falls outside 0..65535
     * @throws Exception                propagated from factory creation or spawner start
     */
    private void startProcessMode(Function.FunctionDetails functionDetails, int i, int i2, String str, String str2, AuthenticationConfig authenticationConfig, String str3, String str4) throws Exception {
        // Validate the whole metrics port range up front so that nothing is spawned when a later
        // instance would end up with an invalid port. Widened to long to rule out int overflow.
        if (this.metricsPortStart != null && i > 0) {
            long firstMetricsPort = this.metricsPortStart.intValue();
            long lastMetricsPort = firstMetricsPort + i - 1;
            if (firstMetricsPort < 0 || lastMetricsPort > 65535) {
                throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
            }
        }

        this.runtimeFactory = new ProcessRuntimeFactory(str, this.webServiceUrl, str2, authenticationConfig, null, null, null, null, this.narExtractionDirectory, getSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty());
        for (int instanceIndex = 0; instanceIndex < i; instanceIndex++) {
            InstanceConfig instanceConfig = new InstanceConfig();
            instanceConfig.setFunctionDetails(functionDetails);
            instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
            instanceConfig.setFunctionId(UUID.randomUUID().toString());
            instanceConfig.setInstanceId(instanceIndex + i2);
            instanceConfig.setMaxBufferedTuples(1024);
            instanceConfig.setPort(FunctionCommon.findAvailablePort());
            if (this.metricsPortStart != null) {
                // Range already validated above.
                instanceConfig.setMetricsPort(this.metricsPortStart.intValue() + instanceIndex);
            } else {
                instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
            }
            instanceConfig.setClusterName("local");
            if (this.functionConfig != null) {
                // Leave InstanceConfig's own value untouched when the setting is absent instead of
                // failing with a NullPointerException while unboxing.
                if (this.functionConfig.getMaxPendingAsyncRequests() != null) {
                    instanceConfig.setMaxPendingAsyncRequests(this.functionConfig.getMaxPendingAsyncRequests().intValue());
                }
                if (this.functionConfig.getExposePulsarAdminClientEnabled() != null) {
                    instanceConfig.setExposePulsarAdminClientEnabled(this.functionConfig.getExposePulsarAdminClientEnabled().booleanValue());
                }
            }
            RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, str3, null, str4, null, this.runtimeFactory, this.instanceLivenessCheck);
            this.spawners.add(runtimeSpawner);
            runtimeSpawner.start();
        }

        // One pretty-printing Gson shared by every tick rather than one per logged status.
        final Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();
        final Timer timer = new Timer();
        // NOTE(review): DeliveryOptions.DEFAULT_TIMEOUT serves as both the initial delay and the
        // period; scheduleAtFixedRate interprets both as milliseconds - confirm the constant's value.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                CompletableFuture<?>[] statusFutures = new CompletableFuture<?>[LocalRunner.this.spawners.size()];
                int idx = 0;
                for (RuntimeSpawner spawner : LocalRunner.this.spawners) {
                    statusFutures[idx] = spawner.getFunctionStatusAsJson(idx);
                    idx++;
                }
                try {
                    CompletableFuture.allOf(statusFutures).get(5L, TimeUnit.SECONDS);
                    for (CompletableFuture<?> statusFuture : statusFutures) {
                        LocalRunner.log.info(prettyGson.toJson(JsonParser.parseString((String) statusFuture.get())));
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LocalRunner.log.error("Could not get status from all local instances", e);
                } catch (ExecutionException | TimeoutException e) {
                    LocalRunner.log.error("Could not get status from all local instances", e);
                }
            }
        }, DeliveryOptions.DEFAULT_TIMEOUT, DeliveryOptions.DEFAULT_TIMEOUT);

        // Stop the status timer when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(timer::cancel));
    }

    /**
     * Starts {@code i} function instances through a {@code ThreadRuntimeFactory} and, when
     * {@code metricsPortStart} is set, exposes the collector registry on that port via an
     * {@code HTTPServer}.
     *
     * <p>The secrets provider is instantiated reflectively from {@code secretsProviderClassName}
     * (initialised with the JSON in {@code secretsProviderConfig}, if any); otherwise a
     * {@code ClearTextSecretsProvider} is used. While the runtime factory is being constructed the
     * thread's context class loader is switched to the user code class loader, when one exists.
     *
     * @param functionDetails      details applied to every instance
     * @param i                    number of instances to start
     * @param i2                   offset added to the loop index to form the instance id
     * @param str                  second argument of the {@code ThreadRuntimeFactory} constructor
     *                             (presumably the broker service URL - TODO confirm)
     * @param str2                 fourth argument of the {@code ThreadRuntimeFactory} constructor
     *                             (presumably the state storage URL - TODO confirm)
     * @param authenticationConfig authentication settings handed to the runtime factory
     * @param str3                 second argument of the {@code RuntimeSpawner} constructor
     *                             (presumably the user code file - TODO confirm)
     * @param str4                 fourth argument of the {@code RuntimeSpawner} constructor
     *                             (presumably the transform function file - TODO confirm)
     * @throws IllegalArgumentException if {@code metricsPortStart} is outside 0..65535
     * @throws Exception                propagated from factory creation, spawner start or the
     *                                  metrics server
     */
    private void startThreadedMode(Function.FunctionDetails functionDetails, int i, int i2, String str, String str2, AuthenticationConfig authenticationConfig, String str3, String str4) throws Exception {
        if (this.metricsPortStart != null && (this.metricsPortStart.intValue() < 0 || this.metricsPortStart.intValue() > 65535)) {
            throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
        }

        // Declared with the interface type: the reflective branch yields an arbitrary
        // SecretsProvider, not necessarily a ClearTextSecretsProvider.
        SecretsProvider secretsProvider;
        if (this.secretsProviderClassName != null) {
            secretsProvider = (SecretsProvider) Reflections.createInstance(this.secretsProviderClassName, ClassLoader.getSystemClassLoader());
            secretsProvider.init(this.secretsProviderConfig != null ? (Map) new Gson().fromJson(this.secretsProviderConfig, Map.class) : null);
        } else {
            secretsProvider = new ClearTextSecretsProvider();
        }

        // Both values are the same for every instance, so resolve them once.
        final boolean hasFunctionConfig = this.functionConfig != null;
        final boolean hasExposeAdminSetting = hasFunctionConfig && this.functionConfig.getExposePulsarAdminClientEnabled() != null;
        final boolean exposePulsarAdminClient = hasExposeAdminSetting && this.functionConfig.getExposePulsarAdminClientEnabled().booleanValue();

        FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
        RuntimeUtils.registerDefaultCollectors(collectorRegistry);

        ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            if (this.userCodeClassLoader != null && this.userCodeClassLoader.getClassLoader() != null) {
                Thread.currentThread().setContextClassLoader(this.userCodeClassLoader.getClassLoader());
            }
            this.runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", str, this.stateStorageImplClass, str2, authenticationConfig, secretsProvider, collectorRegistry, this.narExtractionDirectory, null, exposePulsarAdminClient, this.webServiceUrl);
            // The factory is built; hand the caller's context class loader back.
            Thread.currentThread().setContextClassLoader(originalContextClassLoader);
            for (int instanceIndex = 0; instanceIndex < i; instanceIndex++) {
                InstanceConfig instanceConfig = new InstanceConfig();
                instanceConfig.setFunctionDetails(functionDetails);
                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                instanceConfig.setFunctionId(UUID.randomUUID().toString());
                instanceConfig.setTransformFunctionId(UUID.randomUUID().toString());
                instanceConfig.setInstanceId(instanceIndex + i2);
                instanceConfig.setMaxBufferedTuples(1024);
                if (this.metricsPortStart != null) {
                    // Every instance reports the same port: a single metrics server is started below.
                    instanceConfig.setMetricsPort(this.metricsPortStart.intValue());
                }
                instanceConfig.setClusterName("local");
                if (hasFunctionConfig) {
                    // Leave InstanceConfig's own value untouched when the setting is absent instead
                    // of failing with a NullPointerException while unboxing.
                    if (this.functionConfig.getMaxPendingAsyncRequests() != null) {
                        instanceConfig.setMaxPendingAsyncRequests(this.functionConfig.getMaxPendingAsyncRequests().intValue());
                    }
                    if (hasExposeAdminSetting) {
                        instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClient);
                    }
                }
                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, str3, null, str4, null, this.runtimeFactory, this.instanceLivenessCheck);
                this.spawners.add(runtimeSpawner);
                runtimeSpawner.start();
            }
            if (this.metricsPortStart != null) {
                log.info("Starting metrics server on port {}", this.metricsPortStart);
                this.metricsServer = new HTTPServer(new InetSocketAddress(this.metricsPortStart.intValue()), (CollectorRegistry) collectorRegistry, true);
            }
        } catch (Throwable th) {
            // On any failure make sure the caller's context class loader is back in place.
            Thread.currentThread().setContextClassLoader(originalContextClassLoader);
            throw th;
        }
    }

    /**
     * Dispatches the built-in lookup to the helper matching the component type.
     *
     * @param str           name or {@code builtin://} reference of the component
     * @param componentType FUNCTION, SOURCE or SINK
     * @return the class loader returned by the matching helper, or {@code null} when it finds none
     * @throws IllegalStateException for any other component type
     * @throws IOException           propagated from the helper
     */
    private ClassLoader isBuiltIn(String str, Function.FunctionDetails.ComponentType componentType) throws IOException {
        final ClassLoader builtInLoader;
        switch (componentType) {
            case SINK:
                builtInLoader = isBuiltInSink(str);
                break;
            case SOURCE:
                builtInLoader = isBuiltInSource(str);
                break;
            case FUNCTION:
                builtInLoader = isBuiltInFunction(str);
                break;
            default:
                throw new IllegalStateException("Unexpected ComponentType: " + componentType);
        }
        return builtInLoader;
    }

    /**
     * Looks up a built-in function by name, ignoring a leading {@code builtin://} prefix.
     *
     * @param str function name, optionally prefixed with {@code builtin://}
     * @return the class loader of the matching function package, or {@code null} when no archive
     *         matches or its definition declares no function class
     * @throws IOException propagated from {@code getFunctions()}
     */
    private ClassLoader isBuiltInFunction(String str) throws IOException {
        TreeMap<String, FunctionArchive> availableFunctions = getFunctions();
        String functionName = str.replaceFirst("^builtin://", "");
        FunctionArchive archive = availableFunctions.get(functionName);
        boolean providesFunction = archive != null && archive.getFunctionDefinition().getFunctionClass() != null;
        return providesFunction ? archive.getFunctionPackage().getClassLoader() : null;
    }

    /**
     * Looks up a built-in source connector by name, ignoring a leading {@code builtin://} prefix.
     *
     * @param str connector name, optionally prefixed with {@code builtin://}
     * @return the class loader of the matching connector package, or {@code null} when no
     *         connector matches or its definition declares no source class
     * @throws IOException propagated from {@code getConnectors()}
     */
    private ClassLoader isBuiltInSource(String str) throws IOException {
        TreeMap<String, Connector> availableConnectors = getConnectors();
        String connectorName = str.replaceFirst("^builtin://", "");
        Connector candidate = availableConnectors.get(connectorName);
        boolean providesSource = candidate != null && candidate.getConnectorDefinition().getSourceClass() != null;
        return providesSource ? candidate.getConnectorFunctionPackage().getClassLoader() : null;
    }

    /**
     * Looks up a built-in sink connector by name, ignoring a leading {@code builtin://} prefix.
     *
     * @param str connector name, optionally prefixed with {@code builtin://}
     * @return the class loader of the matching connector package, or {@code null} when no
     *         connector matches or its definition declares no sink class
     * @throws IOException propagated from {@code getConnectors()}
     */
    private ClassLoader isBuiltInSink(String str) throws IOException {
        TreeMap<String, Connector> availableConnectors = getConnectors();
        String connectorName = str.replaceFirst("^builtin://", "");
        Connector candidate = availableConnectors.get(connectorName);
        boolean providesSink = candidate != null && candidate.getConnectorDefinition().getSinkClass() != null;
        return providesSink ? candidate.getConnectorFunctionPackage().getClassLoader() : null;
    }

    /**
     * Searches {@code functionsDir} for function archives via {@code FunctionUtils.searchForFunctions}.
     *
     * @return the archives found, keyed by the name the search assigns them
     * @throws IOException propagated from the search
     */
    private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
        TreeMap<String, FunctionArchive> discoveredFunctions =
                FunctionUtils.searchForFunctions(this.functionsDir, this.narExtractionDirectory, true);
        return discoveredFunctions;
    }

    /**
     * Searches {@code connectorsDir} for connectors via {@code ConnectorUtils.searchForConnectors}.
     *
     * @return the connectors found, keyed by the name the search assigns them
     * @throws IOException propagated from the search
     */
    private TreeMap<String, Connector> getConnectors() throws IOException {
        TreeMap<String, Connector> discoveredConnectors =
                ConnectorUtils.searchForConnectors(this.connectorsDir, this.narExtractionDirectory, true);
        return discoveredConnectors;
    }

    /**
     * Builds the secrets provider configurator for process-based runtimes.
     *
     * @return a {@code NameAndConfigBasedSecretsProviderConfigurator} carrying
     *         {@code secretsProviderClassName} and the parsed {@code secretsProviderConfig} JSON
     *         ({@code null} when no config is set), or a {@code DefaultSecretsProviderConfigurator}
     *         when no class name is configured
     */
    private SecretsProviderConfigurator getSecretsProviderConfigurator() {
        if (this.secretsProviderClassName == null) {
            return new DefaultSecretsProviderConfigurator();
        }
        Map parsedConfig = this.secretsProviderConfig == null
                ? null
                : (Map) new Gson().fromJson(this.secretsProviderConfig, Map.class);
        return new NameAndConfigBasedSecretsProviderConfigurator(this.secretsProviderClassName, parsedConfig);
    }

    /**
     * Creates a new, empty builder.
     *
     * @return a fresh {@code LocalRunnerBuilder}
     */
    public static LocalRunnerBuilder builder() {
        LocalRunnerBuilder freshBuilder = new LocalRunnerBuilder();
        return freshBuilder;
    }
}
