package org.apache.kafka.connect.integration;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest.class */
public class BlockingConnectorTest {
    private static final int NUM_WORKERS = 1;
    private static final String BLOCKING_CONNECTOR_NAME = "blocking-connector";
    private static final String NORMAL_CONNECTOR_NAME = "normal-connector";
    private static final String TEST_TOPIC = "normal-topic";
    private static final int NUM_RECORDS_PRODUCED = 100;
    private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
    private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS = "Connector::initializeWithTaskConfigs";
    private static final String CONNECTOR_START = "Connector::start";
    private static final String CONNECTOR_RECONFIGURE = "Connector::reconfigure";
    private static final String CONNECTOR_TASK_CLASS = "Connector::taskClass";
    private static final String CONNECTOR_TASK_CONFIGS = "Connector::taskConfigs";
    private static final String CONNECTOR_STOP = "Connector::stop";
    private static final String CONNECTOR_VALIDATE = "Connector::validate";
    private static final String CONNECTOR_CONFIG = "Connector::config";
    private static final String CONNECTOR_VERSION = "Connector::version";
    private static final String TASK_START = "Task::start";
    private static final String TASK_STOP = "Task::stop";
    private static final String TASK_VERSION = "Task::version";
    private static final String SINK_TASK_INITIALIZE = "SinkTask::initialize";
    private static final String SINK_TASK_PUT = "SinkTask::put";
    private static final String SINK_TASK_FLUSH = "SinkTask::flush";
    private static final String SINK_TASK_PRE_COMMIT = "SinkTask::preCommit";
    private static final String SINK_TASK_OPEN = "SinkTask::open";
    private static final String SINK_TASK_ON_PARTITIONS_ASSIGNED = "SinkTask::onPartitionsAssigned";
    private static final String SINK_TASK_CLOSE = "SinkTask::close";
    private static final String SINK_TASK_ON_PARTITIONS_REVOKED = "SinkTask::onPartitionsRevoked";
    private static final String SOURCE_TASK_INITIALIZE = "SourceTask::initialize";
    private static final String SOURCE_TASK_POLL = "SourceTask::poll";
    private static final String SOURCE_TASK_COMMIT = "SourceTask::commit";
    private static final String SOURCE_TASK_COMMIT_RECORD = "SourceTask::commitRecord";
    private static final String SOURCE_TASK_COMMIT_RECORD_WITH_METADATA = "SourceTask::commitRecordWithMetadata";
    private EmbeddedConnectCluster connect;
    private ConnectorHandle normalConnectorHandle;
    private static final Logger log = LoggerFactory.getLogger(BlockingConnectorTest.class);
    private static final long CONNECT_WORKER_STARTUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long REST_REQUEST_TIMEOUT = Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$Block.class */
    private static class Block {
        private static CountDownLatch blockLatch;
        private final String block;
        public static final String BLOCK_CONFIG = "block";

        private static ConfigDef config() {
            return new ConfigDef().define(BLOCK_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Where to block indefinitely, e.g., 'Connector::start', 'Connector::initialize', 'Connector::taskConfigs', 'Task::version', 'SinkTask::put', 'SourceTask::poll'");
        }

        public static void waitForBlock() throws InterruptedException {
            synchronized (Block.class) {
                if (blockLatch == null) {
                    throw new IllegalArgumentException("No connector has been created yet");
                }
            }
            BlockingConnectorTest.log.debug("Waiting for connector to block");
            blockLatch.await();
            BlockingConnectorTest.log.debug("Connector should now be blocked");
        }

        public static void resetBlockLatch() {
            synchronized (Block.class) {
                if (blockLatch != null) {
                    blockLatch.countDown();
                    blockLatch = null;
                }
            }
        }

        public Block(Map<String, String> map) {
            this(new AbstractConfig(config(), map).getString(BLOCK_CONFIG));
        }

        public Block(String str) {
            this.block = str;
            synchronized (Block.class) {
                if (blockLatch != null) {
                    blockLatch.countDown();
                }
                blockLatch = new CountDownLatch(1);
            }
        }

        public Map<String, String> taskConfig() {
            return Collections.singletonMap(BLOCK_CONFIG, this.block);
        }

        public void maybeBlockOn(String str) {
            if (!str.equals(this.block)) {
                BlockingConnectorTest.log.debug("Will not block on {}", str);
            } else {
                BlockingConnectorTest.log.info("Will block on {}", str);
                blockLatch.countDown();
                while (true) {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        static /* synthetic */ ConfigDef access$100() {
            return config();
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingConnector.class */
    public static class BlockingConnector extends SourceConnector {
        private Block block;

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingConnector$BlockingTask.class */
        public static class BlockingTask extends SourceTask {
            public void start(Map<String, String> map) {
            }

            public List<SourceRecord> poll() {
                return null;
            }

            public void stop() {
            }

            public String version() {
                return "0.0.0";
            }
        }

        public BlockingConnector() {
            this(null);
        }

        protected BlockingConnector(String str) {
            this.block = new Block(str);
        }

        public void initialize(ConnectorContext connectorContext) {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_INITIALIZE);
            super.initialize(connectorContext);
        }

        public void initialize(ConnectorContext connectorContext, List<Map<String, String>> list) {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS);
            super.initialize(connectorContext, list);
        }

        public void start(Map<String, String> map) {
            this.block = new Block(map);
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_START);
        }

        public void reconfigure(Map<String, String> map) {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_RECONFIGURE);
            super.reconfigure(map);
        }

        public Class<? extends Task> taskClass() {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_TASK_CLASS);
            return BlockingTask.class;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_TASK_CONFIGS);
            return Collections.singletonList(Collections.emptyMap());
        }

