package org.apache.kafka.connect.runtime;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.zookeeper.server.quorum.QuorumStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.10.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector.class */
public class WorkerConnector {
    /** Logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(WorkerConnector.class);

    /** Connector name; passed to every status-listener callback and printed by {@link #toString()}. */
    private final String connName;

    /** Receiver of lifecycle notifications; the constructor points this at the same object as {@code metrics}. */
    private final ConnectorStatus.Listener statusListener;

    /** Context supplied by the caller; the context handed to the connector in {@code initialize} forwards to it. */
    private final ConnectorContext ctx;

    /** The connector instance whose lifecycle this object drives. */
    private final Connector connector;

    /** Metrics group for this connector; closed at the end of {@code shutdown()}. */
    private final ConnectorMetricsGroup metrics;

    /** Configuration captured in {@code initialize} and passed to {@code connector.start}. */
    private Map<String, String> config;

    /** Current internal lifecycle state; starts out as {@code INIT}. */
    private State state = State.INIT;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.10.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector$ConnectorMetricsGroup.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector$ConnectorMetricsGroup.class */
    /**
     * Status listener that records the most recent connector status, exposes it as a metric,
     * and then forwards every callback to a delegate listener.
     *
     * <p>This is an inner (non-static) class because it reads the enclosing
     * {@code WorkerConnector}'s {@code connName} and {@code connector} while registering metrics.
     */
    public class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
        /**
         * Status recorded by the most recent listener callback (or the initial status).
         * Declared volatile; presumably because the metric supplier may be polled from a
         * different thread than the one delivering callbacks — confirm against ConnectMetrics.
         */
        private volatile AbstractStatus.State state;
        /** Metric group holding the type, class, version and status metrics of this connector. */
        private final ConnectMetrics.MetricGroup metricGroup;
        /** Listener that receives every callback after the status has been recorded. */
        private final ConnectorStatus.Listener delegate;

