package org.apache.kafka.connect.runtime.errors;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsTest;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.OngoingStubbing;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.class */
public class RetryWithToleranceOperatorTest {
    private static final Map<String, String> PROPERTIES = new HashMap<String, String>() { // from class: org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.1
        {
            put("metrics.num.samples", Objects.toString(2));
            put("metrics.sample.window.ms", Objects.toString(3000));
            put("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
            put("key.converter", PluginsTest.TestConverter.class.getName());
            put("value.converter", PluginsTest.TestConverter.class.getName());
        }
    };

    // Mocked operation; the execAndHandle* tests stub its call() to throw and/or return "Success".
    @Mock
    private Operation<String> mockOperation;

    // Mocked record that the tests wrap in a ProcessingContext.
    @Mock
    private ConsumerRecord<byte[], byte[]> consumerRecord;

    // Mocked metrics supplied to the operators that the test methods construct themselves.
    @Mock
    ErrorHandlingMetrics errorHandlingMetrics;

    // Mocked plugins handed to the ConnectorConfig constructor in config(Map).
    @Mock
    Plugins plugins;

    /**
     * Creates an operator with {@link ToleranceType#NONE}, backed by real (non-mock)
     * {@code ErrorHandlingMetrics} registered under the "noop-connector" / "noop-worker" names.
     *
     * @param <T> record type handled by the returned operator
     * @return a freshly built operator
     */
    public static <T> RetryWithToleranceOperator<T> noopOperator() {
        ConnectorTaskId taskId = new ConnectorTaskId("noop-connector", -1);
        PluginsTest.TestableWorkerConfig workerConfig = new PluginsTest.TestableWorkerConfig(PROPERTIES);
        ConnectMetrics connectMetrics = new ConnectMetrics("noop-worker", workerConfig, Time.SYSTEM, "test-cluster");
        ErrorHandlingMetrics metrics = new ErrorHandlingMetrics(taskId, connectMetrics);
        return genericOperator(0, ToleranceType.NONE, metrics);
    }

    /**
     * Creates an operator with {@link ToleranceType#ALL}, backed by real (non-mock)
     * {@code ErrorHandlingMetrics} registered under the "errors-all-tolerate-connector" /
     * "errors-all-tolerate-worker" names.
     *
     * @param <T> record type handled by the returned operator
     * @return a freshly built operator
     */
    public static <T> RetryWithToleranceOperator<T> allOperator() {
        ConnectorTaskId taskId = new ConnectorTaskId("errors-all-tolerate-connector", -1);
        PluginsTest.TestableWorkerConfig workerConfig = new PluginsTest.TestableWorkerConfig(PROPERTIES);
        ConnectMetrics connectMetrics =
                new ConnectMetrics("errors-all-tolerate-worker", workerConfig, Time.SYSTEM, "test-cluster");
        ErrorHandlingMetrics metrics = new ErrorHandlingMetrics(taskId, connectMetrics);
        return genericOperator(0, ToleranceType.ALL, metrics);
    }

    /**
     * Shared factory for the operators used in this class.
     *
     * @param i forwarded as the first constructor argument
     *          (NOTE(review): presumably the error retry timeout - confirm against the constructor)
     * @param toleranceType tolerance mode of the operator
     * @param errorHandlingMetrics metrics instance handed to the operator
     * @param <T> record type handled by the returned operator
     * @return a new operator using {@link Time#SYSTEM}
     */
    private static <T> RetryWithToleranceOperator<T> genericOperator(int i, ToleranceType toleranceType, ErrorHandlingMetrics errorHandlingMetrics) {
        RetryWithToleranceOperator<T> operator = new RetryWithToleranceOperator<>(
                i,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                toleranceType,
                Time.SYSTEM,
                errorHandlingMetrics);
        return operator;
    }

    /**
     * {@code executeFailed} on an operator built with {@link ToleranceType#ALL} must return
     * without throwing; the test has no further assertions.
     */
    @Test
    public void testExecuteFailed() {
        RetryWithToleranceOperator operator = genericOperator(0, ToleranceType.ALL, this.errorHandlingMetrics);
        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        operator.executeFailed(context, Stage.TASK_PUT, SinkTask.class, new Throwable());
    }

    /**
     * {@code executeFailed} on an operator built with {@link ToleranceType#NONE} must throw a
     * {@link ConnectException}.
     */
    @Test
    public void testExecuteFailedNoTolerance() {
        RetryWithToleranceOperator operator = genericOperator(0, ToleranceType.NONE, this.errorHandlingMetrics);
        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        Assertions.assertThrows(
                ConnectException.class,
                () -> operator.executeFailed(context, Stage.TASK_PUT, SinkTask.class, new Throwable()));
    }

    /** Runs the shared stage check with a plain {@link Exception} in the TRANSFORMATION stage. */
    @Test
    public void testHandleExceptionInTransformations() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.TRANSFORMATION, failure);
    }

    /** Runs the shared stage check with a plain {@link Exception} in the HEADER_CONVERTER stage. */
    @Test
    public void testHandleExceptionInHeaderConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.HEADER_CONVERTER, failure);
    }

    /** Runs the shared stage check with a plain {@link Exception} in the VALUE_CONVERTER stage. */
    @Test
    public void testHandleExceptionInValueConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.VALUE_CONVERTER, failure);
    }

    /** Runs the shared stage check with a plain {@link Exception} in the KEY_CONVERTER stage. */
    @Test
    public void testHandleExceptionInKeyConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.KEY_CONVERTER, failure);
    }

    /** Runs the shared stage check with a {@link RetriableException} in the TASK_PUT stage. */
    @Test
    public void testHandleExceptionInTaskPut() {
        RetriableException failure = new RetriableException("Test");
        testHandleExceptionInStage(Stage.TASK_PUT, failure);
    }

    /** Runs the shared stage check with a {@link RetriableException} in the TASK_POLL stage. */
    @Test
    public void testHandleExceptionInTaskPoll() {
        RetriableException failure = new RetriableException("Test");
        testHandleExceptionInStage(Stage.TASK_POLL, failure);
    }

    /** A plain {@link Exception} in the TASK_PUT stage must surface as a {@link ConnectException}. */
    @Test
    public void testThrowExceptionInTaskPut() {
        Assertions.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception()));
    }

    /** A plain {@link Exception} in the TASK_POLL stage must surface as a {@link ConnectException}. */
    @Test
    public void testThrowExceptionInTaskPoll() {
        Assertions.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception()));
    }

    /** A plain {@link Exception} in the KAFKA_CONSUME stage must surface as a {@link ConnectException}. */
    @Test
    public void testThrowExceptionInKafkaConsume() {
        Assertions.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception()));
    }

    /** A plain {@link Exception} in the KAFKA_PRODUCE stage must surface as a {@link ConnectException}. */
    @Test
    public void testThrowExceptionInKafkaProduce() {
        Assertions.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception()));
    }

    /**
     * Executes an operation that always throws {@code exc} in the given stage, using the operator
     * from {@link #setupExecutor()}, and asserts that the processing context ends up failed.
     *
     * @param stage stage reported to the operator
     * @param exc exception thrown by the operation
     */
    private void testHandleExceptionInStage(Stage stage, Exception exc) {
        RetryWithToleranceOperator operator = setupExecutor();
        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        Operation<Object> alwaysFailing = () -> {
            throw exc;
        };

        operator.execute(context, alwaysFailing, stage, RetryWithToleranceOperator.class);

        Assertions.assertTrue(context.failed());
    }

    /**
     * Builds the operator used by the stage tests: {@link ToleranceType#ALL} with the mocked
     * metrics.
     *
     * @param <T> record type handled by the returned operator
     * @return a new operator
     */
    private <T> RetryWithToleranceOperator<T> setupExecutor() {
        RetryWithToleranceOperator<T> operator = genericOperator(0, ToleranceType.ALL, this.errorHandlingMetrics);
        return operator;
    }

    /** One retriable failure followed by success, with a single expected 300 ms wait. */
    @Test
    public void testExecAndHandleRetriableErrorOnce() throws Exception {
        List<Long> expectedWaits = Collections.singletonList(300L);
        execAndHandleRetriableError(6000L, 1, expectedWaits, new RetriableException("Test"), true);
    }

    /** Three retriable failures followed by success, with expected waits of 300, 600 and 1200 ms. */
    @Test
    public void testExecAndHandleRetriableErrorThrice() throws Exception {
        List<Long> expectedWaits = Arrays.asList(300L, 600L, 1200L);
        execAndHandleRetriableError(6000L, 3, expectedWaits, new RetriableException("Test"), true);
    }

    /**
     * Eight retriable failures followed by success when -1 is passed as the first helper
     * argument; the expected waits double from 300 ms up to 38400 ms.
     */
    @Test
    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception {
        List<Long> expectedWaits = Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L);
        execAndHandleRetriableError(-1L, 8, expectedWaits, new RetriableException("Test"), true);
    }

    /**
     * Six retriable failures with no success stubbed; the expected waits are 300, 600, 1200, 2400
     * and a final 1500 ms (6000 ms in total), and the context is expected to end up failed.
     */
    @Test
    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception {
        List<Long> expectedWaits = Arrays.asList(300L, 600L, 1200L, 2400L, 1500L);
        execAndHandleRetriableError(6000L, 6, expectedWaits, new RetriableException("Test"), false);
    }

    /**
     * Drives {@code execAndHandleError} against a mocked operation that fails a fixed number of
     * times, with a mocked latch whose timed waits advance a {@link MockTime} clock.
     *
     * @param j first constructor argument of the operator under test
     *          (NOTE(review): presumably the error retry timeout in ms, -1 meaning unlimited - confirm)
     * @param i number of times the mocked operation is stubbed to throw {@code exc}
     * @param list wait durations (ms) stubbed on the latch; each stub advances the mock clock by
     *             the same amount and returns {@code false}
     * @param exc exception thrown by the mocked operation
     * @param z whether the operation returns "Success" after the stubbed failures; also selects
     *          which outcome is asserted
     * @throws Exception declared because the stubbed calls declare checked exceptions
     */
    public void execAndHandleRetriableError(long j, int i, List<Long> list, Exception exc, boolean z) throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        CountDownLatch exitLatch = Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                j, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, time, this.errorHandlingMetrics, exitLatch);

        // Chain i failures, optionally followed by a successful return value.
        OngoingStubbing<String> stubbing = Mockito.when(this.mockOperation.call());
        for (int failure = 0; failure < i; failure++) {
            stubbing = stubbing.thenThrow(exc);
        }
        if (z) {
            stubbing.thenReturn("Success");
        }

        // Waiting on the latch only moves the mock clock forward; it never reports a count-down.
        for (Long expectedWait : list) {
            Mockito.when(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).thenAnswer(invocation -> {
                time.sleep(expectedWait);
                return false;
            });
        }

        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        String result = (String) operator.execAndHandleError(context, this.mockOperation, Exception.class);

        if (!z) {
            Assertions.assertTrue(context.failed());
        } else {
            Assertions.assertFalse(context.failed());
            Assertions.assertEquals("Success", result);
        }
        Mockito.verifyNoMoreInteractions(exitLatch);
        int expectedCalls = z ? i + 1 : i;
        Mockito.verify(this.mockOperation, Mockito.times(expectedCalls)).call();
    }

    /**
     * A plain {@link Exception} from the operation must fail the context, yield a {@code null}
     * result, call the operation exactly once and never wait on the latch.
     */
    @Test
    public void testExecAndHandleNonRetriableError() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        CountDownLatch exitLatch = Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                6000L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, time, this.errorHandlingMetrics, exitLatch);
        Mockito.when(this.mockOperation.call()).thenThrow(new Exception("Test"));

        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        String result = (String) operator.execAndHandleError(context, this.mockOperation, Exception.class);

        Assertions.assertTrue(context.failed());
        Assertions.assertNull(result);
        Mockito.verify(this.mockOperation).call();
        Mockito.verify(exitLatch, Mockito.never()).await(ArgumentMatchers.anyLong(), ArgumentMatchers.any());
    }

    /**
     * With an always-failing retriable operation, the operator is stopped from inside the fourth
     * latch wait. The context must be failed, the mock clock must read 4500 ms
     * (300 + 600 + 1200 + 2400), and the latch must have been counted down once.
     */
    @Test
    public void testExitLatch() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        CountDownLatch exitLatch = Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                -1L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, time, this.errorHandlingMetrics, exitLatch);
        Mockito.when(this.mockOperation.call()).thenThrow(new RetriableException("test"));

        // The first three waits only advance the mock clock.
        for (long wait : new long[] {300L, 600L, 1200L}) {
            Mockito.when(exitLatch.await(wait, TimeUnit.MILLISECONDS)).thenAnswer(invocation -> {
                time.sleep(wait);
                return false;
            });
        }
        // The fourth wait additionally requests the stop.
        Mockito.when(exitLatch.await(2400L, TimeUnit.MILLISECONDS)).thenAnswer(invocation -> {
            time.sleep(2400L);
            operator.triggerStop();
            return false;
        });

        ProcessingContext context = new ProcessingContext(this.consumerRecord);
        operator.execAndHandleError(context, this.mockOperation, Exception.class);

        Assertions.assertTrue(context.failed());
        Assertions.assertEquals(4500L, time.milliseconds());
        Mockito.verify(exitLatch).countDown();
        Mockito.verifyNoMoreInteractions(exitLatch);
    }

    /**
     * Calls {@code backoff} for attempts 1 to 6 against a mocked latch. Waits of 300, 600, 1200,
     * 2400 and 500 ms are stubbed to advance the mock clock (5000 ms in total); a 0 ms wait is
     * stubbed to return {@code false}. The clock is advanced by 1 ms before the sixth attempt, and
     * no unstubbed latch interaction may occur.
     */
    @Test
    public void testBackoffLimit() throws Exception {
        MockTime time = new MockTime(0L, 0L, 0L);
        CountDownLatch exitLatch = Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                5L, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS, ToleranceType.NONE, time, this.errorHandlingMetrics, exitLatch);

        for (long wait : new long[] {300L, 600L, 1200L, 2400L, 500L}) {
            Mockito.when(exitLatch.await(wait, TimeUnit.MILLISECONDS)).thenAnswer(invocation -> {
                time.sleep(wait);
                return false;
            });
        }
        Mockito.when(exitLatch.await(0L, TimeUnit.MILLISECONDS)).thenReturn(false);

        for (int attempt = 1; attempt <= 5; attempt++) {
            operator.backoff(attempt, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        }
        time.sleep(1L);
        operator.backoff(6, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);

        Mockito.verifyNoMoreInteractions(exitLatch);
    }

    /**
     * Checks {@code withinToleranceLimits()} for three operators: NONE after one failure (false),
     * ALL after two failures (true), and NONE with no failures (true).
     */
    @Test
    public void testToleranceLimit() {
        RetryWithToleranceOperator<?> noneAfterFailure = genericOperator(0, ToleranceType.NONE, this.errorHandlingMetrics);
        noneAfterFailure.markAsFailed();
        Assertions.assertFalse(noneAfterFailure.withinToleranceLimits(), "should not tolerate any errors");

        RetryWithToleranceOperator<?> allAfterFailures = genericOperator(0, ToleranceType.ALL, this.errorHandlingMetrics);
        allAfterFailures.markAsFailed();
        allAfterFailures.markAsFailed();
        Assertions.assertTrue(allAfterFailures.withinToleranceLimits(), "should tolerate all errors");

        RetryWithToleranceOperator<?> noneUntouched = genericOperator(0, ToleranceType.NONE, this.errorHandlingMetrics);
        Assertions.assertTrue(noneUntouched.withinToleranceLimits(), "no tolerance is within limits if no failures");
    }

    /**
     * A connector config built without any error-handling overrides must report a retry timeout
     * of 0, a max delay equal to {@code ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS}, and
     * the default tolerance type.
     */
    @Test
    public void testDefaultConfigs() {
        ConnectorConfig defaults = config(Collections.emptyMap());
        // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
        // messages report the right value as "expected".
        Assertions.assertEquals(0L, defaults.errorRetryTimeout());
        Assertions.assertEquals(ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, defaults.errorMaxDelayInMillis());
        Assertions.assertEquals(ConnectorConfig.ERRORS_TOLERANCE_DEFAULT, defaults.errorToleranceType());
    }

    /**
     * Builds a {@link ConnectorConfig} from the minimal required properties ("name" and
     * "connector.class") plus the given overrides, which are applied last.
     *
     * @param map extra properties layered on top of the required ones
     * @return a connector config created with the mocked plugins
     */
    ConnectorConfig config(Map<String, String> map) {
        // Typed map instead of a raw HashMap so the compiler checks the key/value types.
        Map<String, String> props = new HashMap<>();
        props.put("name", "test");
        props.put("connector.class", SinkConnector.class.getName());
        props.putAll(map);
        return new ConnectorConfig(this.plugins, props);
    }

    /**
     * Each error-handling property set explicitly must be reflected by the matching
     * {@link ConnectorConfig} accessor.
     */
    @Test
    public void testSetConfigs() {
        // assertEquals takes (expected, actual); the literals are the expected values.
        ConnectorConfig withTimeout = config(Collections.singletonMap("errors.retry.timeout", "100"));
        Assertions.assertEquals(100L, withTimeout.errorRetryTimeout());

        ConnectorConfig withMaxDelay = config(Collections.singletonMap("errors.retry.delay.max.ms", "100"));
        Assertions.assertEquals(100L, withMaxDelay.errorMaxDelayInMillis());

        ConnectorConfig withTolerance = config(Collections.singletonMap("errors.tolerance", "none"));
        Assertions.assertEquals(ToleranceType.NONE, withTolerance.errorToleranceType());
    }

    /** Runs the shared report check with one reporter. */
    @Test
    public void testReportWithSingleReporter() {
        final int reporterCount = 1;
        testReport(reporterCount);
    }

    /** Runs the shared report check with two reporters. */
    @Test
    public void testReportWithMultipleReporters() {
        final int reporterCount = 2;
        testReport(reporterCount);
    }

    /**
     * Registers {@code i} reporters, each returning its own not-yet-completed future, and checks
     * that the future returned by {@code report} is not done until every reporter future has
     * been completed.
     *
     * @param i number of reporters to register
     */
    // NOTE(review): the operator, context and reporter are used as raw types because their type
    // parameters cannot be confirmed from this file - tighten once they are.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void testReport(int i) {
        MockTime time = new MockTime(0L, 0L, 0L);
        CountDownLatch exitLatch = Mockito.mock(CountDownLatch.class);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                -1L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, time, this.errorHandlingMetrics, exitLatch);
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("t", 0, 0L, null, null);

        // One pending future per reporter; the element type is spelled out so the futures can be
        // completed below without casts.
        List<CompletableFuture<RecordMetadata>> futures = IntStream.range(0, i)
                .mapToObj(index -> new CompletableFuture<RecordMetadata>())
                .collect(Collectors.toList());
        // The cast gives the inner lambda a functional-interface target type; without it the
        // lambda-returning-lambda cannot be compiled.
        List<ErrorReporter> reporters = IntStream.range(0, i)
                .mapToObj(index -> (ErrorReporter) context -> futures.get(index))
                .collect(Collectors.toList());
        operator.reporters(reporters);

        Future<?> report = operator.report(new ProcessingContext(record));

        for (CompletableFuture<RecordMetadata> future : futures) {
            Assertions.assertFalse(report.isDone());
            future.complete(new RecordMetadata(new TopicPartition("t", 0), 0L, 0, 0L, 0, 0));
        }
        Assertions.assertTrue(report.isDone());
    }

    /** Closing the operator must call {@code close()} on every registered reporter. */
    @Test
    public void testCloseErrorReporters() {
        ErrorReporter firstReporter = Mockito.mock(ErrorReporter.class);
        ErrorReporter secondReporter = Mockito.mock(ErrorReporter.class);
        RetryWithToleranceOperator operator = allOperator();
        operator.reporters(Arrays.asList(firstReporter, secondReporter));

        operator.close();

        Mockito.verify(firstReporter).close();
        Mockito.verify(secondReporter).close();
    }

    /**
     * When both reporters throw from {@code close()}, closing the operator must throw a
     * {@link ConnectException} and still have called {@code close()} on each reporter.
     */
    @Test
    public void testCloseErrorReportersExceptionPropagation() {
        ErrorReporter firstReporter = Mockito.mock(ErrorReporter.class);
        ErrorReporter secondReporter = Mockito.mock(ErrorReporter.class);
        RetryWithToleranceOperator operator = allOperator();
        operator.reporters(Arrays.asList(firstReporter, secondReporter));
        Mockito.doThrow(new RuntimeException()).when(firstReporter).close();
        Mockito.doThrow(new RuntimeException()).when(secondReporter).close();

        Assertions.assertThrows(ConnectException.class, operator::close);

        Mockito.verify(firstReporter).close();
        Mockito.verify(secondReporter).close();
    }
}
