package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.class */
public class ProcessingExceptionHandlerIntegrationTest {
    private final String threadId = Thread.currentThread().getName();
    private static final Instant TIMESTAMP = Instant.now();

    /* loaded from: input_file:org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest$ContinueProcessingExceptionHandlerMockTest.class */
    public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception exc) {
            if (((String) record.key()).contains("FATAL")) {
                throw new RuntimeException("KABOOM!");
            }
            if (((String) record.key()).contains("NULL")) {
                return null;
            }
            ProcessingExceptionHandlerIntegrationTest.assertProcessingExceptionHandlerInputs(errorHandlerContext, record, exc);
            return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
        }

        public void configure(Map<String, ?> map) {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest$FailProcessingExceptionHandlerMockTest.class */
    public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception exc) {
            ProcessingExceptionHandlerIntegrationTest.assertProcessingExceptionHandlerInputs(errorHandlerContext, record, exc);
            return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
        }

        public void configure(Map<String, ?> map) {
        }
    }

    @Test
    public void shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() {
        List asList = Arrays.asList(new KeyValue("ID123-1", "ID123-A1"), new KeyValue("ID123-2-ERR", "ID123-A2"), new KeyValue("ID123-3", "ID123-A3"), new KeyValue("ID123-4", "ID123-A4"));
        List singletonList = Collections.singletonList(new KeyValueTimestamp("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", FailProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
                Assertions.assertTrue(Assertions.assertThrows(StreamsException.class, () -> {
                    createInputTopic.pipeKeyValueList(asList, TIMESTAMP, Duration.ZERO);
                }).getMessage().contains("Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, partition=0, offset=1"));
                Assertions.assertEquals(1, mockProcessorSupplier.theCapturedProcessor().processed().size());
                Assertions.assertIterableEquals(singletonList, mockProcessorSupplier.theCapturedProcessor().processed());
                MetricName droppedRecordsTotalMetric = droppedRecordsTotalMetric();
                MetricName droppedRecordsRateMetric = droppedRecordsRateMetric();
                Assertions.assertEquals(Double.valueOf(0.0d), ((Metric) topologyTestDriver.metrics().get(droppedRecordsTotalMetric)).metricValue());
                Assertions.assertEquals(Double.valueOf(0.0d), ((Metric) topologyTestDriver.metrics().get(droppedRecordsRateMetric)).metricValue());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() {
        List asList = Arrays.asList(new KeyValue("ID123-1", "ID123-A1"), new KeyValue("ID123-2-ERR", "ID123-A2"), new KeyValue("ID123-3", "ID123-A3"), new KeyValue("ID123-4", "ID123-A4"), new KeyValue("ID123-5-ERR", "ID123-A5"), new KeyValue("ID123-6", "ID123-A6"));
        List asList2 = Arrays.asList(new KeyValueTimestamp("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp("ID123-3", "ID123-A3", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp("ID123-4", "ID123-A4", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp("ID123-6", "ID123-A6", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            try {
                topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()).pipeKeyValueList(asList, TIMESTAMP, Duration.ZERO);
                Assertions.assertEquals(asList2.size(), mockProcessorSupplier.theCapturedProcessor().processed().size());
                Assertions.assertIterableEquals(asList2, mockProcessorSupplier.theCapturedProcessor().processed());
                MetricName droppedRecordsTotalMetric = droppedRecordsTotalMetric();
                MetricName droppedRecordsRateMetric = droppedRecordsRateMetric();
                Assertions.assertEquals(Double.valueOf(2.0d), ((Metric) topologyTestDriver.metrics().get(droppedRecordsTotalMetric)).metricValue());
                Assertions.assertTrue(((Double) ((Metric) topologyTestDriver.metrics().get(droppedRecordsRateMetric)).metricValue()).doubleValue() > 0.0d);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() {
        KeyValue keyValue = new KeyValue("ID123-1", "ID123-A1");
        KeyValue keyValue2 = new KeyValue("ID123-2-ERR", "ID123-A2");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).map((str2, str3) -> {
            atomicBoolean.set(true);
            return KeyValue.pair(str2, str3);
        }).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", FailProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
                atomicBoolean.set(false);
                createInputTopic.pipeInput(keyValue.key, keyValue.value, TIMESTAMP);
                Assertions.assertTrue(atomicBoolean.get());
                atomicBoolean.set(false);
                Assertions.assertTrue(Assertions.assertThrows(StreamsException.class, () -> {
                    createInputTopic.pipeInput(keyValue2.key, keyValue2.value, TIMESTAMP);
                }).getMessage().contains("Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, partition=0, offset=1"));
                Assertions.assertFalse(atomicBoolean.get());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler() {
        KeyValue keyValue = new KeyValue("ID123-1", "ID123-A1");
        KeyValue keyValue2 = new KeyValue("ID123-2-ERR", "ID123-A2");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).map((str2, str3) -> {
            atomicBoolean.set(true);
            return KeyValue.pair(str2, str3);
        }).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
                atomicBoolean.set(false);
                createInputTopic.pipeInput(keyValue.key, keyValue.value, TIMESTAMP);
                Assertions.assertTrue(atomicBoolean.get());
                atomicBoolean.set(false);
                createInputTopic.pipeInput(keyValue2.key, keyValue2.value, TIMESTAMP);
                Assertions.assertFalse(atomicBoolean.get());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
        KeyValue keyValue = new KeyValue("ID123-1", "ID123-A1");
        KeyValue keyValue2 = new KeyValue("ID123-ERR-NULL", "ID123-A2");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).map((str2, str3) -> {
            atomicBoolean.set(true);
            return KeyValue.pair(str2, str3);
        }).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
                atomicBoolean.set(false);
                createInputTopic.pipeInput(keyValue.key, keyValue.value, TIMESTAMP);
                Assertions.assertTrue(atomicBoolean.get());
                atomicBoolean.set(false);
                StreamsException assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
                    createInputTopic.pipeInput(keyValue2.key, keyValue2.value, Instant.EPOCH);
                });
                Assertions.assertEquals("Fatal user code error in processing error callback", assertThrows.getMessage());
                Assertions.assertInstanceOf(NullPointerException.class, assertThrows.getCause());
                Assertions.assertEquals("Invalid ProductionExceptionHandler response.", assertThrows.getCause().getMessage());
                Assertions.assertFalse(atomicBoolean.get());
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldStopProcessingWhenFatalUserExceptionProcessingExceptionHandler() {
        KeyValue keyValue = new KeyValue("ID123-1", "ID123-A1");
        KeyValue keyValue2 = new KeyValue("ID123-ERR-FATAL", "ID123-A2");
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        streamsBuilder.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).mapValues(str -> {
            return str;
        }).process(runtimeErrorProcessorSupplierMock(), new String[0]).map((str2, str3) -> {
            atomicBoolean.set(true);
            return KeyValue.pair(str2, str3);
        }).process(mockProcessorSupplier, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, Instant.ofEpochMilli(0L));
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
            atomicBoolean.set(false);
            createInputTopic.pipeInput(keyValue.key, keyValue.value, TIMESTAMP);
            Assertions.assertTrue(atomicBoolean.get());
            atomicBoolean.set(false);
            StreamsException assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
                createInputTopic.pipeInput(keyValue2.key, keyValue2.value, Instant.EPOCH);
            });
            Assertions.assertEquals("Fatal user code error in processing error callback", assertThrows.getMessage());
            Assertions.assertEquals("KABOOM!", assertThrows.getCause().getMessage());
            Assertions.assertFalse(atomicBoolean.get());
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void assertProcessingExceptionHandlerInputs(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception exc) {
        Assertions.assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key()));
        Assertions.assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
        Assertions.assertEquals("TOPIC_NAME", errorHandlerContext.topic());
        Assertions.assertEquals("KSTREAM-PROCESSOR-0000000003", errorHandlerContext.processorNodeId());
        Assertions.assertEquals(TIMESTAMP.toEpochMilli(), errorHandlerContext.timestamp());
        Assertions.assertTrue(exc.getMessage().contains("Exception should be handled by processing exception handler"));
    }

    private MetricName droppedRecordsTotalMetric() {
        return new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", "0_0")}));
    }

    private MetricName droppedRecordsRateMetric() {
        return new MetricName("dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", "0_0")}));
    }

    private ProcessorSupplier<String, String, String, String> runtimeErrorProcessorSupplierMock() {
        return () -> {
            return new ContextualProcessor<String, String, String, String>() { // from class: org.apache.kafka.streams.integration.ProcessingExceptionHandlerIntegrationTest.1
                public void process(Record<String, String> record) {
                    if (((String) record.key()).contains("ERR")) {
                        throw new RuntimeException("Exception should be handled by processing exception handler");
                    }
                    context().forward(new Record(record.key(), record.value(), record.timestamp()));
                }
            };
        };
    }
}