        /**
         * Creates the group and registers the connector's metrics.
         *
         * @param connectMetrics metrics facade used to obtain the registry and the metric group
         * @param state          status to report until the first callback arrives
         * @param listener       delegate that receives every callback
         * @throws NullPointerException if any argument, or the enclosing connector, is null
         */
        public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State state, ConnectorStatus.Listener listener) {
            Objects.requireNonNull(connectMetrics, "connectMetrics");
            Objects.requireNonNull(WorkerConnector.this.connector, "connector");
            Objects.requireNonNull(state, "state");
            Objects.requireNonNull(listener, "listener");
            this.delegate = listener;
            this.state = state;
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.connectorGroupName(), registry.connectorTagName(), WorkerConnector.this.connName);
            // The group is closed before any metric is added. NOTE(review): presumably this drops
            // metrics left behind by an earlier connector of the same name — confirm against
            // ConnectMetrics.MetricGroup#close.
            this.metricGroup.close();
            this.metricGroup.addImmutableValueMetric(registry.connectorType, WorkerConnector.this.connectorType());
            this.metricGroup.addImmutableValueMetric(registry.connectorClass, WorkerConnector.this.connector.getClass().getName());
            this.metricGroup.addImmutableValueMetric(registry.connectorVersion, WorkerConnector.this.connector.version());
            this.metricGroup.addValueMetric(registry.connectorStatus, new ConnectMetrics.LiteralSupplier<String>() {
                @Override
                public String metricValue(long now) {
                    // Locale.ROOT keeps the reported value independent of the JVM's default locale;
                    // a Turkish default locale would otherwise turn "RUNNING" into "runnıng".
                    return ConnectorMetricsGroup.this.state.toString().toLowerCase(Locale.ROOT);
                }
            });
        }

        /** Closes the underlying metric group. */
        @Override
        public void close() {
            this.metricGroup.close();
        }

        /** Records {@code RUNNING}, then forwards the startup notification to the delegate. */
        @Override
        public void onStartup(String str) {
            this.state = AbstractStatus.State.RUNNING;
            this.delegate.onStartup(str);
        }

        /** Records {@code UNASSIGNED}, then forwards the shutdown notification to the delegate. */
        @Override
        public void onShutdown(String str) {
            this.state = AbstractStatus.State.UNASSIGNED;
            this.delegate.onShutdown(str);
        }

        /** Records {@code PAUSED}, then forwards the pause notification to the delegate. */
        @Override
        public void onPause(String str) {
            this.state = AbstractStatus.State.PAUSED;
            this.delegate.onPause(str);
        }

        /** Records {@code RUNNING}, then forwards the resume notification to the delegate. */
        @Override
        public void onResume(String str) {
            this.state = AbstractStatus.State.RUNNING;
            this.delegate.onResume(str);
        }

        /** Records {@code FAILED}, then forwards the failure and its cause to the delegate. */
        @Override
        public void onFailure(String str, Throwable th) {
            this.state = AbstractStatus.State.FAILED;
            this.delegate.onFailure(str, th);
        }

        /** Records {@code DESTROYED}, then forwards the deletion notification to the delegate. */
        @Override
        public void onDeletion(String str) {
            this.state = AbstractStatus.State.DESTROYED;
            this.delegate.onDeletion(str);
        }

        /** @return true if the last recorded status is {@code UNASSIGNED} */
        boolean isUnassigned() {
            return this.state == AbstractStatus.State.UNASSIGNED;
        }

        /** @return true if the last recorded status is {@code RUNNING} */
        boolean isRunning() {
            return this.state == AbstractStatus.State.RUNNING;
        }

        /** @return true if the last recorded status is {@code PAUSED} */
        boolean isPaused() {
            return this.state == AbstractStatus.State.PAUSED;
        }

        /** @return true if the last recorded status is {@code FAILED} */
        boolean isFailed() {
            return this.state == AbstractStatus.State.FAILED;
        }

        /** @return the metric group that holds this connector's metrics */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.10.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector$State.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerConnector$State.class */
    /**
     * Internal lifecycle state of the wrapped connector, as tracked by the enclosing
     * {@code WorkerConnector}'s {@code state} field.
     */
    public enum State {
        /** Value the {@code state} field is initialized with, before any start or pause has happened. */
        INIT,
        /** Set by {@code pause()} and by {@code shutdown()} when they complete without an exception. */
        STOPPED,
        /** Set by {@code doStart()} after {@code connector.start(config)} has returned. */
        STARTED,
        /** Set when a lifecycle step throws or the connector raises an error; {@code transitionTo} does nothing in this state. */
        FAILED
    }

    public WorkerConnector(String str, Connector connector, ConnectorContext connectorContext, ConnectMetrics connectMetrics, ConnectorStatus.Listener listener) {
        this.connName = str;
        this.ctx = connectorContext;
        this.connector = connector;
        this.metrics = new ConnectorMetricsGroup(connectMetrics, AbstractStatus.State.UNASSIGNED, listener);
        this.statusListener = this.metrics;
    }

    /**
     * Captures the connector configuration and initializes the connector with a wrapping context.
     *
     * <p>For sink connectors the configuration is additionally passed through
     * {@code SinkConnectorConfig.validate}. Anything thrown along the way is logged and reported
     * through {@code onFailure} instead of propagating to the caller.
     *
     * @param connectorConfig configuration whose original string values are stored for later use by start
     */
    public void initialize(ConnectorConfig connectorConfig) {
        try {
            this.config = connectorConfig.originalsStrings();
            log.debug("{} Initializing connector {} with config {}", this, this.connName, this.config);
            if (isSinkConnector()) {
                SinkConnectorConfig.validate(this.config);
            }

            // Context handed to the connector: forwards to the real context, but records a raised
            // error as a failure of this connector before passing it on.
            final ConnectorContext forwardingContext = new ConnectorContext() {
                @Override
                public void requestTaskReconfiguration() {
                    WorkerConnector.this.ctx.requestTaskReconfiguration();
                }

                @Override
                public void raiseError(Exception exc) {
                    WorkerConnector.log.error("{} Connector raised an error", WorkerConnector.this, exc);
                    WorkerConnector.this.onFailure(exc);
                    WorkerConnector.this.ctx.raiseError(exc);
                }
            };
            this.connector.initialize(forwardingContext);
        } catch (Throwable failure) {
            log.error("{} Error initializing connector", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Starts the connector if it is not already started.
     *
     * @return true if the connector was started by this call; false if it was already started
     *         or if starting failed (in which case the failure has been logged and reported)
     */
    private boolean doStart() {
        try {
            final State current = this.state;
            if (current == State.STARTED) {
                // Already running: nothing to do and nothing to report.
                return false;
            }
            if (current != State.INIT && current != State.STOPPED) {
                throw new IllegalArgumentException("Cannot start connector in state " + this.state);
            }
            this.connector.start(this.config);
            this.state = State.STARTED;
            return true;
        } catch (Throwable failure) {
            log.error("{} Error while starting connector", this, failure);
            onFailure(failure);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports a failure to the status listener and then marks this connector as failed.
     *
     * @param th cause of the failure, passed on to the listener
     */
    public void onFailure(Throwable th) {
        // The listener is notified first; the internal state changes only after that call returns.
        statusListener.onFailure(connName, th);
        state = State.FAILED;
    }

    /** Starts the connector and, only if this call actually started it, notifies the listener of a resume. */
    private void resume() {
        final boolean startedNow = doStart();
        if (!startedNow) {
            return;
        }
        this.statusListener.onResume(this.connName);
    }

    /** Starts the connector and, only if this call actually started it, notifies the listener of a startup. */
    private void start() {
        final boolean startedNow = doStart();
        if (!startedNow) {
            return;
        }
        this.statusListener.onStartup(this.connName);
    }

    /** @return true while the internal lifecycle state is {@code STARTED} */
    public boolean isRunning() {
        return State.STARTED == state;
    }

    /**
     * Pauses the connector.
     *
     * <p>A started connector is stopped first; a connector that was never started is simply
     * marked as stopped. In both cases the listener is told about the pause. Calling this on an
     * already stopped connector does nothing. Any exception is logged and reported as a failure.
     */
    private void pause() {
        try {
            switch (this.state) {
                case STOPPED:
                    // Already paused: no stop call, no notification.
                    return;
                case STARTED:
                    this.connector.stop();
                    break;
                case INIT:
                    break;
                default:
                    throw new IllegalArgumentException("Cannot pause connector in state " + this.state);
            }
            this.statusListener.onPause(this.connName);
            this.state = State.STOPPED;
        } catch (Throwable th) {
            // Message names the operation that actually failed (it used to say "shutting down").
            log.error("{} Error while pausing connector", this, th);
            // Same listener-then-state sequence as before, now via the shared helper.
            onFailure(th);
        }
    }

    /**
     * Stops the connector if it is running, notifies the listener of the shutdown, and always
     * closes the metrics group afterwards.
     *
     * <p>If stopping or notifying throws, the error is logged, the state becomes {@code FAILED}
     * and the listener is told about the failure; the metrics group is still closed.
     */
    public void shutdown() {
        try {
            final boolean needsStop = State.STARTED == this.state;
            if (needsStop) {
                this.connector.stop();
            }
            this.state = State.STOPPED;
            this.statusListener.onShutdown(this.connName);
        } catch (Throwable failure) {
            log.error("{} Error while shutting down connector", this, failure);
            // Here the state is updated before the listener is notified.
            this.state = State.FAILED;
            this.statusListener.onFailure(this.connName, failure);
        } finally {
            this.metrics.close();
        }
    }

    /**
     * Moves the connector towards the requested target state.
     *
     * <p>A failed connector is left untouched (a warning is logged). {@code PAUSED} pauses the
     * connector; {@code STARTED} performs an initial start when the connector has never been
     * started and a resume otherwise.
     *
     * @param targetState desired state
     * @throws IllegalArgumentException if the connector has not failed and the target is neither
     *                                  {@code PAUSED} nor {@code STARTED}
     */
    public void transitionTo(TargetState targetState) {
        if (this.state == State.FAILED) {
            log.warn("{} Cannot transition connector to {} since it has failed", this, targetState);
            return;
        }
        log.debug("{} Transition connector to {}", this, targetState);

        if (targetState == TargetState.PAUSED) {
            pause();
            return;
        }
        if (targetState != TargetState.STARTED) {
            throw new IllegalArgumentException("Unhandled target state " + targetState);
        }

        // The first start and a later restart trigger different listener notifications.
        if (this.state == State.INIT) {
            start();
        } else {
            resume();
        }
    }

    /** @return true if the wrapped connector is a {@code SinkConnector} */
    public boolean isSinkConnector() {
        // The constructor rejects a null connector, so instanceof matches the class-based check.
        return connector instanceof SinkConnector;
    }

    /** @return true if the wrapped connector is a {@code SourceConnector} */
    public boolean isSourceConnector() {
        // The constructor rejects a null connector, so instanceof matches the class-based check.
        return connector instanceof SourceConnector;
    }

    /**
     * Names the kind of connector being wrapped; this value is registered as the connector-type metric.
     *
     * @return {@code "sink"} for a sink connector, {@code "source"} for a source connector,
     *         otherwise {@code "unknown"}
     */
    protected String connectorType() {
        if (isSinkConnector()) {
            return "sink";
        }
        if (isSourceConnector()) {
            return "source";
        }
        // Plain literal, like the two values above, instead of a constant borrowed from an
        // unrelated ZooKeeper server class that merely has the same text.
        return "unknown";
    }

    /** @return the connector instance managed by this object */
    public Connector connector() {
        return connector;
    }

    /** @return the metrics group created for this connector in the constructor */
    ConnectorMetricsGroup metrics() {
        return metrics;
    }

    /** @return a short description containing the connector name, used as the prefix in log messages */
    @Override
    public String toString() {
        return "WorkerConnector{id=" + connName + "}";
    }
}
