package org.apache.kafka.connect.cli;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/cli/ConnectDistributed.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/cli/ConnectDistributed.class */
public class ConnectDistributed {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectDistributed.class);
    private final Time time = Time.SYSTEM;
    private final long initStart = this.time.hiResClockMs();

    public static void main(String[] strArr) {
        if (strArr.length < 1 || Arrays.asList(strArr).contains("--help")) {
            log.info("Usage: ConnectDistributed worker.properties");
            Exit.exit(1);
        }
        try {
            new WorkerInfo().logAll();
            String str = strArr[0];
            new ConnectDistributed().startConnect(!str.isEmpty() ? Utils.propsToStringMap(Utils.loadProps(str)) : Collections.emptyMap()).awaitStop();
        } catch (Throwable th) {
            log.error("Stopping due to error", th);
            Exit.exit(2);
        }
    }

    public Connect startConnect(Map<String, String> map) {
        log.info("Scanning for plugin classes. This might take a moment ...");
        Plugins plugins = new Plugins(map);
        plugins.compareAndSwapWithDelegatingLoader();
        DistributedConfig distributedConfig = new DistributedConfig(map);
        String lookupKafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
        log.debug("Kafka cluster ID: {}", lookupKafkaClusterId);
        RestServer restServer = new RestServer(distributedConfig);
        restServer.initializeServer();
        URI advertisedUrl = restServer.advertisedUrl();
        String str = advertisedUrl.getHost() + LocalDateTimeSchema.DELIMITER + advertisedUrl.getPort();
        KafkaOffsetBackingStore kafkaOffsetBackingStore = new KafkaOffsetBackingStore();
        kafkaOffsetBackingStore.configure(distributedConfig);
        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = (ConnectorClientConfigOverridePolicy) plugins.newPlugin(distributedConfig.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG), distributedConfig, ConnectorClientConfigOverridePolicy.class);
        Worker worker = new Worker(str, this.time, plugins, distributedConfig, kafkaOffsetBackingStore, connectorClientConfigOverridePolicy);
        WorkerConfigTransformer configTransformer = worker.configTransformer();
        Converter internalValueConverter = worker.getInternalValueConverter();
        KafkaStatusBackingStore kafkaStatusBackingStore = new KafkaStatusBackingStore(this.time, internalValueConverter);
        kafkaStatusBackingStore.configure(distributedConfig);
        Connect connect = new Connect(new DistributedHerder(distributedConfig, this.time, worker, lookupKafkaClusterId, kafkaStatusBackingStore, new KafkaConfigBackingStore(internalValueConverter, distributedConfig, configTransformer), advertisedUrl.toString(), connectorClientConfigOverridePolicy), restServer);
        log.info("Kafka Connect distributed worker initialization took {}ms", Long.valueOf(this.time.hiResClockMs() - this.initStart));
        try {
            connect.start();
        } catch (Exception e) {
            log.error("Failed to start Connect", (Throwable) e);
            connect.stop();
            Exit.exit(3);
        }
        return connect;
    }
}
