package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/Worker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.5.1.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/Worker.class */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Worker.class);
    protected Herder herder;
    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    private final Plugins plugins;
    private final ConnectMetrics metrics;
    private final WorkerMetricsGroup workerMetricsGroup;
    private final WorkerConfig config;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    private final ConcurrentMap<String, WorkerConnector> connectors;
    private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
    private WorkerConfigTransformer workerConfigTransformer;
    private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/Worker$WorkerMetricsGroup.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.5.1.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/Worker$WorkerMetricsGroup.class */
    public class WorkerMetricsGroup {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor connectorStartupAttempts;
        private final Sensor connectorStartupSuccesses;
        private final Sensor connectorStartupFailures;
        private final Sensor connectorStartupResults;
        private final Sensor taskStartupAttempts;
        private final Sensor taskStartupSuccesses;
        private final Sensor taskStartupFailures;
        private final Sensor taskStartupResults;

        public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.workerGroupName(), new String[0]);
            this.metricGroup.addValueMetric(registry.connectorCount, new ConnectMetrics.LiteralSupplier<Double>() { // from class: org.apache.kafka.connect.runtime.Worker.WorkerMetricsGroup.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier
                public Double metricValue(long j) {
                    return Double.valueOf(Worker.this.connectors.size());
                }
            });
            this.metricGroup.addValueMetric(registry.taskCount, new ConnectMetrics.LiteralSupplier<Double>() { // from class: org.apache.kafka.connect.runtime.Worker.WorkerMetricsGroup.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier
                public Double metricValue(long j) {
                    return Double.valueOf(Worker.this.tasks.size());
                }
            });
            Frequencies forBooleanValues = Frequencies.forBooleanValues(this.metricGroup.metricName(registry.connectorStartupFailurePercentage), this.metricGroup.metricName(registry.connectorStartupSuccessPercentage));
            this.connectorStartupResults = this.metricGroup.sensor("connector-startup-results");
            this.connectorStartupResults.add(forBooleanValues);
            this.connectorStartupAttempts = this.metricGroup.sensor("connector-startup-attempts");
            this.connectorStartupAttempts.add(this.metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total());
            this.connectorStartupSuccesses = this.metricGroup.sensor("connector-startup-successes");
            this.connectorStartupSuccesses.add(this.metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total());
            this.connectorStartupFailures = this.metricGroup.sensor("connector-startup-failures");
            this.connectorStartupFailures.add(this.metricGroup.metricName(registry.connectorStartupFailureTotal), new Total());
            Frequencies forBooleanValues2 = Frequencies.forBooleanValues(this.metricGroup.metricName(registry.taskStartupFailurePercentage), this.metricGroup.metricName(registry.taskStartupSuccessPercentage));
            this.taskStartupResults = this.metricGroup.sensor("task-startup-results");
            this.taskStartupResults.add(forBooleanValues2);
            this.taskStartupAttempts = this.metricGroup.sensor("task-startup-attempts");
            this.taskStartupAttempts.add(this.metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total());
            this.taskStartupSuccesses = this.metricGroup.sensor("task-startup-successes");
            this.taskStartupSuccesses.add(this.metricGroup.metricName(registry.taskStartupSuccessTotal), new Total());
            this.taskStartupFailures = this.metricGroup.sensor("task-startup-failures");
            this.taskStartupFailures.add(this.metricGroup.metricName(registry.taskStartupFailureTotal), new Total());
        }

        void close() {
            this.metricGroup.close();
        }

        void recordConnectorStartupFailure() {
            this.connectorStartupAttempts.record(1.0d);
            this.connectorStartupFailures.record(1.0d);
            this.connectorStartupResults.record(0.0d);
        }

        void recordConnectorStartupSuccess() {
            this.connectorStartupAttempts.record(1.0d);
            this.connectorStartupSuccesses.record(1.0d);
            this.connectorStartupResults.record(1.0d);
        }

        void recordTaskFailure() {
            this.taskStartupAttempts.record(1.0d);
            this.taskStartupFailures.record(1.0d);
            this.taskStartupResults.record(0.0d);
        }

        void recordTaskSuccess() {
            this.taskStartupAttempts.record(1.0d);
            this.taskStartupSuccesses.record(1.0d);
            this.taskStartupResults.record(1.0d);
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    public Worker(String str, Time time, Plugins plugins, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this(str, time, plugins, workerConfig, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
    }

    Worker(String str, Time time, Plugins plugins, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore, ExecutorService executorService, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this.connectors = new ConcurrentHashMap();
        this.tasks = new ConcurrentHashMap();
        this.metrics = new ConnectMetrics(str, workerConfig, time);
        this.executor = executorService;
        this.workerId = str;
        this.time = time;
        this.plugins = plugins;
        this.config = workerConfig;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.workerMetricsGroup = new WorkerMetricsGroup(this.metrics);
        this.internalKeyConverter = plugins.newConverter(workerConfig, WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
        this.internalValueConverter = plugins.newConverter(workerConfig, WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(workerConfig);
        this.workerConfigTransformer = initConfigTransformer();
    }

    private WorkerConfigTransformer initConfigTransformer() {
        List<String> list = this.config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
        HashMap hashMap = new HashMap();
        for (String str : list) {
            hashMap.put(str, this.plugins.newConfigProvider(this.config, "config.providers." + str, Plugins.ClassLoaderUsage.PLUGINS));
        }
        return new WorkerConfigTransformer(this, hashMap);
    }

    public WorkerConfigTransformer configTransformer() {
        return this.workerConfigTransformer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Herder herder() {
        return this.herder;
    }

    public void start() {
        log.info("Worker starting");
        this.offsetBackingStore.start();
        this.sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(this.config);
        log.info("Worker started");
    }

    public void stop() {
        log.info("Worker stopping");
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        if (!this.connectors.isEmpty()) {
            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", this.connectors.keySet());
            stopConnectors();
        }
        if (!this.tasks.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", this.tasks.keySet());
            stopAndAwaitTasks();
        }
        this.sourceTaskOffsetCommitter.close(milliseconds - this.time.milliseconds());
        this.offsetBackingStore.stop();
        this.metrics.stop();
        log.info("Worker stopped");
        this.workerMetricsGroup.close();
    }

    public boolean startConnector(String str, Map<String, String> map, ConnectorContext connectorContext, ConnectorStatus.Listener listener, TargetState targetState) {
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            if (this.connectors.containsKey(str)) {
                throw new ConnectException("Connector with name " + str + " already exists");
            }
            ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
            try {
                ConnectorConfig connectorConfig = new ConnectorConfig(this.plugins, map);
                String string = connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
                log.info("Creating connector {} of type {}", str, string);
                Connector newConnector = this.plugins.newConnector(string);
                WorkerConnector workerConnector = new WorkerConnector(str, newConnector, connectorContext, this.metrics, listener);
                log.info("Instantiated connector {} with version {} of type {}", str, newConnector.version(), newConnector.getClass());
                currentThreadLoader = this.plugins.compareAndSwapLoaders(newConnector);
                workerConnector.initialize(connectorConfig);
                workerConnector.transitionTo(targetState);
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                if (this.connectors.putIfAbsent(str, workerConnector) != null) {
                    throw new ConnectException("Connector with name " + str + " already exists");
                }
                log.info("Finished creating connector {}", str);
                this.workerMetricsGroup.recordConnectorStartupSuccess();
                if (forConnector == null) {
                    return true;
                }
                if (0 == 0) {
                    forConnector.close();
                    return true;
                }
                try {
                    forConnector.close();
                    return true;
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                    return true;
                }
            } catch (Throwable th3) {
                log.error("Failed to start connector {}", str, th3);
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                this.workerMetricsGroup.recordConnectorStartupFailure();
                listener.onFailure(str, th3);
                if (forConnector != null) {
                    if (0 != 0) {
                        try {
                            forConnector.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        forConnector.close();
                    }
                }
                return false;
            }
        } catch (Throwable th5) {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forConnector.close();
                }
            }
            throw th5;
        }
    }

    public boolean isSinkConnector(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
        try {
            currentThreadLoader = this.plugins.compareAndSwapLoaders(workerConnector.connector());
            boolean isSinkConnector = workerConnector.isSinkConnector();
            Plugins.compareAndSwapLoaders(currentThreadLoader);
            return isSinkConnector;
        } catch (Throwable th) {
            Plugins.compareAndSwapLoaders(currentThreadLoader);
            throw th;
        }
    }

    public List<Map<String, String>> connectorTaskConfigs(String str, ConnectorConfig connectorConfig) {
        ArrayList arrayList = new ArrayList();
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            log.trace("Reconfiguring connector tasks for {}", str);
            WorkerConnector workerConnector = this.connectors.get(str);
            if (workerConnector == null) {
                throw new ConnectException("Connector " + str + " not found in this worker.");
            }
            int intValue = connectorConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG).intValue();
            Map<String, String> originalsStrings = connectorConfig.originalsStrings();
            Connector connector = workerConnector.connector();
            ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
            try {
                currentThreadLoader = this.plugins.compareAndSwapLoaders(connector);
                String name = connector.taskClass().getName();
                Iterator<Map<String, String>> it = connector.taskConfigs(intValue).iterator();
                while (it.hasNext()) {
                    HashMap hashMap = new HashMap(it.next());
                    hashMap.put(TaskConfig.TASK_CLASS_CONFIG, name);
                    if (originalsStrings.containsKey("topics")) {
                        hashMap.put("topics", originalsStrings.get("topics"));
                    }
                    if (originalsStrings.containsKey("topics.regex")) {
                        hashMap.put("topics.regex", originalsStrings.get("topics.regex"));
                    }
                    arrayList.add(hashMap);
                }
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                return arrayList;
            } catch (Throwable th2) {
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                throw th2;
            }
        } finally {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    forConnector.close();
                }
            }
        }
    }

    private void stopConnectors() {
        Iterator<String> it = this.connectors.keySet().iterator();
        while (it.hasNext()) {
            stopConnector(it.next());
        }
    }

    public boolean stopConnector(String str) {
        LoggingContext forConnector = LoggingContext.forConnector(str);
        Throwable th = null;
        try {
            log.info("Stopping connector {}", str);
            WorkerConnector remove = this.connectors.remove(str);
            if (remove == null) {
                log.warn("Ignoring stop request for unowned connector {}", str);
                if (forConnector != null) {
                    if (0 != 0) {
                        try {
                            forConnector.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        forConnector.close();
                    }
                }
                return false;
            }
            ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
            try {
                currentThreadLoader = this.plugins.compareAndSwapLoaders(remove.connector());
                remove.shutdown();
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                log.info("Stopped connector {}", str);
                if (forConnector == null) {
                    return true;
                }
                if (0 == 0) {
                    forConnector.close();
                    return true;
                }
                try {
                    forConnector.close();
                    return true;
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                    return true;
                }
            } catch (Throwable th4) {
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                throw th4;
            }
        } catch (Throwable th5) {
            if (forConnector != null) {
                if (0 != 0) {
                    try {
                        forConnector.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forConnector.close();
                }
            }
            throw th5;
        }
    }

    public Set<String> connectorNames() {
        return this.connectors.keySet();
    }

    public boolean isRunning(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        return workerConnector != null && workerConnector.isRunning();
    }

    public boolean startTask(ConnectorTaskId connectorTaskId, ClusterConfigState clusterConfigState, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, TargetState targetState) {
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            log.info("Creating task {}", connectorTaskId);
            if (this.tasks.containsKey(connectorTaskId)) {
                throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
            }
            ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
            try {
                ConnectorConfig connectorConfig = new ConnectorConfig(this.plugins, map);
                ClassLoader connectorLoader = this.plugins.delegatingLoader().connectorLoader(connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
                currentThreadLoader = Plugins.compareAndSwapLoaders(connectorLoader);
                TaskConfig taskConfig = new TaskConfig(map2);
                Class<? extends U> asSubclass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
                Task newTask = this.plugins.newTask(asSubclass);
                log.info("Instantiated task {} with version {} of type {}", connectorTaskId, newTask.version(), asSubclass.getName());
                Converter newConverter = this.plugins.newConverter(connectorConfig, "key.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                Converter newConverter2 = this.plugins.newConverter(connectorConfig, "value.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                HeaderConverter newHeaderConverter = this.plugins.newHeaderConverter(connectorConfig, "header.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                if (newConverter == null) {
                    newConverter = this.plugins.newConverter(this.config, "key.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the key converter {} for task {} using the worker config", newConverter.getClass(), connectorTaskId);
                } else {
                    log.info("Set up the key converter {} for task {} using the connector config", newConverter.getClass(), connectorTaskId);
                }
                if (newConverter2 == null) {
                    newConverter2 = this.plugins.newConverter(this.config, "value.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the value converter {} for task {} using the worker config", newConverter2.getClass(), connectorTaskId);
                } else {
                    log.info("Set up the value converter {} for task {} using the connector config", newConverter2.getClass(), connectorTaskId);
                }
                if (newHeaderConverter == null) {
                    newHeaderConverter = this.plugins.newHeaderConverter(this.config, "header.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the header converter {} for task {} using the worker config", newHeaderConverter.getClass(), connectorTaskId);
                } else {
                    log.info("Set up the header converter {} for task {} using the connector config", newHeaderConverter.getClass(), connectorTaskId);
                }
                WorkerTask buildWorkerTask = buildWorkerTask(clusterConfigState, connectorConfig, connectorTaskId, newTask, listener, targetState, newConverter, newConverter2, newHeaderConverter, connectorLoader);
                buildWorkerTask.initialize(taskConfig);
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                if (this.tasks.putIfAbsent(connectorTaskId, buildWorkerTask) != null) {
                    throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
                }
                this.executor.submit(buildWorkerTask);
                if (buildWorkerTask instanceof WorkerSourceTask) {
                    this.sourceTaskOffsetCommitter.schedule(connectorTaskId, (WorkerSourceTask) buildWorkerTask);
                }
                this.workerMetricsGroup.recordTaskSuccess();
                if (forTask != null) {
                    if (0 != 0) {
                        try {
                            forTask.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        forTask.close();
                    }
                }
                return true;
            } catch (Throwable th3) {
                log.error("Failed to start task {}", connectorTaskId, th3);
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                this.workerMetricsGroup.recordTaskFailure();
                listener.onFailure(connectorTaskId, th3);
                if (forTask != null) {
                    if (0 != 0) {
                        try {
                            forTask.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        forTask.close();
                    }
                }
                return false;
            }
        } catch (Throwable th5) {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTask.close();
                }
            }
            throw th5;
        }
    }

    private WorkerTask buildWorkerTask(ClusterConfigState clusterConfigState, ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, Task task, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, ClassLoader classLoader) {
        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(connectorTaskId);
        Class<? extends Connector> connectorClass = this.plugins.connectorClass(connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
        retryWithToleranceOperator.metrics(errorHandlingMetrics);
        if (task instanceof SourceTask) {
            retryWithToleranceOperator.reporters(sourceTaskReporters(connectorTaskId, connectorConfig, errorHandlingMetrics));
            TransformationChain transformationChain = new TransformationChain(connectorConfig.transformations(), retryWithToleranceOperator);
            log.info("Initializing: {}", transformationChain);
            return new WorkerSourceTask(connectorTaskId, (SourceTask) task, listener, targetState, converter, converter2, headerConverter, transformationChain, new KafkaProducer(producerConfigs(connectorTaskId, "connector-producer-" + connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy)), new OffsetStorageReaderImpl(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), new OffsetStorageWriter(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), this.config, clusterConfigState, this.metrics, classLoader, this.time, retryWithToleranceOperator);
        }
        if (!(task instanceof SinkTask)) {
            log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
        }
        TransformationChain transformationChain2 = new TransformationChain(connectorConfig.transformations(), retryWithToleranceOperator);
        log.info("Initializing: {}", transformationChain2);
        retryWithToleranceOperator.reporters(sinkTaskReporters(connectorTaskId, new SinkConnectorConfig(this.plugins, connectorConfig.originalsStrings()), errorHandlingMetrics, connectorClass));
        return new WorkerSinkTask(connectorTaskId, (SinkTask) task, listener, targetState, this.config, clusterConfigState, this.metrics, converter, converter2, headerConverter, transformationChain2, new KafkaConsumer(consumerConfigs(connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy)), classLoader, this.time, retryWithToleranceOperator);
    }

    static Map<String, Object> producerConfigs(ConnectorTaskId connectorTaskId, String str, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", Utils.join(workerConfig.getList("bootstrap.servers"), ","));
        hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        hashMap.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
        hashMap.put(ProducerConfig.ACKS_CONFIG, "all");
        hashMap.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        hashMap.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        hashMap.put("client.id", str);
        hashMap.putAll(workerConfig.originalsWithPrefix("producer."));
        hashMap.putAll(connectorClientConfigOverrides(connectorTaskId, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX, ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER, connectorClientConfigOverridePolicy));
        return hashMap;
    }

    static Map<String, Object> consumerConfigs(ConnectorTaskId connectorTaskId, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.id", SinkUtils.consumerGroupId(connectorTaskId.connector()));
        hashMap.put("client.id", "connector-consumer-" + connectorTaskId);
        hashMap.put("bootstrap.servers", Utils.join(workerConfig.getList("bootstrap.servers"), ","));
        hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.putAll(workerConfig.originalsWithPrefix("consumer."));
        hashMap.putAll(connectorClientConfigOverrides(connectorTaskId, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER, connectorClientConfigOverridePolicy));
        return hashMap;
    }

    static Map<String, Object> adminConfigs(ConnectorTaskId connectorTaskId, WorkerConfig workerConfig, ConnectorConfig connectorConfig, Class<? extends Connector> cls, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", Utils.join(workerConfig.getList("bootstrap.servers"), ","));
        hashMap.putAll(workerConfig.originalsWithPrefix("admin."));
        hashMap.putAll(connectorClientConfigOverrides(connectorTaskId, connectorConfig, cls, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX, ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.ADMIN, connectorClientConfigOverridePolicy));
        return hashMap;
    }

    private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, Class<? extends Connector> cls, String str, ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        Map<String, Object> originalsWithPrefix = connectorConfig.originalsWithPrefix(str);
        List list = (List) connectorClientConfigOverridePolicy.validate(new ConnectorClientConfigRequest(connectorTaskId.connector(), connectorType, cls, originalsWithPrefix, clientType)).stream().filter(configValue -> {
            return configValue.errorMessages().size() > 0;
        }).collect(Collectors.toList());
        if (list.size() > 0) {
            throw new ConnectException("Client Config Overrides not allowed " + list);
        }
        return originalsWithPrefix;
    }

    ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId connectorTaskId) {
        return new ErrorHandlingMetrics(connectorTaskId, this.metrics);
    }

    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId connectorTaskId, SinkConnectorConfig sinkConnectorConfig, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> cls) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new LogReporter(connectorTaskId, sinkConnectorConfig, errorHandlingMetrics));
        String dlqTopicName = sinkConnectorConfig.dlqTopicName();
        if (dlqTopicName != null && !dlqTopicName.isEmpty()) {
            arrayList.add(DeadLetterQueueReporter.createAndSetup(adminConfigs(connectorTaskId, this.config, sinkConnectorConfig, cls, this.connectorClientConfigOverridePolicy), connectorTaskId, sinkConnectorConfig, producerConfigs(connectorTaskId, "connector-dlq-producer-" + connectorTaskId, this.config, sinkConnectorConfig, cls, this.connectorClientConfigOverridePolicy), errorHandlingMetrics));
        }
        return arrayList;
    }

    private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, ErrorHandlingMetrics errorHandlingMetrics) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new LogReporter(connectorTaskId, connectorConfig, errorHandlingMetrics));
        return arrayList;
    }

    private void stopTask(ConnectorTaskId connectorTaskId) {
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            WorkerTask workerTask = this.tasks.get(connectorTaskId);
            if (workerTask == null) {
                log.warn("Ignoring stop request for unowned task {}", connectorTaskId);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            log.info("Stopping task {}", workerTask.id());
            if (workerTask instanceof WorkerSourceTask) {
                this.sourceTaskOffsetCommitter.remove(workerTask.id());
            }
            ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
            try {
                currentThreadLoader = Plugins.compareAndSwapLoaders(workerTask.loader());
                workerTask.stop();
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                Plugins.compareAndSwapLoaders(currentThreadLoader);
                throw th4;
            }
        } catch (Throwable th5) {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTask.close();
                }
            }
            throw th5;
        }
    }

    private void stopTasks(Collection<ConnectorTaskId> collection) {
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            stopTask(it.next());
        }
    }

    private void awaitStopTask(ConnectorTaskId connectorTaskId, long j) {
        LoggingContext forTask = LoggingContext.forTask(connectorTaskId);
        Throwable th = null;
        try {
            WorkerTask remove = this.tasks.remove(connectorTaskId);
            if (remove == null) {
                log.warn("Ignoring await stop request for non-present task {}", connectorTaskId);
                if (forTask != null) {
                    if (0 == 0) {
                        forTask.close();
                        return;
                    }
                    try {
                        forTask.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            if (remove.awaitStop(j)) {
                log.debug("Graceful stop of task {} succeeded.", remove.id());
            } else {
                log.error("Graceful stop of task {} failed.", remove.id());
                remove.cancel();
            }
            if (forTask != null) {
                if (0 == 0) {
                    forTask.close();
                    return;
                }
                try {
                    forTask.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (forTask != null) {
                if (0 != 0) {
                    try {
                        forTask.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    forTask.close();
                }
            }
            throw th4;
        }
    }

    private void awaitStopTasks(Collection<ConnectorTaskId> collection) {
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            awaitStopTask(it.next(), Math.max(0L, milliseconds - this.time.milliseconds()));
        }
    }

    public void stopAndAwaitTasks() {
        stopAndAwaitTasks(new ArrayList(this.tasks.keySet()));
    }

    public void stopAndAwaitTasks(Collection<ConnectorTaskId> collection) {
        stopTasks(collection);
        awaitStopTasks(collection);
    }

    public void stopAndAwaitTask(ConnectorTaskId connectorTaskId) {
        stopTask(connectorTaskId);
        awaitStopTasks(Collections.singletonList(connectorTaskId));
    }

    public Set<ConnectorTaskId> taskIds() {
        return this.tasks.keySet();
    }

    public Converter getInternalKeyConverter() {
        return this.internalKeyConverter;
    }

    public Converter getInternalValueConverter() {
        return this.internalValueConverter;
    }

    public Plugins getPlugins() {
        return this.plugins;
    }

    public String workerId() {
        return this.workerId;
    }

    public ConnectMetrics metrics() {
        return this.metrics;
    }

    public void setTargetState(String str, TargetState targetState) {
        log.info("Setting connector {} state to {}", str, targetState);
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector != null) {
            transitionTo(workerConnector, targetState, this.plugins.delegatingLoader().connectorLoader(workerConnector.connector()));
        }
        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : this.tasks.entrySet()) {
            if (entry.getKey().connector().equals(str)) {
                WorkerTask value = entry.getValue();
                transitionTo(value, targetState, value.loader());
            }
        }
    }

    private void transitionTo(Object obj, TargetState targetState, ClassLoader classLoader) {
        ClassLoader currentThreadLoader = this.plugins.currentThreadLoader();
        try {
            ClassLoader compareAndSwapLoaders = Plugins.compareAndSwapLoaders(classLoader);
            if (obj instanceof WorkerConnector) {
                ((WorkerConnector) obj).transitionTo(targetState);
            } else {
                if (!(obj instanceof WorkerTask)) {
                    throw new ConnectException("Request for state transition on an object that is neither a WorkerConnector nor a WorkerTask: " + obj.getClass());
                }
                ((WorkerTask) obj).transitionTo(targetState);
            }
            Plugins.compareAndSwapLoaders(compareAndSwapLoaders);
        } catch (Throwable th) {
            Plugins.compareAndSwapLoaders(currentThreadLoader);
            throw th;
        }
    }

    WorkerMetricsGroup workerMetricsGroup() {
        return this.workerMetricsGroup;
    }
}
