package org.apache.kafka.connect.runtime.errors;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsTest;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({ProcessingContext.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.class */
public class RetryWithToleranceOperatorTest {
    // Shared operator created with ToleranceType.NONE and a first constructor argument of 0
    // (presumably the retry timeout — confirm against RetryWithToleranceOperator).
    // Its metrics are attached in this class's static initializer.
    public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(0, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM);
    // Shared operator built with the same arguments as NOOP_OPERATOR except for ToleranceType.ALL.
    // Its metrics are attached in this class's static initializer.
    public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(0, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM);

    // Mocked operation; the execAndHandle* helpers script the outcome of its call() method.
    @Mock
    private Operation<String> mockOperation;

    // Mocked record handed to executeFailed(...) by the tests that report a failed record.
    @Mock
    private ConsumerRecord<byte[], byte[]> consumerRecord;

    // Mocked metrics passed to RetryWithToleranceOperator.metrics(...) by the tests.
    @Mock
    ErrorHandlingMetrics errorHandlingMetrics;

    // Mocked plugins instance passed to the ConnectorConfig constructor in config(...).
    @Mock
    Plugins plugins;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest$ExceptionThrower.class */
    /**
     * An {@link Operation} that never yields a value: every invocation of
     * {@code call()} throws the exception supplied to the constructor.
     */
    public static class ExceptionThrower implements Operation<Object> {
        /** Exception raised by each call; assigned once at construction. */
        private final Exception failure;

        /**
         * @param exc the exception that {@code call()} will throw
         */
        public ExceptionThrower(Exception exc) {
            failure = exc;
        }

        /**
         * Always fails with the stored exception.
         *
         * @return never returns normally
         * @throws Exception the exception given to the constructor
         */
        public Object call() throws Exception {
            throw failure;
        }
    }

    /**
     * Reports a failed record through an operator configured with {@code ToleranceType.ALL}.
     * The test contains no assertions; it passes as long as {@code executeFailed} returns normally.
     */
    @Test
    public void testExecuteFailed() {
        RetryWithToleranceOperator tolerantOperator = new RetryWithToleranceOperator(
                0L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                ToleranceType.ALL,
                Time.SYSTEM);
        tolerantOperator.metrics(this.errorHandlingMetrics);

        Throwable failure = new Throwable();
        tolerantOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, failure);
    }

    /**
     * Reports a failed record through an operator configured with {@code ToleranceType.NONE}
     * and expects {@code executeFailed} to throw a {@link ConnectException}.
     */
    @Test
    public void testExecuteFailedNoTolerance() {
        RetryWithToleranceOperator strictOperator = new RetryWithToleranceOperator(
                0L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                ToleranceType.NONE,
                Time.SYSTEM);
        strictOperator.metrics(this.errorHandlingMetrics);

        Assert.assertThrows(
                ConnectException.class,
                () -> strictOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, new Throwable()));
    }

    /**
     * Runs an operation that throws a plain {@link Exception} in the {@code TRANSFORMATION}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInTransformations() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.TRANSFORMATION, failure);
    }

    /**
     * Runs an operation that throws a plain {@link Exception} in the {@code HEADER_CONVERTER}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInHeaderConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.HEADER_CONVERTER, failure);
    }

    /**
     * Runs an operation that throws a plain {@link Exception} in the {@code VALUE_CONVERTER}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInValueConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.VALUE_CONVERTER, failure);
    }

    /**
     * Runs an operation that throws a plain {@link Exception} in the {@code KEY_CONVERTER}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInKeyConverter() {
        Exception failure = new Exception();
        testHandleExceptionInStage(Stage.KEY_CONVERTER, failure);
    }

    /**
     * Runs an operation that throws a {@link RetriableException} in the {@code TASK_PUT}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInTaskPut() {
        RetriableException failure = new RetriableException("Test");
        testHandleExceptionInStage(Stage.TASK_PUT, failure);
    }

    /**
     * Runs an operation that throws a {@link RetriableException} in the {@code TASK_POLL}
     * stage; the stage helper asserts that the operator ends up marked as failed.
     */
    @Test
    public void testHandleExceptionInTaskPoll() {
        RetriableException failure = new RetriableException("Test");
        testHandleExceptionInStage(Stage.TASK_POLL, failure);
    }

    /**
     * Expects a {@link ConnectException} when a plain {@link Exception} is thrown
     * by an operation executed in the {@code TASK_PUT} stage.
     */
    @Test
    public void testThrowExceptionInTaskPut() {
        Assert.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception()));
    }

    /**
     * Expects a {@link ConnectException} when a plain {@link Exception} is thrown
     * by an operation executed in the {@code TASK_POLL} stage.
     */
    @Test
    public void testThrowExceptionInTaskPoll() {
        Assert.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception()));
    }

    /**
     * Expects a {@link ConnectException} when a plain {@link Exception} is thrown
     * by an operation executed in the {@code KAFKA_CONSUME} stage.
     */
    @Test
    public void testThrowExceptionInKafkaConsume() {
        Assert.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception()));
    }

    /**
     * Expects a {@link ConnectException} when a plain {@link Exception} is thrown
     * by an operation executed in the {@code KAFKA_PRODUCE} stage.
     */
    @Test
    public void testThrowExceptionInKafkaProduce() {
        Assert.assertThrows(
                ConnectException.class,
                () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception()));
    }

    /**
     * Executes an always-failing operation in the given stage on a fresh operator from
     * {@link #setupExecutor()} and asserts that the operator reports {@code failed()}.
     *
     * @param stage the stage under which the failing operation is executed
     * @param exc   the exception the operation throws
     */
    private void testHandleExceptionInStage(Stage stage, Exception exc) {
        RetryWithToleranceOperator operator = setupExecutor();
        ExceptionThrower failingOperation = new ExceptionThrower(exc);

        operator.execute(failingOperation, stage, ExceptionThrower.class);

        Assert.assertTrue(operator.failed());
        PowerMock.verifyAll();
    }

    /**
     * Builds an operator with {@code ToleranceType.ALL} on the system clock and attaches
     * the mocked error-handling metrics to it.
     *
     * @return the configured operator
     */
    private RetryWithToleranceOperator setupExecutor() {
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                0L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                ToleranceType.ALL,
                Time.SYSTEM);
        operator.metrics(this.errorHandlingMetrics);
        return operator;
    }

    /**
     * One retriable failure followed by success; the mock clock is expected to read 300 ms afterwards.
     */
    @Test
    public void testExecAndHandleRetriableErrorOnce() throws Exception {
        int failuresBeforeSuccess = 1;
        long expectedClockMs = 300L;
        execAndHandleRetriableError(failuresBeforeSuccess, expectedClockMs, new RetriableException("Test"));
    }

    /**
     * Three retriable failures followed by success; the mock clock is expected to read 2100 ms afterwards.
     */
    @Test
    public void testExecAndHandleRetriableErrorThrice() throws Exception {
        int failuresBeforeSuccess = 3;
        long expectedClockMs = 2100L;
        execAndHandleRetriableError(failuresBeforeSuccess, expectedClockMs, new RetriableException("Test"));
    }

    /**
     * Scripts one non-retriable failure; the mock clock is expected to stay at 0 ms.
     */
    @Test
    public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
        int scriptedFailures = 1;
        long expectedClockMs = 0L;
        execAndHandleNonRetriableError(scriptedFailures, expectedClockMs, new Exception("Non Retriable Test"));
    }

    /**
     * Scripts three non-retriable failures; the mock clock is expected to stay at 0 ms.
     */
    @Test
    public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
        int scriptedFailures = 3;
        long expectedClockMs = 0L;
        execAndHandleNonRetriableError(scriptedFailures, expectedClockMs, new Exception("Non Retriable Test"));
    }

    /**
     * Scripts the mocked operation to throw {@code exc} {@code i} times and then return
     * {@code "Success"}, runs it through {@code execAndHandleError} on an operator backed by a
     * {@link MockTime}, and asserts that the call succeeded.
     *
     * @param i   number of times the mocked operation throws before it succeeds
     * @param j   value expected from {@code MockTime.hiResClockMs()} once the call returns
     * @param exc exception the mocked operation throws
     * @throws Exception declared because the mocked {@code call()} is invoked while recording expectations
     */
    public void execAndHandleRetriableError(int i, long j, Exception exc) throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                6000L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                ToleranceType.ALL,
                clock);
        operator.metrics(this.errorHandlingMetrics);

        // Script the mock: fail i times, then succeed.
        EasyMock.expect(this.mockOperation.call()).andThrow(exc).times(i);
        EasyMock.expect(this.mockOperation.call()).andReturn("Success");
        EasyMock.replay(this.mockOperation);

        String result = (String) operator.execAndHandleError(this.mockOperation, Exception.class);

        Assert.assertFalse(operator.failed());
        Assert.assertEquals("Success", result);
        Assert.assertEquals(j, clock.hiResClockMs());
        PowerMock.verifyAll();
    }

    /**
     * Scripts the mocked operation to throw {@code exc} {@code i} times and then return
     * {@code "Success"}, runs it through {@code execAndHandleError} on an operator backed by a
     * {@link MockTime}, and asserts that the operator is marked as failed with a {@code null} result.
     *
     * <p>NOTE(review): the asserted {@code null} result suggests the scripted {@code "Success"}
     * expectation is not reached; confirm whether {@code PowerMock.verifyAll()} covers this mock.
     *
     * @param i   number of times the mocked operation is scripted to throw
     * @param j   value expected from {@code MockTime.hiResClockMs()} once the call returns
     * @param exc exception the mocked operation throws
     * @throws Exception declared because the mocked {@code call()} is invoked while recording expectations
     */
    public void execAndHandleNonRetriableError(int i, long j, Exception exc) throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                6000L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS,
                ToleranceType.ALL,
                clock);
        operator.metrics(this.errorHandlingMetrics);

        // Same scripting as the retriable helper: i failures, then a success.
        EasyMock.expect(this.mockOperation.call()).andThrow(exc).times(i);
        EasyMock.expect(this.mockOperation.call()).andReturn("Success");
        EasyMock.replay(this.mockOperation);

        String result = (String) operator.execAndHandleError(this.mockOperation, Exception.class);

        Assert.assertTrue(operator.failed());
        Assert.assertNull(result);
        Assert.assertEquals(j, clock.hiResClockMs());
        PowerMock.verifyAll();
    }

    /**
     * Moves a mock clock forward and checks {@code checkRetry(0L)} on an operator constructed
     * with {@code (500L, 100L, NONE, clock)}: it must return true at 100, 200, 400 and 499 ms,
     * and false at 501 and 600 ms.
     */
    @Test
    public void testCheckRetryLimit() {
        MockTime clock = new MockTime(0L, 0L, 0L);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(500L, 100L, ToleranceType.NONE, clock);

        long[] timesAllowingRetry = {100L, 200L, 400L, 499L};
        for (long nowMs : timesAllowingRetry) {
            clock.setCurrentTimeMs(nowMs);
            Assert.assertTrue(operator.checkRetry(0L));
        }

        long[] timesDenyingRetry = {501L, 600L};
        for (long nowMs : timesDenyingRetry) {
            clock.setCurrentTimeMs(nowMs);
            Assert.assertFalse(operator.checkRetry(0L));
        }
    }

    /**
     * Calls {@code backoff(attempt, OPERATOR_RETRY_MAX_DELAY_MILLIS)} for attempts 1 through 6
     * and checks how far the mock clock advanced on each call: 300, 600, 1200, 2400, 500 and 0 ms.
     */
    @Test
    public void testBackoffLimit() {
        MockTime clock = new MockTime(0L, 0L, 0L);
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                5L,
                ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS,
                ToleranceType.NONE,
                clock);

        // Entry k is the clock advance expected for attempt k + 1.
        long[] expectedAdvanceMs = {300L, 600L, 1200L, 2400L, 500L, 0L};
        for (int attempt = 1; attempt <= expectedAdvanceMs.length; attempt++) {
            long before = clock.hiResClockMs();
            operator.backoff(attempt, ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
            Assert.assertEquals(expectedAdvanceMs[attempt - 1], clock.hiResClockMs() - before);
        }
        PowerMock.verifyAll();
    }

    /**
     * Checks {@code withinToleranceLimits()} in three situations: a {@code NONE} operator after
     * one failure (false), an {@code ALL} operator after two failures (true), and a {@code NONE}
     * operator with no failures recorded (true).
     */
    @Test
    public void testToleranceLimit() {
        RetryWithToleranceOperator strictOperator = new RetryWithToleranceOperator(
                0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM);
        strictOperator.metrics(this.errorHandlingMetrics);
        strictOperator.markAsFailed();
        Assert.assertFalse("should not tolerate any errors", strictOperator.withinToleranceLimits());

        RetryWithToleranceOperator tolerantOperator = new RetryWithToleranceOperator(
                0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM);
        tolerantOperator.metrics(this.errorHandlingMetrics);
        tolerantOperator.markAsFailed();
        tolerantOperator.markAsFailed();
        Assert.assertTrue("should tolerate all errors", tolerantOperator.withinToleranceLimits());

        // No metrics are attached and no failure is recorded on this instance.
        RetryWithToleranceOperator untouchedOperator = new RetryWithToleranceOperator(
                0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.NONE, Time.SYSTEM);
        Assert.assertTrue("no tolerance is within limits if no failures", untouchedOperator.withinToleranceLimits());
    }

    /**
     * Builds a {@link ConnectorConfig} with no error-handling overrides and checks the values
     * returned by {@code errorRetryTimeout()}, {@code errorMaxDelayInMillis()} and
     * {@code errorToleranceType()}.
     */
    @Test
    public void testDefaultConfigs() {
        ConnectorConfig configuration = config(Collections.emptyMap());
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes
        // a failure message report the expected and the observed value correctly.
        Assert.assertEquals(0L, configuration.errorRetryTimeout());
        Assert.assertEquals(ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, configuration.errorMaxDelayInMillis());
        Assert.assertEquals(ConnectorConfig.ERRORS_TOLERANCE_DEFAULT, configuration.errorToleranceType());
        PowerMock.verifyAll();
    }

    /**
     * Creates a {@link ConnectorConfig} from a minimal property set ({@code name} and
     * {@code connector.class}) overlaid with the supplied entries.
     *
     * @param map additional properties; entries here replace the minimal ones on key collision
     * @return the connector configuration built with the mocked plugins
     */
    ConnectorConfig config(Map<String, String> map) {
        // Typed map instead of a raw HashMap so the compiler checks keys and values.
        Map<String, String> props = new HashMap<>();
        props.put("name", "test");
        props.put("connector.class", SinkTask.class.getName());
        props.putAll(map);
        return new ConnectorConfig(this.plugins, props);
    }

    /**
     * Sets each error-handling property explicitly and checks that the matching
     * {@link ConnectorConfig} accessor returns the configured value.
     */
    @Test
    public void testSetConfigs() {
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes
        // a failure message report the expected and the observed value correctly.
        ConnectorConfig withRetryTimeout = config(Collections.singletonMap("errors.retry.timeout", "100"));
        Assert.assertEquals(100L, withRetryTimeout.errorRetryTimeout());

        ConnectorConfig withMaxDelay = config(Collections.singletonMap("errors.retry.delay.max.ms", "100"));
        Assert.assertEquals(100L, withMaxDelay.errorMaxDelayInMillis());

        ConnectorConfig withTolerance = config(Collections.singletonMap("errors.tolerance", "none"));
        Assert.assertEquals(ToleranceType.NONE, withTolerance.errorToleranceType());

        PowerMock.verifyAll();
    }

    /**
     * Drives one {@link RetryWithToleranceOperator} from several threads at once and fails if
     * the instrumented {@link ProcessingContext} observes interleaved calls.
     *
     * <p>Half of the worker threads call {@code executeFailed(...)} and wait on the returned
     * future; the other half call {@code execute(...)} with an operation that returns
     * {@code null}. The first problem seen by any thread is stored in {@code failed} and
     * rethrown at the end of the test.
     *
     * @throws Throwable the first failure recorded by a worker thread or by the context checks
     */
    @Test
    public void testThreadSafety() throws Throwable {
        final long runtimeMs = ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS;
        final int numThreads = 10;
        // Holds the first failure only; later ones are dropped by compareAndSet(null, ...).
        final AtomicReference<Throwable> failed = new AtomicReference<>(null);
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0L, ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS, ToleranceType.ALL, Time.SYSTEM, new ProcessingContext() {
            // Number of error() calls since the last report().
            private final AtomicInteger count = new AtomicInteger();
            // Last attempt number seen since the last currentContext().
            private final AtomicInteger attempt = new AtomicInteger();

            public void error(Throwable th) {
                // A non-zero count means another error() happened before report() reset it.
                if (this.count.getAndIncrement() > 0) {
                    failed.compareAndSet(null, new AssertionError("Concurrent call to error()"));
                }
                super.error(th);
            }

            public Future<Void> report() {
                // Reset the counter; more than one pending error() is flagged as a failure.
                if (this.count.getAndSet(0) > 1) {
                    failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
                }
                return super.report();
            }

            public void currentContext(Stage stage, Class<?> cls) {
                this.attempt.set(0);
                super.currentContext(stage, cls);
            }

            public void attempt(int attemptNumber) {
                // Each attempt must directly follow the previous one (n - 1 -> n).
                if (!this.attempt.compareAndSet(attemptNumber - 1, attemptNumber)) {
                    failed.compareAndSet(null, new AssertionError("Concurrent call to attempt(): Attempts should increase monotonically within the scope of a given currentContext()"));
                }
                super.attempt(attemptNumber);
            }
        });
        retryWithToleranceOperator.metrics(this.errorHandlingMetrics);

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<? extends Future<?>> futures = IntStream.range(0, numThreads)
                .mapToObj(id -> pool.submit(() -> {
                    long startMs = System.currentTimeMillis();
                    long iterations = 0;
                    while (true) {
                        // Consult the wall clock only every 10000 iterations.
                        if (++iterations % 10000 == 0 && System.currentTimeMillis() > startMs + runtimeMs) {
                            break;
                        }
                        // Stop as soon as any thread has recorded a failure.
                        if (failed.get() != null) {
                            break;
                        }
                        try {
                            if (id < numThreads / 2) {
                                retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, this.consumerRecord, new Throwable()).get();
                            } else {
                                retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION, SinkTask.class);
                            }
                        } catch (Exception e) {
                            failed.compareAndSet(null, e);
                        }
                    }
                }))
                .collect(Collectors.toList());
        pool.shutdown();
        pool.awaitTermination((long) (1.5d * runtimeMs), TimeUnit.MILLISECONDS);
        // Surface anything a worker threw outside its own try/catch.
        futures.forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                failed.compareAndSet(null, e);
            }
        });
        Throwable exception = failed.get();
        if (exception != null) {
            throw exception;
        }
    }

    // Attaches real ErrorHandlingMetrics to the two shared operators. Both ConnectMetrics
    // instances are built from the same worker properties.
    static {
        // Typed map instead of a raw HashMap so the compiler checks keys and values.
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("metrics.num.samples", Objects.toString(2));
        workerProps.put("metrics.sample.window.ms", Objects.toString(3000));
        workerProps.put("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        workerProps.put("key.converter", PluginsTest.TestConverter.class.getName());
        workerProps.put("value.converter", PluginsTest.TestConverter.class.getName());
        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics(new ConnectorTaskId("noop-connector", -1), new ConnectMetrics("noop-worker", new PluginsTest.TestableWorkerConfig(workerProps), new SystemTime(), "test-cluster")));
        ALL_OPERATOR.metrics(new ErrorHandlingMetrics(new ConnectorTaskId("errors-all-tolerate-connector", -1), new ConnectMetrics("errors-all-tolerate-worker", new PluginsTest.TestableWorkerConfig(workerProps), new SystemTime(), "test-cluster")));
    }
}
