package org.apache.kafka.connect.runtime;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnectorTest.class */
public class WorkerConnectorTest {
    private static final String VERSION = "1.1";
    public static final String CONNECTOR = "connector";
    public static final Map<String, String> CONFIG = new HashMap();
    public ConnectorConfig connectorConfig;
    public MockConnectMetrics metrics;

    @Mock
    private Plugins plugins;

    @Mock
    private CloseableConnectorContext ctx;

    @Mock
    private ConnectorStatus.Listener listener;

    @Mock
    private ClassLoader classLoader;
    private ConnectorType connectorType;
    private Connector connector;
    private CloseableOffsetStorageReader offsetStorageReader;
    private ConnectorOffsetBackingStore offsetStore;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.connect.runtime.WorkerConnectorTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnectorTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$connect$health$ConnectorType = new int[ConnectorType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$connect$health$ConnectorType[ConnectorType.SINK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$health$ConnectorType[ConnectorType.SOURCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerConnectorTest$TestConnector.class */
    private static abstract class TestConnector extends Connector {
        private TestConnector() {
        }
    }

    private void setConnector(ConnectorType connectorType) {
        this.connectorType = connectorType;
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$connect$health$ConnectorType[connectorType.ordinal()]) {
            case 1:
                this.connector = (Connector) Mockito.mock(SinkConnector.class);
                this.offsetStorageReader = null;
                this.offsetStore = null;
                return;
            case 2:
                this.connector = (Connector) Mockito.mock(SourceConnector.class);
                this.offsetStorageReader = (CloseableOffsetStorageReader) Mockito.mock(CloseableOffsetStorageReader.class);
                this.offsetStore = (ConnectorOffsetBackingStore) Mockito.mock(ConnectorOffsetBackingStore.class);
                return;
            default:
                throw new IllegalStateException("Unexpected connector type: " + String.valueOf(connectorType));
        }
    }

    public void setup(ConnectorType connectorType) {
        setConnector(connectorType);
        this.connectorConfig = new ConnectorConfig(this.plugins, CONFIG);
        this.metrics = new MockConnectMetrics();
    }

    @AfterEach
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testInitializeFailure(ConnectorType connectorType) {
        setup(connectorType);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        ((Connector) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.connector)).initialize((ConnectorContext) ArgumentMatchers.any());
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertFailedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onFailure(CONNECTOR, runtimeException);
        verifyCleanShutdown(false);
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testFailureIsFinalState(ConnectorType connectorType) {
        setup(connectorType);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        ((Connector) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.connector)).initialize((ConnectorContext) ArgumentMatchers.any());
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertFailedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertFailedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onFailure(CONNECTOR, runtimeException);
        verifyCleanShutdown(false);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.any(Exception.class), (TargetState) ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupAndShutdown(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        verifyCleanShutdown(true);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupAndPause(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.PAUSED, mockCallback);
        assertPausedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onPause(CONNECTOR);
        verifyCleanShutdown(true);
        InOrder inOrder = Mockito.inOrder(new Object[]{mockCallback});
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupAndStop(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STOPPED, mockCallback);
        assertStoppedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStop(CONNECTOR);
        verifyCleanShutdown(true);
        InOrder inOrder = Mockito.inOrder(new Object[]{mockCallback});
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testOnResume(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.PAUSED, mockCallback);
        assertPausedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onPause(CONNECTOR);
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onResume(CONNECTOR);
        verifyCleanShutdown(true);
        InOrder inOrder = Mockito.inOrder(new Object[]{mockCallback});
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED));
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupPaused(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.PAUSED, mockCallback);
        assertPausedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onPause(CONNECTOR);
        verifyCleanShutdown(false);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupStopped(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STOPPED, mockCallback);
        assertStoppedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStop(CONNECTOR);
        verifyCleanShutdown(false);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStartupFailure(ConnectorType connectorType) {
        setup(connectorType);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        ((Connector) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.connector)).start(CONFIG);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertFailedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onFailure(CONNECTOR, runtimeException);
        verifyCleanShutdown(false);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.any(Exception.class), (TargetState) ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testStopFailure(ConnectorType connectorType) {
        setup(connectorType);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        ((Connector) Mockito.doThrow(new Throwable[]{runtimeException}).doNothing().when(this.connector)).stop();
        Callback<TargetState> mockCallback = mockCallback();
        Callback<TargetState> mockCallback2 = mockCallback();
        Callback<TargetState> mockCallback3 = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STOPPED, mockCallback2);
        assertStoppedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback3);
        assertRunningMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector, Mockito.times(2))).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onResume(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStop(CONNECTOR);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
        ((Callback) Mockito.verify(mockCallback2)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback2});
        ((Callback) Mockito.verify(mockCallback3)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback3});
        verifyShutdown(2, true, true);
        Mockito.verifyNoMoreInteractions(new Object[]{this.listener});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testShutdownFailure(ConnectorType connectorType) {
        setup(connectorType);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        ((Connector) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.connector)).stop();
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertFailedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((Callback) Mockito.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onFailure(CONNECTOR, runtimeException);
        verifyShutdown(false, true);
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testTransitionStartedToStarted(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        verifyCleanShutdown(true);
        ((Callback) Mockito.verify(mockCallback, Mockito.times(2))).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testTransitionPausedToPaused(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.PAUSED, mockCallback);
        assertPausedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.PAUSED, mockCallback);
        assertPausedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onPause(CONNECTOR);
        verifyCleanShutdown(true);
        InOrder inOrder = Mockito.inOrder(new Object[]{mockCallback});
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        ((Callback) inOrder.verify(mockCallback, Mockito.times(2))).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testTransitionStoppedToStopped(ConnectorType connectorType) {
        setup(connectorType);
        Mockito.when(this.connector.version()).thenReturn(VERSION);
        Callback<TargetState> mockCallback = mockCallback();
        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, this.connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader);
        workerConnector.initialize();
        assertInitializedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STARTED, mockCallback);
        assertRunningMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STOPPED, mockCallback);
        assertStoppedMetric(workerConnector);
        workerConnector.doTransitionTo(TargetState.STOPPED, mockCallback);
        assertStoppedMetric(workerConnector);
        workerConnector.shutdown();
        workerConnector.doShutdown();
        assertDestroyedMetric(workerConnector);
        verifyInitialize();
        ((Connector) Mockito.verify(this.connector)).start(CONFIG);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStartup(CONNECTOR);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onStop(CONNECTOR);
        verifyCleanShutdown(true);
        InOrder inOrder = Mockito.inOrder(new Object[]{mockCallback});
        ((Callback) inOrder.verify(mockCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        ((Callback) inOrder.verify(mockCallback, Mockito.times(2))).onCompletion((Throwable) ArgumentMatchers.isNull(), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED));
        Mockito.verifyNoMoreInteractions(new Object[]{mockCallback});
    }

    @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
    @ParameterizedTest
    public void testFailConnectorThatIsNeitherSourceNorSink(ConnectorType connectorType) {
        setup(connectorType);
        Connector connector = (Connector) Mockito.mock(Connector.class);
        Mockito.when(connector.version()).thenReturn(VERSION);
        new WorkerConnector(CONNECTOR, connector, this.connectorConfig, this.ctx, this.metrics, this.listener, this.offsetStorageReader, this.offsetStore, this.classLoader).initialize();
        ((Connector) Mockito.verify(connector)).version();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onFailure((String) ArgumentMatchers.eq(CONNECTOR), (Throwable) forClass.capture());
        Throwable th = (Throwable) forClass.getValue();
        Assertions.assertInstanceOf(ConnectException.class, th);
        Assertions.assertTrue(th.getMessage().contains("must be a subclass of"));
    }

    protected void assertFailedMetric(WorkerConnector workerConnector) {
        Assertions.assertFalse(workerConnector.metrics().isUnassigned());
        Assertions.assertTrue(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isStopped());
        Assertions.assertFalse(workerConnector.metrics().isPaused());
        Assertions.assertFalse(workerConnector.metrics().isRunning());
    }

    protected void assertStoppedMetric(WorkerConnector workerConnector) {
        Assertions.assertFalse(workerConnector.metrics().isUnassigned());
        Assertions.assertFalse(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isPaused());
        Assertions.assertTrue(workerConnector.metrics().isStopped());
        Assertions.assertFalse(workerConnector.metrics().isRunning());
    }

    protected void assertPausedMetric(WorkerConnector workerConnector) {
        Assertions.assertFalse(workerConnector.metrics().isUnassigned());
        Assertions.assertFalse(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isStopped());
        Assertions.assertTrue(workerConnector.metrics().isPaused());
        Assertions.assertFalse(workerConnector.metrics().isRunning());
    }

    protected void assertRunningMetric(WorkerConnector workerConnector) {
        Assertions.assertFalse(workerConnector.metrics().isUnassigned());
        Assertions.assertFalse(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isStopped());
        Assertions.assertFalse(workerConnector.metrics().isPaused());
        Assertions.assertTrue(workerConnector.metrics().isRunning());
    }

    protected void assertDestroyedMetric(WorkerConnector workerConnector) {
        Assertions.assertTrue(workerConnector.metrics().isUnassigned());
        Assertions.assertFalse(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isStopped());
        Assertions.assertFalse(workerConnector.metrics().isPaused());
        Assertions.assertFalse(workerConnector.metrics().isRunning());
    }

    protected void assertInitializedMetric(WorkerConnector workerConnector) {
        String str;
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$connect$health$ConnectorType[this.connectorType.ordinal()]) {
            case 1:
                str = "sink";
                break;
            case 2:
                str = "source";
                break;
            default:
                throw new IllegalStateException("Unexpected connector type: " + String.valueOf(this.connectorType));
        }
        assertInitializedMetric(workerConnector, str);
    }

    protected void assertInitializedMetric(WorkerConnector workerConnector, String str) {
        Assertions.assertTrue(workerConnector.metrics().isUnassigned());
        Assertions.assertFalse(workerConnector.metrics().isFailed());
        Assertions.assertFalse(workerConnector.metrics().isStopped());
        Assertions.assertFalse(workerConnector.metrics().isPaused());
        Assertions.assertFalse(workerConnector.metrics().isRunning());
        ConnectMetrics.MetricGroup metricGroup = workerConnector.metrics().metricGroup();
        this.metrics.currentMetricValueAsString(metricGroup, "status");
        String currentMetricValueAsString = this.metrics.currentMetricValueAsString(metricGroup, "connector-type");
        String currentMetricValueAsString2 = this.metrics.currentMetricValueAsString(metricGroup, "connector-class");
        String currentMetricValueAsString3 = this.metrics.currentMetricValueAsString(metricGroup, "connector-version");
        Assertions.assertEquals(str, currentMetricValueAsString);
        Assertions.assertNotNull(currentMetricValueAsString2);
        Assertions.assertEquals(VERSION, currentMetricValueAsString3);
    }

    private Callback<TargetState> mockCallback() {
        return (Callback) Mockito.mock(Callback.class);
    }

    private void verifyInitialize() {
        ((Connector) Mockito.verify(this.connector)).version();
        if (this.connectorType == ConnectorType.SOURCE) {
            ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).start();
            ((Connector) Mockito.verify(this.connector)).initialize((ConnectorContext) ArgumentMatchers.any(SourceConnectorContext.class));
        } else if (this.connectorType == ConnectorType.SINK) {
            ((Connector) Mockito.verify(this.connector)).initialize((ConnectorContext) ArgumentMatchers.any(SinkConnectorContext.class));
        }
    }

    private void verifyCleanShutdown(boolean z) {
        verifyShutdown(true, z);
    }

    private void verifyShutdown(boolean z, boolean z2) {
        verifyShutdown(1, z, z2);
    }

    private void verifyShutdown(int i, boolean z, boolean z2) {
        ((CloseableConnectorContext) Mockito.verify(this.ctx)).close();
        if (this.connectorType == ConnectorType.SOURCE) {
            ((CloseableOffsetStorageReader) Mockito.verify(this.offsetStorageReader)).close();
            ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).stop();
        }
        if (z) {
            ((ConnectorStatus.Listener) Mockito.verify(this.listener)).onShutdown(CONNECTOR);
        }
        if (z2) {
            ((Connector) Mockito.verify(this.connector, Mockito.times(i))).stop();
        }
    }

    static {
        CONFIG.put("connector.class", TestConnector.class.getName());
        CONFIG.put("name", CONNECTOR);
        CONFIG.put("topics", "my-topic");
    }
}
