package org.apache.pulsar.functions.utils.io;

import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-utils-2.11.0.jar:org/apache/pulsar/functions/utils/io/ConnectorUtils.class */
public final class ConnectorUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectorUtils.class);
    private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";

    public static String getIOSourceClass(NarClassLoader narClassLoader) throws IOException {
        Class loadClass;
        ConnectorDefinition connectorDefinition = getConnectorDefinition(narClassLoader);
        if (StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
            throw new IOException(String.format("The '%s' connector does not provide a source implementation", connectorDefinition.getName()));
        }
        try {
            loadClass = narClassLoader.loadClass(connectorDefinition.getSourceClass());
        } catch (Throwable th) {
            Exceptions.rethrowIOException(th);
        }
        if (Source.class.isAssignableFrom(loadClass) || BatchSource.class.isAssignableFrom(loadClass)) {
            return connectorDefinition.getSourceClass();
        }
        throw new IOException(String.format("Class %s does not implement interface %s or %s", connectorDefinition.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
    }

    public static String getIOSinkClass(NarClassLoader narClassLoader) throws IOException {
        ConnectorDefinition connectorDefinition = getConnectorDefinition(narClassLoader);
        if (StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
            throw new IOException(String.format("The '%s' connector does not provide a sink implementation", connectorDefinition.getName()));
        }
        try {
        } catch (Throwable th) {
            Exceptions.rethrowIOException(th);
        }
        if (Sink.class.isAssignableFrom(narClassLoader.loadClass(connectorDefinition.getSinkClass()))) {
            return connectorDefinition.getSinkClass();
        }
        throw new IOException("Class " + connectorDefinition.getSinkClass() + " does not implement interface " + Sink.class.getName());
    }

    public static ConnectorDefinition getConnectorDefinition(NarClassLoader narClassLoader) throws IOException {
        return (ConnectorDefinition) ObjectMapperFactory.getThreadLocalYaml().readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME), ConnectorDefinition.class);
    }

    public static List<ConfigFieldDefinition> getConnectorConfigDefinition(ClassLoader classLoader, String str) throws Exception {
        LinkedList linkedList = new LinkedList();
        for (Field field : Reflections.getAllFields(classLoader.loadClass(str))) {
            if (!Modifier.isStatic(field.getModifiers())) {
                field.setAccessible(true);
                ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
                configFieldDefinition.setFieldName(field.getName());
                configFieldDefinition.setTypeName(field.getType().getName());
                HashMap hashMap = new HashMap();
                for (Annotation annotation : field.getAnnotations()) {
                    if (annotation.annotationType().equals(FieldDoc.class)) {
                        FieldDoc fieldDoc = (FieldDoc) annotation;
                        for (Method method : FieldDoc.class.getDeclaredMethods()) {
                            Object invoke = method.invoke(fieldDoc, new Object[0]);
                            hashMap.put(method.getName(), invoke == null ? "" : invoke.toString());
                        }
                    }
                }
                configFieldDefinition.setAttributes(hashMap);
                linkedList.add(configFieldDefinition);
            }
        }
        return linkedList;
    }

    public static TreeMap<String, Connector> searchForConnectors(String str, String str2) throws IOException {
        Path absolutePath = Paths.get(str, new String[0]).toAbsolutePath();
        log.info("Searching for connectors in {}", absolutePath);
        TreeMap<String, Connector> treeMap = new TreeMap<>();
        if (!absolutePath.toFile().exists()) {
            log.warn("Connectors archive directory not found");
            return treeMap;
        }
        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(absolutePath, "*.nar");
        try {
            for (Path path : newDirectoryStream) {
                try {
                    NarClassLoader build = NarClassLoaderBuilder.builder().narFile(new File(path.toString())).extractionDirectory(str2).build();
                    Connector.ConnectorBuilder builder = Connector.builder();
                    ConnectorDefinition connectorDefinition = getConnectorDefinition(build);
                    log.info("Found connector {} from {}", connectorDefinition, path);
                    builder.archivePath(path);
                    if (!StringUtils.isEmpty(connectorDefinition.getSourceClass()) && !StringUtils.isEmpty(connectorDefinition.getSourceConfigClass())) {
                        builder.sourceConfigFieldDefinitions(getConnectorConfigDefinition(build, connectorDefinition.getSourceConfigClass()));
                    }
                    if (!StringUtils.isEmpty(connectorDefinition.getSinkClass()) && !StringUtils.isEmpty(connectorDefinition.getSinkConfigClass())) {
                        builder.sinkConfigFieldDefinitions(getConnectorConfigDefinition(build, connectorDefinition.getSinkConfigClass()));
                    }
                    builder.classLoader(build);
                    builder.connectorDefinition(connectorDefinition);
                    treeMap.put(connectorDefinition.getName(), builder.build());
                } catch (Throwable th) {
                    log.warn("Failed to load connector from {}", path, th);
                }
            }
            if (newDirectoryStream != null) {
                newDirectoryStream.close();
            }
            return treeMap;
        } catch (Throwable th2) {
            if (newDirectoryStream != null) {
                try {
                    newDirectoryStream.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private ConnectorUtils() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}
