package org.apache.kafka.connect.runtime;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.test.util.ConcurrencyUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.apache.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTaskTest.class */
public class WorkerSourceTaskTest {
    public static final String POLL_TIMEOUT_MSG = "Timeout waiting for poll";
    private static final String TOPIC = "topic";
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private WorkerConfig config;
    private SourceConnectorConfig sourceConfig;
    private Plugins plugins;
    private MockConnectMetrics metrics;

    @Mock
    private SourceTask sourceTask;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SourceRecord, SourceRecord> transformationChain;

    @Mock
    private KafkaProducer<byte[], byte[]> producer;

    @Mock
    private TopicAdmin admin;

    @Mock
    private CloseableOffsetStorageReader offsetReader;

    @Mock
    private OffsetStorageWriter offsetWriter;

    @Mock
    private ConnectorOffsetBackingStore offsetStore;

    @Mock
    private ClusterConfigState clusterConfigState;
    private WorkerSourceTask workerTask;

    @Mock
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;
    private static final TaskConfig TASK_CONFIG;
    private static final List<SourceRecord> RECORDS;
    private static final Map<String, Object> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Object> OFFSET = Collections.singletonMap("key", 12);
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
    private static final Map<String, String> TASK_PROPS = new HashMap();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTaskTest$TestSourceTask.class */
    private static abstract class TestSourceTask extends SourceTask {
        private TestSourceTask() {
        }
    }