        public void stop() {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_STOP);
        }

        public Config validate(Map<String, String> map) {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_VALIDATE);
            return super.validate(map);
        }

        public ConfigDef config() {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_CONFIG);
            return Block.access$100();
        }

        public String version() {
            this.block.maybeBlockOn(BlockingConnectorTest.CONNECTOR_VERSION);
            return "0.0.0";
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingSinkConnector.class */
    public static class BlockingSinkConnector extends SinkConnector {
        private Map<String, String> props;
        private final Class<? extends BlockingSinkTask> taskClass;

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingSinkConnector$BlockingSinkTask.class */
        public static class BlockingSinkTask extends SinkTask {
            private Block block;

            public BlockingSinkTask() {
                this(null);
            }

            protected BlockingSinkTask(String str) {
                this.block = new Block(str);
            }

            public void start(Map<String, String> map) {
                this.block = new Block(map);
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_START);
            }

            public void put(Collection<SinkRecord> collection) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_PUT);
            }

            public void stop() {
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_STOP);
            }

            public String version() {
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_VERSION);
                return "0.0.0";
            }

            public void initialize(SinkTaskContext sinkTaskContext) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_INITIALIZE);
                super.initialize(sinkTaskContext);
            }

            public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_FLUSH);
                super.flush(map);
            }

            public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_PRE_COMMIT);
                return super.preCommit(map);
            }

            public void open(Collection<TopicPartition> collection) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_OPEN);
                super.open(collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_ON_PARTITIONS_ASSIGNED);
                super.onPartitionsAssigned(collection);
            }

            public void close(Collection<TopicPartition> collection) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_CLOSE);
                super.close(collection);
            }

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                this.block.maybeBlockOn(BlockingConnectorTest.SINK_TASK_ON_PARTITIONS_REVOKED);
                super.onPartitionsRevoked(collection);
            }
        }

        public BlockingSinkConnector() {
            this(BlockingSinkTask.class);
        }

        protected BlockingSinkConnector(Class<? extends BlockingSinkTask> cls) {
            this.taskClass = cls;
        }

        public void start(Map<String, String> map) {
            this.props = map;
        }

        public Class<? extends Task> taskClass() {
            return this.taskClass;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return (List) IntStream.rangeClosed(0, i).mapToObj(i2 -> {
                return new HashMap(this.props);
            }).collect(Collectors.toList());
        }

        public void stop() {
        }

        public Config validate(Map<String, String> map) {
            return super.validate(map);
        }

        public ConfigDef config() {
            return Block.access$100();
        }

        public String version() {
            return "0.0.0";
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingSourceConnector.class */
    public static class BlockingSourceConnector extends SourceConnector {
        private Map<String, String> props;
        private final Class<? extends BlockingSourceTask> taskClass;

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$BlockingSourceConnector$BlockingSourceTask.class */
        public static class BlockingSourceTask extends SourceTask {
            private Block block;

            public BlockingSourceTask() {
                this(null);
            }

            protected BlockingSourceTask(String str) {
                this.block = new Block(str);
            }

            public void start(Map<String, String> map) {
                this.block = new Block(map);
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_START);
            }

            public List<SourceRecord> poll() {
                this.block.maybeBlockOn(BlockingConnectorTest.SOURCE_TASK_POLL);
                return null;
            }

            public void stop() {
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_STOP);
            }

            public String version() {
                this.block.maybeBlockOn(BlockingConnectorTest.TASK_VERSION);
                return "0.0.0";
            }

            public void initialize(SourceTaskContext sourceTaskContext) {
                this.block.maybeBlockOn(BlockingConnectorTest.SOURCE_TASK_INITIALIZE);
                super.initialize(sourceTaskContext);
            }

            public void commit() throws InterruptedException {
                this.block.maybeBlockOn(BlockingConnectorTest.SOURCE_TASK_COMMIT);
                super.commit();
            }

            public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
                this.block.maybeBlockOn(BlockingConnectorTest.SOURCE_TASK_COMMIT_RECORD);
                super.commitRecord(sourceRecord);
            }

            public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
                this.block.maybeBlockOn(BlockingConnectorTest.SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
                super.commitRecord(sourceRecord, recordMetadata);
            }
        }

        public BlockingSourceConnector() {
            this(BlockingSourceTask.class);
        }

        protected BlockingSourceConnector(Class<? extends BlockingSourceTask> cls) {
            this.taskClass = cls;
        }

        public void start(Map<String, String> map) {
            this.props = map;
        }

        public Class<? extends Task> taskClass() {
            return this.taskClass;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return (List) IntStream.range(0, i).mapToObj(i2 -> {
                return new HashMap(this.props);
            }).collect(Collectors.toList());
        }

        public void stop() {
        }

        public Config validate(Map<String, String> map) {
            return super.validate(map);
        }

        public ConfigDef config() {
            return Block.access$100();
        }

        public String version() {
            return "0.0.0";
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$ConfigBlockingConnector.class */
    public static class ConfigBlockingConnector extends BlockingConnector {
        public ConfigBlockingConnector() {
            super(BlockingConnectorTest.CONNECTOR_CONFIG);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$InitializeBlockingConnector.class */
    public static class InitializeBlockingConnector extends BlockingConnector {
        public InitializeBlockingConnector() {
            super(BlockingConnectorTest.CONNECTOR_INITIALIZE);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$TaskInitializeBlockingSinkConnector.class */
    public static class TaskInitializeBlockingSinkConnector extends BlockingSinkConnector {

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$TaskInitializeBlockingSinkConnector$InitializeBlockingSinkTask.class */
        public static class InitializeBlockingSinkTask extends BlockingSinkConnector.BlockingSinkTask {
            public InitializeBlockingSinkTask() {
                super(BlockingConnectorTest.SINK_TASK_INITIALIZE);
            }
        }

        public TaskInitializeBlockingSinkConnector() {
            super(InitializeBlockingSinkTask.class);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$TaskInitializeBlockingSourceConnector.class */
    public static class TaskInitializeBlockingSourceConnector extends BlockingSourceConnector {

        /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$TaskInitializeBlockingSourceConnector$InitializeBlockingSourceTask.class */
        public static class InitializeBlockingSourceTask extends BlockingSourceConnector.BlockingSourceTask {
            public InitializeBlockingSourceTask() {
                super(BlockingConnectorTest.SOURCE_TASK_INITIALIZE);
            }
        }

        public TaskInitializeBlockingSourceConnector() {
            super(InitializeBlockingSourceTask.class);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/BlockingConnectorTest$ValidateBlockingConnector.class */
    public static class ValidateBlockingConnector extends BlockingConnector {
        public ValidateBlockingConnector() {
            super(BlockingConnectorTest.CONNECTOR_VALIDATE);
        }
    }

    @Before
    public void setup() throws Exception {
        ConnectorsResource.setRequestTimeout(REST_REQUEST_TIMEOUT);
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(1).numBrokers(1).workerProps(new HashMap()).brokerProps(new Properties()).build();
        this.connect.start();
        TestUtils.waitForCondition(() -> {
            return this.connect.requestGet(this.connect.endpointForResource("connectors/nonexistent")).getStatus() == 404;
        }, CONNECT_WORKER_STARTUP_TIMEOUT, "Worker did not complete startup in time");
    }

    @After
    public void close() {
        this.connect.stop();
        ConnectorsResource.resetRequestTimeout();
        Block.resetBlockLatch();
    }

    @Test
    public void testBlockInConnectorValidate() throws Exception {
        log.info("Starting test testBlockInConnectorValidate");
        Assert.assertThrows(ConnectRestException.class, () -> {
            createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE);
        });
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorConfig() throws Exception {
        log.info("Starting test testBlockInConnectorConfig");
        Assert.assertThrows(ConnectRestException.class, () -> {
            createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG);
        });
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorInitialize() throws Exception {
        log.info("Starting test testBlockInConnectorInitialize");
        createConnectorWithBlock(InitializeBlockingConnector.class, CONNECTOR_INITIALIZE);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorStart() throws Exception {
        log.info("Starting test testBlockInConnectorStart");
        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInConnectorStop() throws Exception {
        log.info("Starting test testBlockInConnectorStop");
        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        this.connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInSourceTaskStart() throws Exception {
        log.info("Starting test testBlockInSourceTaskStart");
        createConnectorWithBlock(BlockingSourceConnector.class, TASK_START);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInSourceTaskStop() throws Exception {
        log.info("Starting test testBlockInSourceTaskStop");
        createConnectorWithBlock(BlockingSourceConnector.class, TASK_STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        this.connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInSinkTaskStart() throws Exception {
        log.info("Starting test testBlockInSinkTaskStart");
        createConnectorWithBlock(BlockingSinkConnector.class, TASK_START);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testBlockInSinkTaskStop() throws Exception {
        log.info("Starting test testBlockInSinkTaskStop");
        createConnectorWithBlock(BlockingSinkConnector.class, TASK_STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        this.connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
        Block.waitForBlock();
        createNormalConnector();
        verifyNormalConnector();
    }

    @Test
    public void testWorkerRestartWithBlockInConnectorStart() throws Exception {
        log.info("Starting test testWorkerRestartWithBlockInConnectorStart");
        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
        Block.waitForBlock();
        createNormalConnector();
        this.connect.removeWorker();
        this.connect.addWorker();
        Block.waitForBlock();
        verifyNormalConnector();
    }

    @Test
    public void testWorkerRestartWithBlockInConnectorStop() throws Exception {
        log.info("Starting test testWorkerRestartWithBlockInConnectorStop");
        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        createNormalConnector();
        waitForConnectorStart(NORMAL_CONNECTOR_NAME);
        this.connect.removeWorker();
        Block.waitForBlock();
        this.connect.addWorker();
        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
        verifyNormalConnector();
    }

    private void createConnectorWithBlock(Class<? extends Connector> cls, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", cls.getName());
        hashMap.put("tasks.max", "1");
        hashMap.put("topics", "t1");
        hashMap.put(Block.BLOCK_CONFIG, Objects.requireNonNull(str));
        log.info("Creating blocking connector of type {} with block in {}", cls.getSimpleName(), str);
        try {
            this.connect.configureConnector(BLOCKING_CONNECTOR_NAME, hashMap);
        } catch (RuntimeException e) {
            log.info("Failed to create connector", e);
            throw e;
        }
    }

    private void createNormalConnector() {
        this.connect.kafka().createTopic(TEST_TOPIC, 3);
        this.normalConnectorHandle = RuntimeHandles.get().connectorHandle(NORMAL_CONNECTOR_NAME);
        this.normalConnectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        this.normalConnectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", MonitorableSourceConnector.class.getName());
        hashMap.put("tasks.max", "1");
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, TEST_TOPIC);
        log.info("Creating normal connector");
        try {
            this.connect.configureConnector(NORMAL_CONNECTOR_NAME, hashMap);
        } catch (RuntimeException e) {
            log.info("Failed to create connector", e);
            throw e;
        }
    }

    private void waitForConnectorStart(String str) throws InterruptedException {
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(str, 0, String.format("Failed to observe transition to 'RUNNING' state for connector '%s' in time", str));
    }

    private void verifyNormalConnector() throws InterruptedException {
        waitForConnectorStart(NORMAL_CONNECTOR_NAME);
        this.normalConnectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        this.normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
    }
}
