package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.common.Util;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-0.10.2.1.jar:org/apache/kafka/connect/runtime/Worker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.4.1.jar:META-INF/bundled-dependencies/connect-runtime-0.10.2.1.jar:org/apache/kafka/connect/runtime/Worker.class */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Worker.class);
    private final Time time;
    private final String workerId;
    private final ConnectorFactory connectorFactory;
    private final WorkerConfig config;
    private final Converter defaultKeyConverter;
    private final Converter defaultValueConverter;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    private final Map<String, Object> producerProps;
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
    private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap();
    private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public Worker(String str, Time time, ConnectorFactory connectorFactory, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore) {
        this.workerId = str;
        this.time = time;
        this.connectorFactory = connectorFactory;
        this.config = workerConfig;
        this.defaultKeyConverter = (Converter) workerConfig.getConfiguredInstance("key.converter", Converter.class);
        this.defaultKeyConverter.configure(workerConfig.originalsWithPrefix("key.converter."), true);
        this.defaultValueConverter = (Converter) workerConfig.getConfiguredInstance("value.converter", Converter.class);
        this.defaultValueConverter.configure(workerConfig.originalsWithPrefix("value.converter."), false);
        this.internalKeyConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
        this.internalKeyConverter.configure(workerConfig.originalsWithPrefix("internal.key.converter."), true);
        this.internalValueConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
        this.internalValueConverter.configure(workerConfig.originalsWithPrefix("internal.value.converter."), false);
        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(workerConfig);
        this.producerProps = new HashMap();
        this.producerProps.put("bootstrap.servers", Utils.join(workerConfig.getList("bootstrap.servers"), ","));
        this.producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        Integer num = Integer.MAX_VALUE;
        this.producerProps.put("request.timeout.ms", num.toString());
        Integer num2 = Integer.MAX_VALUE;
        this.producerProps.put(ProducerConfig.RETRIES_CONFIG, num2.toString());
        this.producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.valueOf(Util.VLI_MAX).toString());
        this.producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        this.producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        this.producerProps.putAll(workerConfig.originalsWithPrefix("producer."));
    }

    public void start() {
        log.info("Worker starting");
        this.offsetBackingStore.start();
        this.sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(this.config);
        log.info("Worker started");
    }

    public void stop() {
        log.info("Worker stopping");
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        if (!this.connectors.isEmpty()) {
            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", this.connectors.keySet());
            stopConnectors();
        }
        if (!this.tasks.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", this.tasks.keySet());
            stopAndAwaitTasks();
        }
        this.sourceTaskOffsetCommitter.close(milliseconds - this.time.milliseconds());
        this.offsetBackingStore.stop();
        log.info("Worker stopped");
    }

    public boolean startConnector(String str, Map<String, String> map, ConnectorContext connectorContext, ConnectorStatus.Listener listener, TargetState targetState) {
        if (this.connectors.containsKey(str)) {
            throw new ConnectException("Connector with name " + str + " already exists");
        }
        try {
            ConnectorConfig connectorConfig = new ConnectorConfig(map);
            String string = connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
            log.info("Creating connector {} of type {}", str, string);
            Connector newConnector = this.connectorFactory.newConnector(string);
            WorkerConnector workerConnector = new WorkerConnector(str, newConnector, connectorContext, listener);
            log.info("Instantiated connector {} with version {} of type {}", str, newConnector.version(), newConnector.getClass());
            workerConnector.initialize(connectorConfig);
            workerConnector.transitionTo(targetState);
            if (this.connectors.putIfAbsent(str, workerConnector) != null) {
                throw new ConnectException("Connector with name " + str + " already exists");
            }
            log.info("Finished creating connector {}", str);
            return true;
        } catch (Throwable th) {
            log.error("Failed to start connector {}", str, th);
            listener.onFailure(str, th);
            return false;
        }
    }

    public boolean isSinkConnector(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        return workerConnector.isSinkConnector();
    }

    public List<Map<String, String>> connectorTaskConfigs(String str, int i, List<String> list) {
        log.trace("Reconfiguring connector tasks for {}", str);
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        Connector connector = workerConnector.connector();
        ArrayList arrayList = new ArrayList();
        String name = connector.taskClass().getName();
        Iterator<Map<String, String>> it = connector.taskConfigs(i).iterator();
        while (it.hasNext()) {
            HashMap hashMap = new HashMap(it.next());
            hashMap.put(TaskConfig.TASK_CLASS_CONFIG, name);
            if (list != null) {
                hashMap.put("topics", Utils.join(list, ","));
            }
            arrayList.add(hashMap);
        }
        return arrayList;
    }

    private void stopConnectors() {
        Iterator<String> it = this.connectors.keySet().iterator();
        while (it.hasNext()) {
            stopConnector(it.next());
        }
    }

    public boolean stopConnector(String str) {
        log.info("Stopping connector {}", str);
        WorkerConnector remove = this.connectors.remove(str);
        if (remove == null) {
            log.warn("Ignoring stop request for unowned connector {}", str);
            return false;
        }
        remove.shutdown();
        log.info("Stopped connector {}", str);
        return true;
    }

    public Set<String> connectorNames() {
        return this.connectors.keySet();
    }

    public boolean isRunning(String str) {
        WorkerConnector workerConnector = this.connectors.get(str);
        return workerConnector != null && workerConnector.isRunning();
    }

    public boolean startTask(ConnectorTaskId connectorTaskId, Map<String, String> map, Map<String, String> map2, TaskStatus.Listener listener, TargetState targetState) {
        log.info("Creating task {}", connectorTaskId);
        if (this.tasks.containsKey(connectorTaskId)) {
            throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
        }
        try {
            ConnectorConfig connectorConfig = new ConnectorConfig(map);
            TaskConfig taskConfig = new TaskConfig(map2);
            Class<? extends U> asSubclass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
            Task newTask = this.connectorFactory.newTask(asSubclass);
            log.info("Instantiated task {} with version {} of type {}", connectorTaskId, newTask.version(), asSubclass.getName());
            Converter converter = (Converter) connectorConfig.getConfiguredInstance("key.converter", Converter.class);
            if (converter != null) {
                converter.configure(connectorConfig.originalsWithPrefix("key.converter."), true);
            } else {
                converter = this.defaultKeyConverter;
            }
            Converter converter2 = (Converter) connectorConfig.getConfiguredInstance("value.converter", Converter.class);
            if (converter2 != null) {
                converter2.configure(connectorConfig.originalsWithPrefix("value.converter."), false);
            } else {
                converter2 = this.defaultValueConverter;
            }
            WorkerTask buildWorkerTask = buildWorkerTask(connectorConfig, connectorTaskId, newTask, listener, targetState, converter, converter2);
            buildWorkerTask.initialize(taskConfig);
            if (this.tasks.putIfAbsent(connectorTaskId, buildWorkerTask) != null) {
                throw new ConnectException("Task already exists in this worker: " + connectorTaskId);
            }
            this.executor.submit(buildWorkerTask);
            if (!(buildWorkerTask instanceof WorkerSourceTask)) {
                return true;
            }
            this.sourceTaskOffsetCommitter.schedule(connectorTaskId, (WorkerSourceTask) buildWorkerTask);
            return true;
        } catch (Throwable th) {
            log.error("Failed to start task {}", connectorTaskId, th);
            listener.onFailure(connectorTaskId, th);
            return false;
        }
    }

    private WorkerTask buildWorkerTask(ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, Task task, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2) {
        if (task instanceof SourceTask) {
            return new WorkerSourceTask(connectorTaskId, (SourceTask) task, listener, targetState, converter, converter2, new TransformationChain(connectorConfig.transformations()), new KafkaProducer(this.producerProps), new OffsetStorageReaderImpl(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), new OffsetStorageWriter(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), this.config, this.time);
        }
        if (task instanceof SinkTask) {
            return new WorkerSinkTask(connectorTaskId, (SinkTask) task, listener, targetState, this.config, converter, converter2, new TransformationChain(connectorConfig.transformations()), this.time);
        }
        log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
        throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
    }

    private void stopTask(ConnectorTaskId connectorTaskId) {
        WorkerTask workerTask = this.tasks.get(connectorTaskId);
        if (workerTask == null) {
            log.warn("Ignoring stop request for unowned task {}", connectorTaskId);
            return;
        }
        log.info("Stopping task {}", workerTask.id());
        if (workerTask instanceof WorkerSourceTask) {
            this.sourceTaskOffsetCommitter.remove(workerTask.id());
        }
        workerTask.stop();
    }

    private void stopTasks(Collection<ConnectorTaskId> collection) {
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            stopTask(it.next());
        }
    }

    private void awaitStopTask(ConnectorTaskId connectorTaskId, long j) {
        WorkerTask remove = this.tasks.remove(connectorTaskId);
        if (remove == null) {
            log.warn("Ignoring await stop request for non-present task {}", connectorTaskId);
        } else {
            if (remove.awaitStop(j)) {
                return;
            }
            log.error("Graceful stop of task {} failed.", remove.id());
            remove.cancel();
        }
    }

    private void awaitStopTasks(Collection<ConnectorTaskId> collection) {
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        Iterator<ConnectorTaskId> it = collection.iterator();
        while (it.hasNext()) {
            awaitStopTask(it.next(), Math.max(0L, milliseconds - this.time.milliseconds()));
        }
    }

    public void stopAndAwaitTasks() {
        stopAndAwaitTasks(new ArrayList(this.tasks.keySet()));
    }

    public void stopAndAwaitTasks(Collection<ConnectorTaskId> collection) {
        stopTasks(collection);
        awaitStopTasks(collection);
    }

    public void stopAndAwaitTask(ConnectorTaskId connectorTaskId) {
        stopTask(connectorTaskId);
        awaitStopTasks(Collections.singletonList(connectorTaskId));
    }

    public Set<ConnectorTaskId> taskIds() {
        return this.tasks.keySet();
    }

    public Converter getInternalKeyConverter() {
        return this.internalKeyConverter;
    }

    public Converter getInternalValueConverter() {
        return this.internalValueConverter;
    }

    public ConnectorFactory getConnectorFactory() {
        return this.connectorFactory;
    }

    public String workerId() {
        return this.workerId;
    }

    public void setTargetState(String str, TargetState targetState) {
        log.info("Setting connector {} state to {}", str, targetState);
        WorkerConnector workerConnector = this.connectors.get(str);
        if (workerConnector != null) {
            workerConnector.transitionTo(targetState);
        }
        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : this.tasks.entrySet()) {
            if (entry.getKey().connector().equals(str)) {
                entry.getValue().transitionTo(targetState);
            }
        }
    }
}