    public void setup(boolean z) {
        Map<String, String> workerProps = workerProps(z);
        this.plugins = new Plugins(workerProps);
        this.config = new StandaloneConfig(workerProps);
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorPropsWithGroups("topic"), true);
        this.metrics = new MockConnectMetrics();
    }

    private Map<String, String> workerProps(boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        hashMap.put("topic.creation.enable", String.valueOf(z));
        return hashMap;
    }

    private Map<String, String> sourceConnectorPropsWithGroups(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "foo-connector");
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topic", str);
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.groups", String.join(",", "foo", "bar"));
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        hashMap.put("topic.creation.foo.include", str);
        hashMap.put("topic.creation.bar.include", ".*");
        hashMap.put("topic.creation.bar.exclude", str);
        return hashMap;
    }

    @AfterEach
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
        Mockito.verifyNoMoreInteractions(new Object[]{this.statusListener});
    }

    private void createWorkerTask() {
        createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.noopOperator());
    }

    private void createWorkerTaskWithErrorToleration() {
        createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.allOperator());
    }

    private void createWorkerTask(TargetState targetState) {
        createWorkerTask(targetState, RetryWithToleranceOperatorTest.noopOperator());
    }

    private void createWorkerTask(TargetState targetState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {
        createWorkerTask(targetState, this.keyConverter, this.valueConverter, this.headerConverter, retryWithToleranceOperator);
    }

    private void createWorkerTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {
        this.workerTask = new WorkerSourceTask(this.taskId, this.sourceTask, this.statusListener, targetState, converter, converter2, this.errorHandlingMetrics, headerConverter, this.transformationChain, this.producer, this.admin, TopicCreationGroup.configuredGroups(this.sourceConfig), this.offsetReader, this.offsetWriter, this.offsetStore, this.config, this.clusterConfigState, this.metrics, this.plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator, this.statusBackingStore, (v0) -> {
            v0.run();
        }, Collections::emptyList);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testStartPaused(boolean z) throws Exception {
        setup(z);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        createWorkerTask(TargetState.PAUSED);
        ((TaskStatus.Listener) Mockito.doAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            return null;
        }).when(this.statusListener)).onPause(this.taskId);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onPause(this.taskId);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPause(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch expectPolls = expectPolls(10, atomicInteger);
        expectTopicCreation("topic");
        expectOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(expectPolls, POLL_TIMEOUT_MSG);
        this.workerTask.transitionTo(TargetState.PAUSED);
        int i = atomicInteger.get();
        Thread.sleep(100L);
        Assertions.assertTrue(atomicInteger.get() - i <= 1);
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        verifyCleanStartup();
        verifyTaskGetTopic(atomicInteger.get());
        verifyOffsetFlush(true);
        verifyTopicCreation("topic");
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onPause(this.taskId);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter)).offset(PARTITION, OFFSET);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPollsInBackground(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch expectPolls = expectPolls(10);
        expectTopicCreation("topic");
        expectOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(expectPolls, POLL_TIMEOUT_MSG);
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(10);
        verifyCleanStartup();
        verifyOffsetFlush(true);
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter)).offset(PARTITION, OFFSET);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFailureInPoll(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.sourceTask.poll()).thenAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            throw runtimeException;
        });
        expectEmptyOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(countDownLatch, POLL_TIMEOUT_MSG);
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        verifyCleanStartup();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onFailure(this.taskId, runtimeException);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        assertShouldSkipCommit();
        verifyOffsetFlush(true);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFailureInPollAfterCancel(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.sourceTask.poll()).thenAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            ConcurrencyUtils.awaitLatch(countDownLatch2, "Timeout waiting for main test thread to cancel task.");
            throw runtimeException;
        });
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(countDownLatch, POLL_TIMEOUT_MSG);
        this.workerTask.cancel();
        countDownLatch2.countDown();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        verifyCleanStartup();
        ((CloseableOffsetStorageReader) Mockito.verify(this.offsetReader, Mockito.atLeastOnce())).close();
        ((KafkaProducer) Mockito.verify(this.producer)).close(Duration.ZERO);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        ((TopicAdmin) Mockito.verify(this.admin)).close((Duration) ArgumentMatchers.any(Duration.class));
        ((TransformationChain) Mockito.verify(this.transformationChain)).close();
        ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).stop();
        try {
            ((HeaderConverter) Mockito.verify(this.headerConverter)).close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFailureInPollAfterStop(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.sourceTask.poll()).thenAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            ConcurrencyUtils.awaitLatch(countDownLatch2, "Timeout waiting for main test thread to stop task");
            throw runtimeException;
        });
        expectOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(countDownLatch, POLL_TIMEOUT_MSG);
        this.workerTask.stop();
        countDownLatch2.countDown();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        assertShouldSkipCommit();
        submit.get();
        assertPollMetrics(0);
        verifyCleanStartup();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        verifyOffsetFlush(true);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPollReturnsNoRecords(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch expectEmptyPolls = expectEmptyPolls(new AtomicInteger());
        expectEmptyOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(expectEmptyPolls, POLL_TIMEOUT_MSG);
        Assertions.assertTrue(this.workerTask.commitOffsets());
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter)).beginFlush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.times(2))).beginFlush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.offsetWriter});
        submit.get();
        assertPollMetrics(0);
        verifyCleanStartup();
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCommit(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch expectPolls = expectPolls(1);
        expectTopicCreation("topic");
        Iterator it = Arrays.asList(true, false).iterator();
        it.getClass();
        expectBeginFlush(it::next);
        expectOffsetFlush(true, true);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(expectPolls, POLL_TIMEOUT_MSG);
        Assertions.assertTrue(this.workerTask.commitOffsets());
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(1);
        verifyCleanStartup();
        verifyTopicCreation("topic");
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.times(2))).beginFlush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.atLeastOnce())).offset(PARTITION, OFFSET);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        verifyOffsetFlush(true, 2);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCommitFailure(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        CountDownLatch expectPolls = expectPolls(1);
        expectBeginFlush();
        expectOffsetFlush(true, false);
        expectTopicCreation("topic");
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(expectPolls, POLL_TIMEOUT_MSG);
        Assertions.assertTrue(this.workerTask.commitOffsets());
        this.workerTask.stop();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(1);
        verifyCleanStartup();
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.atLeastOnce())).offset(PARTITION, OFFSET);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        verifyOffsetFlush(true);
        verifyOffsetFlush(false);
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendRecordsRetries(boolean z) {
        setup(z);
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectTopicCreation("topic");
        expectPreliminaryCalls();
        expectTaskGetTopic();
        Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class))).thenAnswer(producerSendAnswer(true)).thenThrow(new Throwable[]{new TimeoutException("retriable sync failure")}).thenAnswer(producerSendAnswer(true)).thenAnswer(producerSendAnswer(true));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        this.workerTask.sendRecords();
        Assertions.assertEquals(Arrays.asList(sourceRecord2, sourceRecord3), this.workerTask.toSend);
        this.workerTask.sendRecords();
        Assertions.assertNull(this.workerTask.toSend);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendRecordsProducerCallbackFail(boolean z) {
        setup(z);
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectTopicCreation("topic");
        expectSendRecordProducerCallbackFail();
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.workerTask.sendRecords();
        });
        ((TransformationChain) Mockito.verify(this.transformationChain, Mockito.times(2))).apply((ProcessingContext) ArgumentMatchers.any(), (ConnectRecord) ArgumentMatchers.any(SourceRecord.class));
        ((Converter) Mockito.verify(this.keyConverter, Mockito.times(2))).fromConnectData(ArgumentMatchers.anyString(), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(KEY_SCHEMA), ArgumentMatchers.eq(KEY));
        ((Converter) Mockito.verify(this.valueConverter, Mockito.times(2))).fromConnectData(ArgumentMatchers.anyString(), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(RECORD_SCHEMA), ArgumentMatchers.eq(RECORD));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendRecordsProducerSendFailsImmediately(boolean z) {
        setup(z);
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectPreliminaryCalls();
        expectTopicCreation("topic");
        Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class))).thenThrow(new Throwable[]{new KafkaException("Producer closed while send in progress", new InvalidTopicException("topic"))});
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.workerTask.sendRecords();
        });
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSendRecordsTaskCommitRecordFail(boolean z) throws Exception {
        setup(z);
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectTopicCreation("topic");
        expectSendRecord();
        ((SourceTask) Mockito.doNothing().doThrow(new Throwable[]{new RuntimeException("Error committing record in source task")}).doNothing().when(this.sourceTask)).commitRecord((SourceRecord) ArgumentMatchers.any(SourceRecord.class), (RecordMetadata) ArgumentMatchers.any(RecordMetadata.class));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        this.workerTask.sendRecords();
        Assertions.assertNull(this.workerTask.toSend);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSourceTaskIgnoresProducerException(boolean z) throws Exception {
        setup(z);
        createWorkerTaskWithErrorToleration();
        expectTopicCreation("topic");
        Map singletonMap = Collections.singletonMap("key", 13);
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, singletonMap, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectOffsetFlush();
        expectPreliminaryCalls();
        Mockito.when(this.producer.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class))).thenAnswer(producerSendAnswer(true)).thenAnswer(producerSendAnswer(false));
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
        this.workerTask.sendRecords();
        this.workerTask.updateCommittableOffsets();
        this.workerTask.commitOffsets();
        ((OffsetStorageWriter) Mockito.verify(this.offsetWriter)).offset(PARTITION, singletonMap);
        ((SourceTask) Mockito.verify(this.sourceTask)).commitRecord((SourceRecord) ArgumentMatchers.any(SourceRecord.class), (RecordMetadata) ArgumentMatchers.isNull());
        Assertions.assertEquals(0, this.workerTask.submittedRecords.records.size());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSlowTaskStart(boolean z) throws Exception {
        setup(z);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        createWorkerTask();
        ((SourceTask) Mockito.doAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            ConcurrencyUtils.awaitLatch(countDownLatch2, "Timeout waiting for main test thread to allow task startup to complete");
            return null;
        }).when(this.sourceTask)).start(TASK_PROPS);
        expectOffsetFlush();
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        ConcurrencyUtils.awaitLatch(countDownLatch, "Timeout waiting for task to begin startup");
        this.workerTask.stop();
        countDownLatch2.countDown();
        Assertions.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).start();
        ((SourceTask) Mockito.verify(this.sourceTask)).initialize((SourceTaskContext) ArgumentMatchers.any(SourceTaskContext.class));
        ((SourceTask) Mockito.verify(this.sourceTask)).start(TASK_PROPS);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onStartup(this.taskId);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onShutdown(this.taskId);
        ((SourceTask) Mockito.verify(this.sourceTask)).stop();
        verifyClose();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCancel(boolean z) {
        setup(z);
        createWorkerTask();
        this.workerTask.cancel();
        ((CloseableOffsetStorageReader) Mockito.verify(this.offsetReader)).close();
        ((KafkaProducer) Mockito.verify(this.producer)).close(Duration.ZERO);
    }

    private TopicAdmin.TopicCreationResponse createdTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(Collections.singleton(str), Collections.emptySet());
    }

    private void expectPreliminaryCalls() {
        expectConvertHeadersAndKeyValue("topic", emptyHeaders());
        expectApplyTransformationChain();
    }

    private CountDownLatch expectEmptyPolls(AtomicInteger atomicInteger) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mockito.when(this.sourceTask.poll()).thenAnswer(invocationOnMock -> {
            atomicInteger.incrementAndGet();
            countDownLatch.countDown();
            Thread.sleep(10L);
            return Collections.emptyList();
        });
        return countDownLatch;
    }

    private CountDownLatch expectPolls(int i, AtomicInteger atomicInteger) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(i);
        ((SourceTask) Mockito.doAnswer(invocationOnMock -> {
            atomicInteger.incrementAndGet();
            countDownLatch.countDown();
            Thread.sleep(10L);
            return RECORDS;
        }).when(this.sourceTask)).poll();
        expectSendRecord();
        return countDownLatch;
    }

    private CountDownLatch expectPolls(int i) throws InterruptedException {
        return expectPolls(i, new AtomicInteger());
    }

    private void expectSendRecord() {
        expectSendRecordTaskCommitRecordSucceed();
    }

    private void expectSendRecordProducerCallbackFail() {
        expectSendRecord("topic", false, emptyHeaders());
    }

    private void expectSendRecordTaskCommitRecordSucceed() {
        expectSendRecord("topic", true, emptyHeaders());
    }

    private void expectSendRecord(String str, boolean z, Headers headers) {
        expectConvertHeadersAndKeyValue(str, headers);
        expectApplyTransformationChain();
        if (z) {
            expectTaskGetTopic();
        }
        ((KafkaProducer) Mockito.doAnswer(producerSendAnswer(z)).when(this.producer)).send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class));
    }

    private Answer<Future<RecordMetadata>> producerSendAnswer(boolean z) {
        return invocationOnMock -> {
            Callback callback = (Callback) invocationOnMock.getArgument(1);
            if (z) {
                callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), (Exception) null);
                return null;
            }
            callback.onCompletion((RecordMetadata) null, new TopicAuthorizationException("foo"));
            return null;
        };
    }

    private void expectConvertHeadersAndKeyValue(String str, Headers headers) {
        if (headers.iterator().hasNext()) {
            Mockito.when(this.headerConverter.fromConnectHeader(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), (Schema) ArgumentMatchers.eq(Schema.STRING_SCHEMA), ArgumentMatchers.anyString())).thenAnswer(invocationOnMock -> {
                return ((String) invocationOnMock.getArgument(3, String.class)).getBytes(StandardCharsets.UTF_8);
            });
        }
        Mockito.when(this.keyConverter.fromConnectData((String) ArgumentMatchers.eq(str), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(KEY_SCHEMA), ArgumentMatchers.eq(KEY))).thenReturn(SERIALIZED_KEY);
        Mockito.when(this.valueConverter.fromConnectData((String) ArgumentMatchers.eq(str), (Headers) ArgumentMatchers.any(Headers.class), (Schema) ArgumentMatchers.eq(RECORD_SCHEMA), ArgumentMatchers.eq(RECORD))).thenReturn(SERIALIZED_RECORD);
    }

    private void expectApplyTransformationChain() {
        Mockito.when(this.transformationChain.apply((ProcessingContext) ArgumentMatchers.any(), (ConnectRecord) ArgumentMatchers.any(SourceRecord.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
    }

    private void expectTaskGetTopic() {
        Mockito.when(this.statusBackingStore.getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenAnswer(invocationOnMock -> {
            return new TopicStatus((String) invocationOnMock.getArgument(1, String.class), new ConnectorTaskId((String) invocationOnMock.getArgument(0, String.class), 0), Time.SYSTEM.milliseconds());
        });
    }

    private void verifyTaskGetTopic(int i) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(String.class);
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore, Mockito.times(i))).getTopic((String) forClass.capture(), (String) forClass2.capture());
        Assertions.assertEquals("job", forClass.getValue());
        Assertions.assertEquals("topic", forClass2.getValue());
    }

    private void expectBeginFlush() throws Exception {
        expectBeginFlush(() -> {
            return true;
        });
    }

    private void expectBeginFlush(Supplier<Boolean> supplier) throws Exception {
        Mockito.when(Boolean.valueOf(this.offsetWriter.beginFlush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class)))).thenAnswer(invocationOnMock -> {
            return (Boolean) supplier.get();
        });
    }

    private void expectOffsetFlush() throws Exception {
        expectBeginFlush();
        expectOffsetFlush(true);
    }

    private void expectOffsetFlush(Boolean... boolArr) throws Exception {
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(this.offsetWriter.doFlush((org.apache.kafka.connect.util.Callback) ArgumentMatchers.any(org.apache.kafka.connect.util.Callback.class))).thenReturn(future);
        LinkedList linkedList = new LinkedList(Arrays.asList(boolArr));
        ((Future) Mockito.doAnswer(invocationOnMock -> {
            if (((Boolean) linkedList.pop()).booleanValue()) {
                return null;
            }
            throw new java.util.concurrent.TimeoutException();
        }).when(future)).get(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
    }

    private void expectEmptyOffsetFlush() throws Exception {
        expectBeginFlush(() -> {
            return false;
        });
    }

    private void verifyOffsetFlush(boolean z) throws Exception {
        verifyOffsetFlush(z, 1);
    }

    private void verifyOffsetFlush(boolean z, int i) throws Exception {
        if (z) {
            ((SourceTask) Mockito.verify(this.sourceTask, Mockito.atLeast(i))).commit();
        } else {
            ((OffsetStorageWriter) Mockito.verify(this.offsetWriter, Mockito.atLeast(i))).cancelFlush();
        }
    }

    private void assertPollMetrics(int i) {
        ConnectMetrics.MetricGroup metricGroup = this.workerTask.sourceTaskMetricsGroup().metricGroup();
        ConnectMetrics.MetricGroup metricGroup2 = this.workerTask.taskMetricsGroup().metricGroup();
        double currentMetricValueAsDouble = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-poll-rate");
        double currentMetricValueAsDouble2 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-poll-total");
        if (i > 0) {
            Assertions.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(metricGroup2, "batch-size-max"), 1.0E-6d);
            Assertions.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(metricGroup2, "batch-size-avg"), 1.0E-6d);
            Assertions.assertTrue(currentMetricValueAsDouble > 0.0d);
        } else {
            Assertions.assertEquals(0.0d, currentMetricValueAsDouble, 0.0d);
        }
        Assertions.assertTrue(currentMetricValueAsDouble2 >= ((double) i));
        double currentMetricValueAsDouble3 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-write-rate");
        double currentMetricValueAsDouble4 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-write-total");
        if (i > 0) {
            Assertions.assertTrue(currentMetricValueAsDouble3 > 0.0d);
        } else {
            Assertions.assertEquals(0.0d, currentMetricValueAsDouble3, 0.0d);
        }
        Assertions.assertTrue(currentMetricValueAsDouble4 >= ((double) i));
        double currentMetricValueAsDouble5 = this.metrics.currentMetricValueAsDouble(metricGroup, "poll-batch-max-time-ms");
        double currentMetricValueAsDouble6 = this.metrics.currentMetricValueAsDouble(metricGroup, "poll-batch-avg-time-ms");
        if (i > 0) {
            Assertions.assertTrue(currentMetricValueAsDouble5 >= 0.0d);
        }
        Assertions.assertTrue(Double.isNaN(currentMetricValueAsDouble6) || currentMetricValueAsDouble6 > 0.0d);
        double currentMetricValueAsDouble7 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-active-count");
        double currentMetricValueAsDouble8 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-active-count-max");
        Assertions.assertEquals(0.0d, currentMetricValueAsDouble7, 1.0E-6d);
        if (i > 0) {
            Assertions.assertEquals(RECORDS.size(), currentMetricValueAsDouble8, 1.0E-6d);
        }
    }

    private RecordHeaders emptyHeaders() {
        return new RecordHeaders();
    }

    private void verifyCleanStartup() {
        ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).start();
        ((SourceTask) Mockito.verify(this.sourceTask)).initialize((SourceTaskContext) ArgumentMatchers.any(SourceTaskContext.class));
        ((SourceTask) Mockito.verify(this.sourceTask)).start(TASK_PROPS);
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onStartup(this.taskId);
    }

    private void verifyClose() {
        ((KafkaProducer) Mockito.verify(this.producer)).close((Duration) ArgumentMatchers.any(Duration.class));
        ((TopicAdmin) Mockito.verify(this.admin)).close((Duration) ArgumentMatchers.any(Duration.class));
        ((TransformationChain) Mockito.verify(this.transformationChain)).close();
        ((CloseableOffsetStorageReader) Mockito.verify(this.offsetReader)).close();
        ((ConnectorOffsetBackingStore) Mockito.verify(this.offsetStore)).stop();
        try {
            ((HeaderConverter) Mockito.verify(this.headerConverter)).close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void expectTopicCreation(String str) {
        if (this.config.topicCreationEnable()) {
            Mockito.when(this.admin.describeTopics(new String[]{str})).thenReturn(Collections.emptyMap());
            Mockito.when(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) ArgumentMatchers.any(NewTopic.class)})).thenReturn(createdTopic(str));
        }
    }

    private void verifyTopicCreation(String... strArr) {
        if (this.config.topicCreationEnable()) {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(NewTopic.class);
            ((TopicAdmin) Mockito.verify(this.admin)).createOrFindTopics(new NewTopic[]{(NewTopic) forClass.capture()});
            Assertions.assertArrayEquals(strArr, forClass.getAllValues().stream().map((v0) -> {
                return v0.name();
            }).toArray(i -> {
                return new String[i];
            }));
        }
    }

    private void assertShouldSkipCommit() {
        Assertions.assertFalse(this.workerTask.shouldCommitOffsets());
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
        Throwable th = null;
        try {
            LogCaptureAppender createAndRegister2 = LogCaptureAppender.createAndRegister(WorkerSourceTask.class);
            Throwable th2 = null;
            try {
                createAndRegister.setClassLogger(SourceTaskOffsetCommitter.class, Level.TRACE);
                createAndRegister2.setClassLogger(WorkerSourceTask.class, Level.TRACE);
                SourceTaskOffsetCommitter.commit(this.workerTask);
                Assertions.assertEquals(Collections.emptyList(), createAndRegister2.getMessages());
                List messages = createAndRegister.getMessages();
                Assertions.assertEquals(1, messages.size());
                Assertions.assertTrue(((String) messages.get(0)).contains("Skipping offset commit"));
                if (createAndRegister2 != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createAndRegister2.close();
                    }
                }
                if (createAndRegister != null) {
                    if (0 == 0) {
                        createAndRegister.close();
                        return;
                    }
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (createAndRegister2 != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister2.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        createAndRegister2.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th7;
        }
    }

    static {
        TASK_PROPS.put("task.class", TestSourceTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
        RECORDS = Collections.singletonList(new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
    }
}
