package org.apache.kafka.connect.integration;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.ConnectAssertions;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.class */
public class ExactlyOnceSourceIntegrationTest {
    // Value stored under the worker "group.id" property in setup(); also passed to the
    // helpers that derive transactional IDs for the cluster and its tasks.
    private static final String CLUSTER_GROUP_ID = "exactly-once-source-integration-test";
    // Name under which the connector is registered with the cluster and with RuntimeHandles.
    private static final String CONNECTOR_NAME = "exactlyOnceQuestionMark";
    // Timeouts in milliseconds. NOTE(review): by their names these bound record consumption,
    // source task production and ACL propagation respectively -- confirm against call sites.
    private static final int CONSUME_RECORDS_TIMEOUT_MS = 60000;
    private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30000;
    private static final int ACL_PROPAGATION_TIMEOUT_MS = 30000;
    // Worker count that setup() hands to the cluster builder; individual tests may override it.
    private static final int DEFAULT_NUM_WORKERS = 3;
    // Broker-side properties, recreated in setup() before every test.
    private Properties brokerProps;
    // Worker-side properties, recreated in setup() before every test.
    private Map<String, String> workerProps;
    // Builder configured in setup(); tests may tweak it before calling startConnect().
    private EmbeddedConnectCluster.Builder connectBuilder;
    // The embedded cluster; only assigned once startConnect() has run.
    private EmbeddedConnectCluster connect;
    // Handle for CONNECTOR_NAME obtained from RuntimeHandles in setup().
    private ConnectorHandle connectorHandle;
    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceSourceIntegrationTest.class);
    // Lower bound on the number of records the tests expect to find in the target topic.
    private static final int MINIMUM_MESSAGES = 100;
    // String forms passed as connector configuration values (messages per poll / max messages per second).
    private static final String MESSAGES_PER_POLL = Integer.toString(MINIMUM_MESSAGES);
    private static final String MESSAGES_PER_SECOND = Long.toString(50);

    /* loaded from: input_file:org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest$NaughtyConnector.class */
    public static class NaughtyConnector extends SourceConnector {
        private Map<String, String> props;

        public void start(Map<String, String> map) {
            this.props = map;
        }

        public Class<? extends Task> taskClass() {
            return NaughtyTask.class;
        }

        public List<Map<String, String>> taskConfigs(int i) {
            return (List) IntStream.range(0, i).mapToObj(i2 -> {
                return this.props;
            }).collect(Collectors.toList());
        }

        public void stop() {
        }

        public ConfigDef config() {
            return new ConfigDef();
        }

        public String version() {
            return "none";
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest$NaughtyTask.class */
    public static class NaughtyTask extends SourceTask {
        private String topic;

        public void start(Map<String, String> map) {
            if (!map.containsKey("offsets.storage.topic")) {
                throw new ConnectException("No offsets topic");
            }
            this.topic = map.get("offsets.storage.topic");
        }

        public List<SourceRecord> poll() {
            this.context.offsetStorageReader().offset(Collections.singletonMap("", null));
            return Collections.singletonList(new SourceRecord((Map) null, (Map) null, this.topic, (Schema) null, "", (Schema) null, (Object) null));
        }

        public void stop() {
        }

        public String version() {
            return "none";
        }
    }

    /**
     * Prepares fresh worker/broker properties and a cluster builder before each test.
     * The cluster itself is not started here; tests call {@code startConnect()} once
     * they have finished adjusting the properties and the builder.
     */
    @BeforeEach
    public void setup() {
        // Worker configuration: exactly-once source support on, fixed group ID.
        Map<String, String> workerConfig = new HashMap<>();
        workerConfig.put("exactly.once.source.support", "enabled");
        workerConfig.put("group.id", CLUSTER_GROUP_ID);
        this.workerProps = workerConfig;

        // Broker configuration: transaction state log sized for a single broker.
        Properties brokerConfig = new Properties();
        brokerConfig.put("transaction.state.log.replication.factor", "1");
        brokerConfig.put("transaction.state.log.min.isr", "1");
        this.brokerProps = brokerConfig;

        EmbeddedConnectCluster.Builder builder = (EmbeddedConnectCluster.Builder) new EmbeddedConnectCluster.Builder()
                .numWorkers(DEFAULT_NUM_WORKERS)
                .numBrokers(1);
        builder = (EmbeddedConnectCluster.Builder) builder.workerProps(this.workerProps);
        builder = (EmbeddedConnectCluster.Builder) builder.brokerProps(this.brokerProps);
        this.connectBuilder = builder;

        this.connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
    }

    /** Builds the embedded Connect cluster from the current builder state and starts it. */
    private void startConnect() {
        EmbeddedConnectCluster cluster = (EmbeddedConnectCluster) this.connectBuilder.build();
        this.connect = cluster;
        cluster.start();
    }

    /**
     * Tears down after each test: stops the embedded cluster (if one was created) and
     * always removes the connector handle from {@code RuntimeHandles}, even when
     * stopping the cluster throws.
     */
    @AfterEach
    public void close() {
        try {
            // The cluster is only assigned by startConnect(); guard against a test that
            // failed before reaching it so teardown does not add a NullPointerException.
            if (this.connect != null) {
                this.connect.stop();
            }
        } finally {
            RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
        }
    }

    /**
     * Exercises connector config validation on a single-worker cluster.
     * <p>
     * With {@code exactly.once.support=required}, the custom exactly-once-support values
     * "null", "unsupported" and "fail" must each yield exactly one validation error
     * attached to {@code exactly.once.support}, while "supported" must yield none. The
     * same pattern is then checked for {@code transaction.boundary} set to CONNECTOR
     * together with the custom transaction-boundaries setting.
     */
    @Test
    public void testPreflightValidation() {
        this.connectBuilder.numWorkers(1);
        startConnect();

        final String connectorType = MonitorableSourceConnector.class.getSimpleName();
        final String singleErrorMessage = "Preflight validation should have exactly one error";
        final String noErrorMessage = "Preflight validation should have zero errors";
        final List<String> rejectedValues = Arrays.asList("null", "unsupported", "fail");

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put("tasks.max", "1");
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, MonitorableSourceConnector.TOPIC_CONFIG);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("exactly.once.support", "required");

        // Exactly-once support: every rejected value must surface as a single error on the property.
        for (String rejected : rejectedValues) {
            props.put(MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, rejected);
            ConfigInfos outcome = this.connect.validateConnectorConfig(connectorType, props);
            Assertions.assertEquals(1, outcome.errorCount(), singleErrorMessage);
            Assertions.assertFalse(
                    findConfigInfo("exactly.once.support", outcome).configValue().errors().isEmpty(),
                    "Preflight validation for exactly-once support property should have at least one error message");
        }
        props.put(MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, "supported");
        Assertions.assertEquals(0, this.connect.validateConnectorConfig(connectorType, props).errorCount(), noErrorMessage);

        // Connector-defined transaction boundaries: same expectations, different property.
        props.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());
        for (String rejected : rejectedValues) {
            props.put(MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, rejected);
            ConfigInfos outcome = this.connect.validateConnectorConfig(connectorType, props);
            Assertions.assertEquals(1, outcome.errorCount(), singleErrorMessage);
            Assertions.assertFalse(
                    findConfigInfo("transaction.boundary", outcome).configValue().errors().isEmpty(),
                    "Preflight validation for transaction boundary property should have at least one error message");
        }
        props.put(MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, "supported");
        Assertions.assertEquals(0, this.connect.validateConnectorConfig(connectorType, props).errorCount(), noErrorMessage);
    }

    /**
     * Runs a single-task source connector with {@code transaction.boundary=POLL} on a
     * one-worker cluster, waits for it to produce and commit records, deletes it, and
     * then verifies with a {@code read_committed} consumer that at least
     * {@link #MINIMUM_MESSAGES} records reached the topic and that their sequence
     * numbers satisfy the exactly-once check.
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testPollBoundary() throws Exception {
        // NOTE(review): a 10-minute flush interval; presumably chosen so periodic offset
        // flushes never fire during the test -- confirm.
        this.workerProps.put("offset.flush.interval.ms", "600000");
        this.connectBuilder.numWorkers(1);
        startConnect();

        final String topic = "test-topic";
        final int numPartitions = 3;
        final int numTasks = 1;
        this.connect.kafka().createTopic(topic, numPartitions);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put("tasks.max", Integer.toString(numTasks));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("transaction.boundary", SourceTask.TransactionBoundary.POLL.toString());
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

        // Expectations must be registered before the connector starts producing.
        this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
        this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);

        this.connect.configureConnector(CONNECTOR_NAME, props);

        log.info("Waiting for records to be provided to worker by task");
        this.connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);

        log.info("Waiting for records to be committed to Kafka by worker");
        this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

        StartAndStopLatch connectorStop = this.connectorHandle.expectedStops(1, true);
        this.connect.deleteConnector(CONNECTOR_NAME);
        assertConnectorStopped(connectorStop);

        // Only committed records are visible to this consumer.
        ConsumerRecords<byte[], byte[]> records = this.connect.kafka().consumeAll(
                CONSUME_RECORDS_TIMEOUT_MS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
        Assertions.assertTrue(records.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES
                        + " but got " + records.count());
        assertExactlyOnceSeqnos(records, numTasks);
    }

    /**
     * Same scenario as the poll-boundary test but with
     * {@code transaction.boundary=INTERVAL} and a 10-second
     * {@code transaction.boundary.interval.ms}: a single-task connector produces and
     * commits records, is deleted, and a {@code read_committed} consumer must then see
     * at least {@link #MINIMUM_MESSAGES} records that pass the exactly-once
     * sequence-number check.
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testIntervalBoundary() throws Exception {
        // NOTE(review): a 10-minute flush interval; presumably chosen so periodic offset
        // flushes never fire during the test -- confirm.
        this.workerProps.put("offset.flush.interval.ms", "600000");
        this.connectBuilder.numWorkers(1);
        startConnect();

        final String topic = "test-topic";
        final int numPartitions = 3;
        final int numTasks = 1;
        this.connect.kafka().createTopic(topic, numPartitions);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put("tasks.max", Integer.toString(numTasks));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("transaction.boundary", SourceTask.TransactionBoundary.INTERVAL.toString());
        props.put("transaction.boundary.interval.ms", "10000");
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

        // Expectations must be registered before the connector starts producing.
        this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
        this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);

        this.connect.configureConnector(CONNECTOR_NAME, props);

        log.info("Waiting for records to be provided to worker by task");
        this.connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);

        log.info("Waiting for records to be committed to Kafka by worker");
        this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

        StartAndStopLatch connectorStop = this.connectorHandle.expectedStops(1, true);
        this.connect.deleteConnector(CONNECTOR_NAME);
        assertConnectorStopped(connectorStop);

        // Only committed records are visible to this consumer.
        ConsumerRecords<byte[], byte[]> records = this.connect.kafka().consumeAll(
                CONSUME_RECORDS_TIMEOUT_MS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
        Assertions.assertTrue(records.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES
                        + " but got " + records.count());
        assertExactlyOnceSeqnos(records, numTasks);
    }

    /**
     * Runs a single-task connector with {@code transaction.boundary=CONNECTOR} and
     * checks two things against what a {@code read_committed} consumer observes:
     * <ol>
     *   <li>the offsets committed to the worker offsets topic start with the
     *       Fibonacci-style sequence 1, 2, 3, 5, 8, ... (all terms up to
     *       {@link #MINIMUM_MESSAGES});</li>
     *   <li>the committed record values are 1..{@link #MINIMUM_MESSAGES} minus the
     *       ranges that end on an even boundary of that sequence.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testConnectorBoundary() throws Exception {
        final String offsetsTopic = "exactly-once-source-cluster-offsets";
        this.workerProps.put("offset.storage.topic", offsetsTopic);
        this.connectBuilder.numWorkers(1);
        startConnect();

        final String topic = "test-topic";
        final int numPartitions = 3;
        this.connect.kafka().createTopic(topic, numPartitions);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put("tasks.max", "1");
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());
        props.put(MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, "supported");
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

        // NOTE(review): 233 exceeds MINIMUM_MESSAGES; presumably it leaves room for records
        // that belong to transactions which never become visible to consumers -- confirm.
        this.connectorHandle.expectedRecords(233);
        this.connectorHandle.expectedCommits(233);

        this.connect.configureConnector(CONNECTOR_NAME, props);

        log.info("Waiting for records to be provided to worker by task");
        this.connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);

        log.info("Waiting for records to be committed to Kafka by worker");
        this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("isolation.level", "read_committed");

        ConsumerRecords<byte[], byte[]> sourceRecords = this.connect.kafka().consumeAll(
                CONSUME_RECORDS_TIMEOUT_MS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
        Assertions.assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES
                        + " but got " + sourceRecords.count());

        // Expected committed offsets: 1, 2, 3, 5, 8, ... while the term is <= MINIMUM_MESSAGES.
        List<Long> expectedOffsetSeqnos = new ArrayList<>();
        long previousOffsetSeqno = 1;
        long nextOffsetSeqno = 1;
        while (nextOffsetSeqno <= MINIMUM_MESSAGES) {
            expectedOffsetSeqnos.add(nextOffsetSeqno);
            nextOffsetSeqno += previousOffsetSeqno;
            previousOffsetSeqno = nextOffsetSeqno - previousOffsetSeqno;
        }
        Assertions.assertEquals(
                expectedOffsetSeqnos,
                parseAndAssertOffsetsForSingleTask(
                        this.connect.kafka().consume(
                                expectedOffsetSeqnos.size(),
                                TimeUnit.MINUTES.toMillis(1L),
                                consumerProps,
                                offsetsTopic))
                        .subList(0, expectedOffsetSeqnos.size()),
                "Committed offsets should match connector-defined transaction boundaries");

        // Expected committed values: start from 1..MINIMUM_MESSAGES and drop every range
        // (priorBoundary, nextBoundary] whose upper boundary is even.
        List<Long> expectedRecordSeqnos = LongStream.range(1L, MINIMUM_MESSAGES + 1L)
                .boxed()
                .collect(Collectors.toList());
        long priorBoundary = 1;
        long nextBoundary = 2;
        while (priorBoundary < expectedRecordSeqnos.get(expectedRecordSeqnos.size() - 1)) {
            if (nextBoundary % 2 == 0) {
                // Bounded loop: the previous unconditional loop had no exit and never terminated.
                for (long seqno = priorBoundary + 1; seqno < nextBoundary + 1; seqno++) {
                    // Boxed on purpose so that remove(Object) is selected, not remove(int index).
                    expectedRecordSeqnos.remove(Long.valueOf(seqno));
                }
            }
            nextBoundary += priorBoundary;
            priorBoundary = nextBoundary - priorBoundary;
        }

        List<Long> actualRecordSeqnos = parseAndAssertValuesForSingleTask(sourceRecords);
        Collections.sort(actualRecordSeqnos);
        Assertions.assertEquals(
                expectedRecordSeqnos,
                actualRecordSeqnos.subList(0, expectedRecordSeqnos.size()),
                "Committed records should exclude connector-aborted transactions");
    }

    /**
     * Initialises transactions on a separate producer that uses the cluster's
     * transactional producer ID, then checks that the first attempt to configure the
     * connector fails with a {@link ConnectRestException} while a second attempt
     * succeeds and the connector goes on to deliver at least
     * {@link #MINIMUM_MESSAGES} committed records that pass the exactly-once
     * sequence-number check.
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testFencedLeaderRecovery() throws Exception {
        this.connectBuilder.numWorkers(1);
        // NOTE(review): a 10-minute flush interval; presumably chosen so periodic offset
        // flushes never fire during the test -- confirm.
        this.workerProps.put("offset.flush.interval.ms", "600000");
        startConnect();

        final String topic = "test-topic";
        final int numPartitions = 3;
        final int numTasks = 1;
        this.connect.kafka().createTopic(topic, numPartitions);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put("tasks.max", Integer.toString(numTasks));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("transaction.boundary", SourceTask.TransactionBoundary.POLL.toString());
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

        // Expectations must be registered before the connector starts producing.
        this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
        this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);

        // try-with-resources so the producer is closed even if initTransactions() throws.
        try (KafkaProducer<byte[], byte[]> zombieLeader = transactionalProducer(
                "simulated-zombie-leader",
                DistributedConfig.transactionalProducerId(CLUSTER_GROUP_ID))) {
            zombieLeader.initTransactions();
        }

        // First attempt is expected to be rejected; the retry is expected to go through.
        Assertions.assertThrows(ConnectRestException.class, () -> {
            this.connect.configureConnector(CONNECTOR_NAME, props);
        });
        this.connect.configureConnector(CONNECTOR_NAME, props);

        log.info("Waiting for records to be provided to worker by task");
        this.connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);

        log.info("Waiting for records to be committed to Kafka by worker");
        this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

        StartAndStopLatch connectorStop = this.connectorHandle.expectedStops(1, true);
        this.connect.deleteConnector(CONNECTOR_NAME);
        assertConnectorStopped(connectorStop);

        // Only committed records are visible to this consumer.
        ConsumerRecords<byte[], byte[]> records = this.connect.kafka().consumeAll(
                CONSUME_RECORDS_TIMEOUT_MS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
        Assertions.assertTrue(records.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES
                        + " but got " + records.count());
        assertExactlyOnceSeqnos(records, numTasks);
    }

    /**
     * Starts the connector with three tasks on the default-sized cluster, then
     * reconfigures it through the task counts 3 -> 5 -> 1 -> 5 -> 3, asserting after
     * each step that producers are fenced. Finally waits for records and commits,
     * deletes the connector, and verifies that at least {@link #MINIMUM_MESSAGES}
     * committed records exist and pass the exactly-once sequence-number check for up
     * to five tasks.
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testConnectorReconfiguration() throws Exception {
        // NOTE(review): a 10-minute flush interval; presumably chosen so periodic offset
        // flushes never fire during the test -- confirm.
        this.workerProps.put("offset.flush.interval.ms", "600000");
        startConnect();

        final String topic = "test-topic";
        final int numPartitions = 3;
        final int initialNumTasks = 3;
        // Largest task count used by any of the reconfigurations below.
        final int maxNumTasks = 5;
        this.connect.kafka().createTopic(topic, numPartitions);

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

        // Expectations must be registered before the connector starts producing.
        this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
        this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);

        StartAndStopLatch connectorStart = connectorAndTaskStart(initialNumTasks);
        props.put("tasks.max", Integer.toString(initialNumTasks));
        this.connect.configureConnector(CONNECTOR_NAME, props);
        assertConnectorStarted(connectorStart);

        assertProducersAreFencedOnReconfiguration(initialNumTasks, maxNumTasks, topic, props);
        assertProducersAreFencedOnReconfiguration(maxNumTasks, 1, topic, props);
        assertProducersAreFencedOnReconfiguration(1, maxNumTasks, topic, props);
        assertProducersAreFencedOnReconfiguration(maxNumTasks, initialNumTasks, topic, props);

        log.info("Waiting for records to be provided to worker by task");
        this.connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);

        log.info("Waiting for records to be committed to Kafka by worker");
        this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

        StartAndStopLatch connectorStop = this.connectorHandle.expectedStops(1, true);
        this.connect.deleteConnector(CONNECTOR_NAME);
        assertConnectorStopped(connectorStop);

        // Only committed records are visible to this consumer.
        ConsumerRecords<byte[], byte[]> records = this.connect.kafka().consumeAll(
                CONSUME_RECORDS_TIMEOUT_MS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
        Assertions.assertTrue(records.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES
                        + " but got " + records.count());
        assertExactlyOnceSeqnos(records, maxNumTasks);
    }

    /**
     * Verifies task behaviour when the connector's admin principal cannot fence
     * producers, using a SASL/PLAIN-secured cluster with an authorizer enabled.
     * <ol>
     *   <li>Bring the connector up with an admin override that authenticates as
     *       {@code connector}, which is only granted access to the data topic and the
     *       worker offsets topic; connector and both tasks are expected to run.</li>
     *   <li>Reconfigure the connector; the tasks are then expected to fail.</li>
     *   <li>Grant {@code connector} access to the two task transactional IDs, restart,
     *       and expect connector and both tasks to be running again.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or the cluster interaction fails
     */
    @Test
    public void testTasksFailOnInabilityToFence() throws Exception {
        final String topic = "test-topic";
        final String globalOffsetsTopic = "connect-worker-offsets-topic";
        final int numPartitions = 3;
        final int numTasks = 2;
        final String brokerJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\" user_connector=\"connector_pwd\" user_super=\"super_pwd\";";

        this.brokerProps.put("listener.security.protocol.map", "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        this.brokerProps.put("authorizer.class.name", "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
        this.brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
        this.brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
        this.brokerProps.put("sasl.mechanism.controller.protocol", "PLAIN");
        this.brokerProps.put("listener.name.external.plain.sasl.jaas.config", brokerJaasConfig);
        this.brokerProps.put("listener.name.controller.plain.sasl.jaas.config", brokerJaasConfig);
        this.brokerProps.put("super.users", "User:super");

        // Client settings that authenticate as the super user; shared by the workers and the test's own clients.
        Map<String, String> superUserClientConfig = new HashMap<>();
        superUserClientConfig.put("sasl.mechanism", "PLAIN");
        superUserClientConfig.put("security.protocol", "SASL_PLAINTEXT");
        superUserClientConfig.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"super\" password=\"super_pwd\";");
        this.workerProps.putAll(superUserClientConfig);
        this.workerProps.put("offset.storage.topic", globalOffsetsTopic);
        this.connectBuilder.clientProps(superUserClientConfig);
        startConnect();

        // try-with-resources closes the admin client exactly once, including on failure.
        try (Admin admin = this.connect.kafka().createAdminClient()) {
            admin.createTopics(Collections.singleton(new NewTopic(topic, numPartitions, (short) 1))).all().get();
        }

        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getName());
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("name", CONNECTOR_NAME);
        props.put("tasks.max", Integer.toString(numTasks));
        // Consumer and producer keep super-user credentials ...
        superUserClientConfig.forEach((property, value) -> {
            props.put("consumer.override." + property, value);
            props.put("producer.override." + property, value);
        });
        // ... while the admin client authenticates as the restricted "connector" user.
        props.put("admin.override.sasl.mechanism", "PLAIN");
        props.put("admin.override.security.protocol", "SASL_PLAINTEXT");
        props.put("admin.override.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"connector\" password=\"connector_pwd\";");

        try (Admin admin = this.connect.kafka().createAdminClient()) {
            admin.createAcls(Arrays.asList(
                    new AclBinding(
                            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)),
                    new AclBinding(
                            new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW))
            )).all().get();
        }

        log.info("Bringing up connector with fresh slate; fencing should not be necessary");
        this.connect.configureConnector(CONNECTOR_NAME, props);

        // Keep restarting a FAILED connector until it reports RUNNING or the timeout expires.
        TestUtils.waitForCondition(() -> {
            ConnectorStateInfo status = this.connect.connectorStatus(CONNECTOR_NAME);
            if ("RUNNING".equals(status.connector().state())) {
                return true;
            }
            if ("FAILED".equals(status.connector().state())) {
                log.debug("Restarting failed connector {}", CONNECTOR_NAME);
                this.connect.restartConnector(CONNECTOR_NAME);
            }
            return false;
        }, ACL_PROPAGATION_TIMEOUT_MS, "Connector was not able to start in time, or ACL updates were not propagated across the Kafka cluster soon enough");
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, numTasks, "Connector and task should have started successfully");

        log.info("Reconfiguring connector; fencing should be necessary, and tasks should fail to start");
        // Any change to the configuration will do; this extra property only forces a reconfiguration.
        props.put("message.in.a.bottle", "19e184427ac45bd34c8588a4e771aa1a");
        this.connect.configureConnector(CONNECTOR_NAME, props);
        this.connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, numTasks, "Task should have failed on startup");

        // The result is intentionally not awaited here (unchanged from before); the retry
        // loop below tolerates task failures while the new ACLs take effect.
        try (Admin admin = this.connect.kafka().createAdminClient()) {
            admin.createAcls(Arrays.asList(
                    new AclBinding(
                            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)),
                    new AclBinding(
                            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL),
                            new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW))
            ));
        }

        log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time");
        this.connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);

        TestUtils.waitForCondition(() -> {
            ConnectorStateInfo status = this.connect.connectorStatus(CONNECTOR_NAME);
            boolean connectorRunning = "RUNNING".equals(status.connector().state());
            boolean allTasksRunning = status.tasks().stream().allMatch(task -> "RUNNING".equals(task.state()));
            boolean expectedTaskCount = status.tasks().size() == numTasks;
            if (connectorRunning && allTasksRunning && expectedTaskCount) {
                return true;
            }
            // A failed connector is not recoverable here; abort the wait immediately.
            if (!connectorRunning && "FAILED".equals(status.connector().state())) {
                throw new NoRetryException(new AssertionError("Connector " + CONNECTOR_NAME + " has failed unexpectedly"));
            }
            // Failed tasks are restarted and re-checked on the next poll.
            status.tasks().stream()
                    .filter(task -> "FAILED".equals(task.state()))
                    .map(task -> task.id())
                    .forEach(taskId -> this.connect.restartTask(CONNECTOR_NAME, taskId));
            return false;
        }, ConnectAssertions.CONNECTOR_SETUP_DURATION_MS, "Connector and task should have restarted successfully");
    }

    /**
     * Runs a source connector whose producer, consumer and admin clients are redirected to a
     * second, dedicated Kafka cluster and which is configured with its own offsets topic.
     *
     * <p>The test first checks that committed offsets land on poll boundaries in both the
     * connector-specific offsets topic (on the dedicated cluster) and the worker offsets topic
     * (on the Connect cluster's Kafka). It then replaces every worker with one that has
     * {@code exactly.once.source.support} set to {@code disabled}, lets the connector run again,
     * deletes it, and verifies the produced sequence numbers up to the last committed offset.
     *
     * @throws Exception if the clusters cannot be started or any wait or assertion fails
     */
    @Test
    public void testSeparateOffsetsTopic() throws Exception {
        final String topic = "test-topic";
        final String workerOffsetsTopic = "connect-worker-offsets-topic";
        final String connectorOffsetsTopic = "exactlyOnceQuestionMark-offsets";
        final int numBrokers = 1;
        final int numTasks = 1;

        this.workerProps.put("offset.storage.topic", workerOffsetsTopic);
        startConnect();

        EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numBrokers, this.brokerProps);
        // The resource only exists to stop the dedicated cluster on every exit path, including
        // assertion failures; a failure while stopping is attached as a suppressed exception.
        try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
            connectorTargetedCluster.start();
            TestUtils.waitForCondition(
                () -> connectorTargetedCluster.runningBrokers().size() == numBrokers,
                ConnectAssertions.WORKER_SETUP_DURATION_MS,
                "Separate Kafka cluster did not start in time");
            connectorTargetedCluster.createTopic(topic, DEFAULT_NUM_WORKERS);

            Map<String, String> props = new HashMap<>();
            props.put("connector.class", MonitorableSourceConnector.class.getName());
            props.put("tasks.max", Integer.toString(numTasks));
            props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
            props.put("key.converter", StringConverter.class.getName());
            props.put("value.converter", StringConverter.class.getName());
            props.put("name", CONNECTOR_NAME);
            props.put("transaction.boundary", SourceTask.TransactionBoundary.POLL.toString());
            props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
            props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
            // Point every client the connector uses at the dedicated cluster.
            props.put("producer.override.bootstrap.servers", connectorTargetedCluster.bootstrapServers());
            props.put("consumer.override.bootstrap.servers", connectorTargetedCluster.bootstrapServers());
            props.put("admin.override.bootstrap.servers", connectorTargetedCluster.bootstrapServers());
            props.put("offsets.storage.topic", connectorOffsetsTopic);

            this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
            this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);
            this.connect.configureConnector(CONNECTOR_NAME, props);
            this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, numTasks, "connector and tasks did not start in time");

            log.info("Waiting for records to be provided to worker by task");
            this.connectorHandle.awaitRecords(30000L);
            log.info("Waiting for records to be committed to Kafka by worker");
            this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

            int recordCount = connectorTargetedCluster.consume(
                MINIMUM_MESSAGES,
                TimeUnit.MINUTES.toMillis(1L),
                Collections.singletonMap("isolation.level", "read_committed"),
                topic).count();
            Assertions.assertTrue(recordCount >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " but got " + recordCount);

            // Offsets in the connector-specific topic, read from the dedicated cluster.
            ConsumerRecords<byte[], byte[]> connectorOffsetRecords = connectorTargetedCluster.consumeAll(
                TimeUnit.MINUTES.toMillis(1L),
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                connectorOffsetsTopic);
            parseAndAssertOffsetsForSingleTask(connectorOffsetRecords).forEach(offset ->
                Assertions.assertEquals(0L, offset % 100, "Offset commits should occur on connector-defined poll boundaries, which happen every 100 records"));

            // Offsets in the worker offsets topic, read from the Connect cluster's own Kafka.
            ConsumerRecords<byte[], byte[]> workerOffsetRecords =
                this.connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1L), workerOffsetsTopic);
            parseAndAssertOffsetsForSingleTask(workerOffsetRecords).forEach(offset ->
                Assertions.assertEquals(0L, offset % 100, "Offset commits should occur on connector-defined poll boundaries, which happen every 100 records"));

            // Replace all workers with ones that have exactly-once source support disabled.
            this.connect.workers().forEach(this.connect::removeWorker);
            this.workerProps.put("exactly.once.source.support", "disabled");
            this.connectorHandle.expectedRecords(MINIMUM_MESSAGES);
            this.connectorHandle.expectedCommits(MINIMUM_MESSAGES);
            for (int worker = 0; worker < DEFAULT_NUM_WORKERS; worker++) {
                this.connect.addWorker();
            }
            this.connect.assertions().assertAtLeastNumWorkersAreUp(DEFAULT_NUM_WORKERS, "cluster did not restart in time");
            this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, numTasks, "connector and tasks did not resume running after cluster restart in time");

            log.info("Waiting for records to be provided to worker by task");
            this.connectorHandle.awaitRecords(30000L);
            log.info("Waiting for records to be committed to Kafka by worker");
            this.connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1L));

            StartAndStopLatch connectorStop = this.connectorHandle.expectedStops(1, true);
            this.connect.deleteConnector(CONNECTOR_NAME);
            assertConnectorStopped(connectorStop);

            // NOTE(review): the timeout below is borrowed from an unrelated test class; it is
            // presumably meant to be the same one-minute budget used above - confirm and replace.
            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                topic);
            Assertions.assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES,
                "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " but got " + sourceRecords.count());

            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster.consumeAll(
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                Collections.singletonMap("isolation.level", "read_committed"),
                null,
                connectorOffsetsTopic);
            assertAtLeastOnceSeqnos(sourceRecords, offsetRecords, numTasks);
        }
    }

    /**
     * Starts a single-worker cluster, deploys a {@code NaughtyConnector} with one task, interval
     * transaction boundaries and a connector-specific offsets topic named {@code whoops}, and
     * expects the connector to be running while its task has failed.
     *
     * @throws Exception if the cluster cannot be started or the expected state is not reached
     */
    @Test
    public void testPotentialDeadlockWhenProducingToOffsetsTopic() throws Exception {
        this.connectBuilder.numWorkers(1);
        startConnect();
        this.connect.kafka().createTopic("test-topic", DEFAULT_NUM_WORKERS);

        Map<String, String> connectorProps = new HashMap<>();
        connectorProps.put("name", CONNECTOR_NAME);
        connectorProps.put("connector.class", NaughtyConnector.class.getName());
        connectorProps.put("tasks.max", "1");
        connectorProps.put("transaction.boundary", SourceTask.TransactionBoundary.INTERVAL.toString());
        connectorProps.put("offsets.storage.topic", "whoops");
        this.connect.configureConnector(CONNECTOR_NAME, connectorProps);

        this.connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(
            CONNECTOR_NAME,
            1,
            "Task should have failed after trying to produce to its own offsets topic");
    }

    /**
     * Returns the validation result whose config key name equals the given property name.
     *
     * @param str the property name to look for
     * @param configInfos the validation results to search
     * @return a matching {@code ConfigInfo}
     * @throws AssertionError if no validation result exists for the property
     */
    private ConfigInfo findConfigInfo(String str, ConfigInfos configInfos) {
        for (ConfigInfo candidate : configInfos.values()) {
            if (str.equals(candidate.configKey().name())) {
                return candidate;
            }
        }
        throw new AssertionError("Failed to find configuration validation result for property '" + str + "'");
    }

    /**
     * Parses offset records and asserts that all of them belong to task 0.
     *
     * @param consumerRecords records read from an offsets topic
     * @return the offsets recorded for task 0, in the order they were parsed
     */
    private List<Long> parseAndAssertOffsetsForSingleTask(ConsumerRecords<byte[], byte[]> consumerRecords) {
        Map<Integer, List<Long>> offsetsByTask = parseOffsetForTasks(consumerRecords);
        Set<Integer> onlyTaskZero = Collections.singleton(0);
        Assertions.assertEquals(onlyTaskZero, offsetsByTask.keySet(), "Expected records to only be produced from a single task");
        return offsetsByTask.get(0);
    }

    /**
     * Parses produced records and asserts that all of them were emitted by task 0.
     *
     * @param consumerRecords records read from the connector's output topic
     * @return the sequence numbers emitted by task 0, in the order they were parsed
     */
    private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> consumerRecords) {
        Map<Integer, List<Long>> seqnosByTask = parseValuesForTasks(consumerRecords);
        Set<Integer> onlyTaskZero = Collections.singleton(0);
        Assertions.assertEquals(onlyTaskZero, seqnosByTask.keySet(), "Expected records to only be produced from a single task");
        return seqnosByTask.get(0);
    }

    /**
     * Asserts that the sequence numbers parsed from the given records satisfy
     * {@link #assertSeqnos(Map, int)} for the given number of tasks.
     *
     * @param consumerRecords records read from the connector's output topic
     * @param i the number of tasks expected to have produced records
     */
    private void assertExactlyOnceSeqnos(ConsumerRecords<byte[], byte[]> consumerRecords, int i) {
        Map<Integer, List<Long>> seqnosByTask = parseValuesForTasks(consumerRecords);
        assertSeqnos(seqnosByTask, i);
    }

    /**
     * Checks produced sequence numbers after discarding everything beyond each task's highest
     * committed offset, then applies {@link #assertSeqnos(Map, int)} to what remains.
     *
     * @param consumerRecords records read from the connector's output topic
     * @param consumerRecords2 records read from the connector's offsets topic
     * @param i the number of tasks expected to have produced records
     */
    private void assertAtLeastOnceSeqnos(ConsumerRecords<byte[], byte[]> consumerRecords, ConsumerRecords<byte[], byte[]> consumerRecords2, int i) {
        Map<Integer, List<Long>> seqnosByTask = parseValuesForTasks(consumerRecords);

        // Highest offset committed by each task.
        Map<Integer, Long> lastCommittedByTask = new HashMap<>();
        for (Map.Entry<Integer, List<Long>> taskOffsets : parseOffsetForTasks(consumerRecords2).entrySet()) {
            lastCommittedByTask.put(taskOffsets.getKey(), Collections.max(taskOffsets.getValue()));
        }

        // Keep only the seqnos at or below the task's last committed offset.
        seqnosByTask.replaceAll((taskId, seqnos) -> {
            Long lastCommitted = lastCommittedByTask.get(taskId);
            Assertions.assertNotNull(lastCommitted, "No committed offset found for task " + taskId);
            List<Long> committedSeqnos = new ArrayList<>();
            for (Long seqno : seqnos) {
                if (seqno <= lastCommitted) {
                    committedSeqnos.add(seqno);
                }
            }
            return committedSeqnos;
        });

        assertSeqnos(seqnosByTask, i);
    }

    /**
     * Asserts that records exist for exactly the task IDs {@code 0..i-1} and that, for every task,
     * the set of sequence numbers equals {@code 1..n} where {@code n} is the number of records
     * parsed for that task.
     *
     * @param map sequence numbers keyed by task ID
     * @param i the number of tasks expected to have produced records
     */
    private void assertSeqnos(Map<Integer, List<Long>> map, int i) {
        Set<Integer> expectedTaskIds = IntStream.range(0, i).boxed().collect(Collectors.toSet());
        Assertions.assertEquals(expectedTaskIds, map.keySet(), "Expected records to be produced by each task");

        for (Map.Entry<Integer, List<Long>> taskSeqnos : map.entrySet()) {
            Integer taskId = taskSeqnos.getKey();
            List<Long> seqnos = taskSeqnos.getValue();

            Set<Long> expectedSeqnos = LongStream.range(1L, seqnos.size() + 1).boxed().collect(Collectors.toSet());
            Set<Long> actualSeqnos = new HashSet<>(seqnos);

            Set<Long> missingSeqnos = new HashSet<>(expectedSeqnos);
            missingSeqnos.removeAll(actualSeqnos);

            Set<Long> extraSeqnos = new HashSet<>(actualSeqnos);
            extraSeqnos.removeAll(expectedSeqnos);

            boolean seqnosMatch = missingSeqnos.isEmpty() && extraSeqnos.isEmpty();
            Assertions.assertTrue(seqnosMatch, "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, but the actual seqnos did not.\nSeqnos that should have been emitted but were not: " + missingSeqnos + "\nseqnos that should not have been emitted but were: " + extraSeqnos);
        }
    }

    /**
     * Groups the sequence numbers of produced records by the task that emitted them.
     *
     * <p>Every record must have a non-null key of the form
     * {@code key-<connectorName>-<taskId>-<seqno>} and a value that is identical to the key once
     * the {@code key-} / {@code value-} prefixes are removed.
     *
     * @param consumerRecords records read from the connector's output topic
     * @return sequence numbers keyed by task ID, in the order the records were iterated
     * @throws AssertionError if a record does not match the expected format
     */
    private Map<Integer, List<Long>> parseValuesForTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        final String keyPrefix = "key-";
        final String valuePrefix = "value-";
        // <connectorName>, <taskId> and <seqno>; unrelated to the size of the Connect cluster.
        final int expectedKeyParts = 3;

        Map<Integer, List<Long>> seqnosByTask = new HashMap<>();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
            Assertions.assertNotNull(consumerRecord.key(), "Record key should not be null");
            Assertions.assertNotNull(consumerRecord.value(), "Record value should not be null");

            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String key = new String(consumerRecord.key(), java.nio.charset.StandardCharsets.UTF_8);
            String value = new String(consumerRecord.value(), java.nio.charset.StandardCharsets.UTF_8);

            Assertions.assertTrue(key.startsWith(keyPrefix), "Key should start with \"" + keyPrefix + "\"");
            Assertions.assertTrue(value.startsWith(valuePrefix), "Value should start with \"" + valuePrefix + "\"");
            Assertions.assertEquals(key.substring(keyPrefix.length()), value.substring(valuePrefix.length()), "key and value should be identical after prefix");

            String[] split = key.substring(keyPrefix.length()).split("-");
            Assertions.assertEquals(expectedKeyParts, split.length, "Key should match pattern 'key-<connectorName>-<taskId>-<seqno>");
            Assertions.assertEquals(CONNECTOR_NAME, split[0], "Key should match pattern 'key-<connectorName>-<taskId>-<seqno>");

            // Parse each component in its own try block so the failure names the right field.
            int taskId;
            try {
                taskId = Integer.parseInt(split[1], 10);
            } catch (NumberFormatException e) {
                throw new AssertionError("Task ID in key should be an integer, was '" + split[1] + "'", e);
            }

            long seqno;
            try {
                seqno = Long.parseLong(split[2], 10);
            } catch (NumberFormatException e) {
                throw new AssertionError("Seqno in key should be a long, was '" + split[2] + "'", e);
            }

            seqnosByTask.computeIfAbsent(taskId, ignored -> new ArrayList<>()).add(seqno);
        }
        return seqnosByTask;
    }

    /**
     * Groups the offsets found in offsets-topic records by task ID.
     *
     * <p>Keys and values are deserialized with a schemaless {@link JsonConverter}. Each key must
     * be a two-element list whose first element is the connector name and whose second element is
     * a map containing a {@code task.id} string of the form {@code <connectorName>-<taskId>}. Each
     * value must be a map containing a {@code saved} entry holding a {@code Long}.
     *
     * @param consumerRecords records read from an offsets topic
     * @return offsets keyed by task ID, in the order the records were iterated
     * @throws AssertionError if a record does not match the expected format
     */
    private Map<Integer, List<Long>> parseOffsetForTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        final String converterTopic = "topic name is not used by converter";
        // Same prefix is used for the format check and for stripping, so the two cannot drift.
        final String taskIdPrefix = CONNECTOR_NAME + "-";

        JsonConverter offsetsConverter = new JsonConverter();
        offsetsConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        Map<Integer, List<Long>> offsetsByTask = new HashMap<>();
        for (ConsumerRecord<byte[], byte[]> offsetRecord : consumerRecords) {
            Object keyObject = offsetsConverter.toConnectData(converterTopic, offsetRecord.key()).value();
            Object valueObject = offsetsConverter.toConnectData(converterTopic, offsetRecord.value()).value();
            Assertions.assertNotNull(keyObject, "Offset key should not be null");
            Assertions.assertNotNull(valueObject, "Offset value should not be null");

            List<?> key = assertAndCast(keyObject, List.class, "Key");
            Assertions.assertEquals(2, key.size(), "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition");
            Assertions.assertEquals(CONNECTOR_NAME, key.get(0));

            Map<?, ?> partition = assertAndCast(key.get(1), Map.class, "Key[1]");
            Object taskIdObject = partition.get("task.id");
            Assertions.assertNotNull(taskIdObject, "Serialized source partition should contain 'task.id' field from MonitorableSourceConnector");
            String taskId = assertAndCast(taskIdObject, String.class, "task ID");
            Assertions.assertTrue(taskId.startsWith(taskIdPrefix), "task ID should match pattern '<connectorName>-<taskId>");

            // Only the integer parse can raise NumberFormatException, so guard just that call.
            int taskNum;
            try {
                taskNum = Integer.parseInt(taskId.substring(taskIdPrefix.length()));
            } catch (NumberFormatException e) {
                throw new AssertionError("task ID should match pattern '<connectorName>-<taskId>', where <taskId> is an integer", e);
            }

            Map<?, ?> offset = assertAndCast(valueObject, Map.class, "Value");
            Object seqnoObject = offset.get("saved");
            Assertions.assertNotNull(seqnoObject, "Serialized source offset should contain 'saved' field from MonitorableSourceConnector");
            Long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field");

            offsetsByTask.computeIfAbsent(taskNum, ignored -> new ArrayList<>()).add(seqno);
        }
        return offsetsByTask;
    }

    /**
     * Asserts that an object is an instance of the given class and returns it as that type.
     *
     * @param obj the object to check; a {@code null} value is reported as {@code "null"} in the
     *     failure message
     * @param cls the type the object is expected to have
     * @param str a short description of the object, used as the start of the failure message
     * @param <T> the expected type
     * @return {@code obj} cast to {@code T}
     */
    private static <T> T assertAndCast(Object obj, Class<T> cls, String str) {
        String actualType = obj == null ? "null" : obj.getClass().getName();
        Assertions.assertInstanceOf(cls, obj, str + " should be " + cls.getName() + "; was " + actualType + " instead");
        // A checked cast through the Class token avoids an unchecked (T) cast.
        return cls.cast(obj);
    }

    /**
     * Clears the connector handle's tasks, requests a task handle for each of the task IDs
     * {@code 0..i-1}, and returns a latch obtained from {@code expectedStarts(1, true)}.
     *
     * @param i the number of task handles to request
     * @return the latch returned by the connector handle
     */
    private StartAndStopLatch connectorAndTaskStart(int i) {
        this.connectorHandle.clearTasks();
        for (int taskNum = 0; taskNum < i; taskNum++) {
            String taskId = MonitorableSourceConnector.taskId(CONNECTOR_NAME, taskNum);
            this.connectorHandle.taskHandle(taskId);
        }
        return this.connectorHandle.expectedStarts(1, true);
    }

    /**
     * Waits up to {@code CONNECTOR_SETUP_DURATION_MS} milliseconds on the latch and fails if it
     * does not complete in that time.
     *
     * @param startAndStopLatch the latch to wait on
     * @throws InterruptedException if the wait is interrupted
     */
    private void assertConnectorStarted(StartAndStopLatch startAndStopLatch) throws InterruptedException {
        boolean startedInTime = startAndStopLatch.await(ConnectAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(startedInTime, "Connector and tasks did not finish startup in time");
    }

    /**
     * Waits up to {@code CONNECTOR_SHUTDOWN_DURATION_MS} milliseconds on the latch and fails if
     * it does not complete in that time.
     *
     * @param startAndStopLatch the latch to wait on
     * @throws InterruptedException if the wait is interrupted
     */
    private void assertConnectorStopped(StartAndStopLatch startAndStopLatch) throws InterruptedException {
        boolean stoppedInTime = startAndStopLatch.await(ConnectAssertions.CONNECTOR_SHUTDOWN_DURATION_MS, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(stoppedInTime, "Connector and tasks did not finish shutdown in time");
    }

    /**
     * Creates one transactional producer per current task using the transactional ID returned by
     * {@code Worker.taskTransactionalId}, reconfigures the connector to a new task count, and
     * asserts that every one of those producers is fenced afterwards.
     *
     * @param i the number of tasks before reconfiguration; one producer is created per task
     * @param i2 the number of tasks to reconfigure the connector to
     * @param str the topic the simulated producers try to write to
     * @param map the connector configuration; its {@code tasks.max} entry is overwritten
     * @throws InterruptedException if waiting for the connector to start is interrupted
     */
    private void assertProducersAreFencedOnReconfiguration(int i, int i2, String str, Map<String, String> map) throws InterruptedException {
        // Create every producer first, and only then initialize transactions on each of them.
        List<KafkaProducer<byte[], byte[]>> simulatedProducers = new ArrayList<>();
        for (int taskNum = 0; taskNum < i; taskNum++) {
            simulatedProducers.add(transactionalProducer(
                "simulated-task-producer-exactlyOnceQuestionMark-" + taskNum,
                Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, taskNum)));
        }
        for (KafkaProducer<byte[], byte[]> simulatedProducer : simulatedProducers) {
            simulatedProducer.initTransactions();
        }

        StartAndStopLatch restartLatch = connectorAndTaskStart(i2);
        map.put("tasks.max", Integer.toString(i2));
        log.info("Reconfiguring connector from {} tasks to {}", i, i2);
        this.connect.configureConnector(CONNECTOR_NAME, map);
        assertConnectorStarted(restartLatch);

        for (KafkaProducer<byte[], byte[]> simulatedProducer : simulatedProducers) {
            assertTransactionalProducerIsFenced(simulatedProducer, str);
        }
    }

    /**
     * Creates a producer on the Connect cluster's Kafka with idempotence enabled and the given
     * client and transactional IDs.
     *
     * @param str the value for {@code client.id}
     * @param str2 the value for {@code transactional.id}
     * @return the producer created by the embedded Kafka cluster
     */
    private KafkaProducer<byte[], byte[]> transactionalProducer(String str, String str2) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("transactional.id", str2);
        producerProps.put("enable.idempotence", true);
        producerProps.put("client.id", str);
        return this.connect.kafka().createProducer(producerProps);
    }

    /**
     * Asserts that sending a record and committing a transaction with the given producer throws
     * {@link InvalidProducerEpochException}, then closes the producer.
     *
     * @param kafkaProducer the producer expected to be fenced; always closed by this method
     * @param str the topic to send the probe record to
     */
    private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> kafkaProducer, String str) {
        try {
            kafkaProducer.beginTransaction();
            Assertions.assertThrows(InvalidProducerEpochException.class, () -> {
                kafkaProducer.send(new ProducerRecord<>(str, new byte[]{69}, new byte[]{96}));
                kafkaProducer.commitTransaction();
            }, "Producer should be fenced out");
        } finally {
            // Close even when beginTransaction throws or the assertion fails, so the producer is not leaked.
            kafkaProducer.close(Duration.ZERO);
        }
    }
}
