package org.apache.kafka.streams.processor.internals;

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Unit tests for {@link RecordDeserializer}.
 *
 * <p>The tests drive the deserializer with a {@link TheSourceNode} whose key and/or value
 * deserialization can be made to fail on demand, and with a
 * {@link DeserializationExceptionHandlerMock} that both verifies the arguments it is handed and
 * produces a scripted response (a concrete response, {@code null}, or a thrown exception).
 */
public class RecordDeserializerTest {
    private final String sourceNodeName = "source-node";
    private final TaskId taskId = new TaskId(0, 0);
    private final RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
    // NOTE(review): AssignmentTestUtils.TOPIC_PREFIX is presumably used here only as a convenient
    // topic name; the tests never compare against a specific topic string -- confirm.
    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>(
        AssignmentTestUtils.TOPIC_PREFIX,
        1,
        1,
        10,
        TimestampType.LOG_APPEND_TIME,
        3,
        5,
        new byte[0],
        new byte[0],
        headers,
        Optional.of(5)
    );
    private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();

    /**
     * Handler test double that asserts on the context, record and exception it receives and then
     * answers according to the {@code response} it was constructed with:
     * <ul>
     *   <li>{@code null} reference: throws {@code RuntimeException("CRASH")};</li>
     *   <li>empty {@link Optional}: returns {@code null};</li>
     *   <li>non-empty {@link Optional}: returns the wrapped response.</li>
     * </ul>
     */
    public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler {
        // Deliberately nullable: a null reference is the "handler throws" scenario (see handle()).
        private final Optional<DeserializationExceptionHandler.DeserializationHandlerResponse> response;
        private final ConsumerRecord<byte[], byte[]> expectedRecord;
        private final String expectedProcessorNodeId;
        private final TaskId expectedTaskId;

        /**
         * @param response        scripted answer; may be {@code null}, empty, or present (see class doc)
         * @param record          raw record the handler expects to be called with
         * @param processorNodeId processor node id expected on the error handler context
         * @param taskId          task id expected on the error handler context
         */
        public DeserializationExceptionHandlerMock(final Optional<DeserializationExceptionHandler.DeserializationHandlerResponse> response,
                                                   final ConsumerRecord<byte[], byte[]> record,
                                                   final String processorNodeId,
                                                   final TaskId taskId) {
            this.response = response;
            this.expectedRecord = record;
            this.expectedProcessorNodeId = processorNodeId;
            this.expectedTaskId = taskId;
        }

        /**
         * Verifies that the context metadata matches the expected record, node id and task id,
         * that the failing record is the expected one, and that the exception is the
         * {@code RuntimeException("KABOOM!")} raised by {@link TheSourceNode}; then produces the
         * scripted response.
         *
         * @return the scripted response, or {@code null} when the scripted {@link Optional} is empty
         * @throws RuntimeException with message {@code "CRASH"} when the scripted response reference is {@code null}
         */
        @Override
        public DeserializationExceptionHandler.DeserializationHandlerResponse handle(final ErrorHandlerContext errorHandlerContext,
                                                                                     final ConsumerRecord<byte[], byte[]> consumerRecord,
                                                                                     final Exception exc) {
            Assertions.assertEquals(expectedRecord.topic(), errorHandlerContext.topic());
            Assertions.assertEquals(expectedRecord.partition(), errorHandlerContext.partition());
            Assertions.assertEquals(expectedRecord.offset(), errorHandlerContext.offset());
            Assertions.assertEquals(expectedProcessorNodeId, errorHandlerContext.processorNodeId());
            Assertions.assertEquals(expectedTaskId, errorHandlerContext.taskId());
            Assertions.assertEquals(expectedRecord.timestamp(), errorHandlerContext.timestamp());
            Assertions.assertEquals(expectedRecord, consumerRecord);
            Assertions.assertInstanceOf(RuntimeException.class, exc);
            Assertions.assertEquals("KABOOM!", exc.getMessage());
            if (response == null) {
                throw new RuntimeException("CRASH");
            }
            return response.orElse(null);
        }

        /** No configuration is needed for this test double. */
        @Override
        public void configure(final Map<String, ?> map) {
        }
    }

    /**
     * Source node test double that bypasses real deserializers (both are passed as {@code null}
     * to the superclass) and instead returns fixed key/value objects, or throws
     * {@code RuntimeException("KABOOM!")} for the key and/or the value when told to.
     */
    static class TheSourceNode extends SourceNode<Object, Object> {
        private final boolean keyThrowsException;
        private final boolean valueThrowsException;
        private final Object key;
        private final Object value;

        /**
         * @param name                 node name handed to the superclass
         * @param keyThrowsException   whether {@link #deserializeKey} should throw
         * @param valueThrowsException whether {@link #deserializeValue} should throw
         * @param key                  object returned by {@link #deserializeKey} when it does not throw
         * @param value                object returned by {@link #deserializeValue} when it does not throw
         */
        TheSourceNode(final String name,
                      final boolean keyThrowsException,
                      final boolean valueThrowsException,
                      final Object key,
                      final Object value) {
            // The casts only select the constructor overload; no deserializer is ever invoked
            // because both deserialize methods are overridden below.
            super(name, (Deserializer<Object>) null, (Deserializer<Object>) null);
            this.keyThrowsException = keyThrowsException;
            this.valueThrowsException = valueThrowsException;
            this.key = key;
            this.value = value;
        }

        /**
         * @return the fixed key, ignoring all arguments
         * @throws RuntimeException with message {@code "KABOOM!"} if configured to fail on keys
         */
        @Override
        public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
            if (keyThrowsException) {
                throw new RuntimeException("KABOOM!");
            }
            return key;
        }

        /**
         * @return the fixed value, ignoring all arguments
         * @throws RuntimeException with message {@code "KABOOM!"} if configured to fail on values
         */
        @Override
        public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
            if (valueThrowsException) {
                throw new RuntimeException("KABOOM!");
            }
            return value;
        }
    }

    /**
     * Happy path: with no deserialization failure the returned record is expected to carry the
     * raw record's metadata together with the deserialized key and value.
     */
    @Test
    public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
        try (Metrics metrics = new Metrics()) {
            final RecordDeserializer recordDeserializer = new RecordDeserializer(
                new TheSourceNode(sourceNodeName, false, false, "key", "value"),
                (DeserializationExceptionHandler) null,
                new LogContext(),
                metrics.sensor("dropped-records")
            );

            final ConsumerRecord<?, ?> record =
                recordDeserializer.deserialize((ProcessorContext<Void, Void>) null, rawRecord);

            Assertions.assertEquals(rawRecord.topic(), record.topic());
            Assertions.assertEquals(rawRecord.partition(), record.partition());
            Assertions.assertEquals(rawRecord.offset(), record.offset());
            Assertions.assertEquals("key", record.key());
            Assertions.assertEquals("value", record.value());
            Assertions.assertEquals(rawRecord.timestamp(), record.timestamp());
            // Intentionally not rawRecord.timestampType(): the raw record is built with
            // LOG_APPEND_TIME, yet the deserialized record is expected to report CREATE_TIME.
            Assertions.assertEquals(TimestampType.CREATE_TIME, record.timestampType());
            Assertions.assertEquals(rawRecord.headers(), record.headers());
            Assertions.assertEquals(rawRecord.leaderEpoch(), record.leaderEpoch());
        }
    }

    /**
     * When key and/or value deserialization fails and the handler answers {@code FAIL}, a
     * {@link StreamsException} with the "set to fail" message is expected.
     *
     * @param keyThrowsException   whether key deserialization fails
     * @param valueThrowsException whether value deserialization fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, true"})
    public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final boolean keyThrowsException,
                                                                                                      final boolean valueThrowsException) {
        try (Metrics metrics = new Metrics()) {
            final RecordDeserializer recordDeserializer = new RecordDeserializer(
                new TheSourceNode(sourceNodeName, keyThrowsException, valueThrowsException, "key", "value"),
                new DeserializationExceptionHandlerMock(
                    Optional.of(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL),
                    rawRecord,
                    sourceNodeName,
                    taskId
                ),
                new LogContext(),
                metrics.sensor("dropped-records")
            );

            final StreamsException exception = Assertions.assertThrows(
                StreamsException.class,
                () -> recordDeserializer.deserialize(context, rawRecord)
            );

            // Expected value first, actual second, so a failure report labels them correctly.
            Assertions.assertEquals(
                "Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the deserialization.exception.handler appropriately.",
                exception.getMessage()
            );
        }
    }

    /**
     * When key and/or value deserialization fails and the handler answers {@code CONTINUE}, no
     * exception is expected and {@code deserialize} is expected to return {@code null}.
     *
     * @param keyThrowsException   whether key deserialization fails
     * @param valueThrowsException whether value deserialization fails
     */
    @ParameterizedTest
    @CsvSource({"true, true", "true, false", "false, true"})
    public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final boolean keyThrowsException,
                                                                                                             final boolean valueThrowsException) {
        try (Metrics metrics = new Metrics()) {
            final RecordDeserializer recordDeserializer = new RecordDeserializer(
                new TheSourceNode(sourceNodeName, keyThrowsException, valueThrowsException, "key", "value"),
                new DeserializationExceptionHandlerMock(
                    Optional.of(DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE),
                    rawRecord,
                    sourceNodeName,
                    taskId
                ),
                new LogContext(),
                metrics.sensor("dropped-records")
            );

            Assertions.assertNull(recordDeserializer.deserialize(context, rawRecord));
        }
    }

    /**
     * When key deserialization fails and the handler returns {@code null}, a
     * {@link StreamsException} caused by a {@link NullPointerException} is expected.
     */
    @Test
    public void shouldFailWhenDeserializationFailsAndExceptionHandlerReturnsNull() {
        try (Metrics metrics = new Metrics()) {
            final RecordDeserializer recordDeserializer = new RecordDeserializer(
                new TheSourceNode(sourceNodeName, true, false, "key", "value"),
                // Empty Optional makes the mock return null from handle().
                new DeserializationExceptionHandlerMock(Optional.empty(), rawRecord, sourceNodeName, taskId),
                new LogContext(),
                metrics.sensor("dropped-records")
            );

            final StreamsException exception = Assertions.assertThrows(
                StreamsException.class,
                () -> recordDeserializer.deserialize(context, rawRecord)
            );

            Assertions.assertEquals("Fatal user code error in deserialization error callback", exception.getMessage());
            Assertions.assertInstanceOf(NullPointerException.class, exception.getCause());
            Assertions.assertEquals("Invalid DeserializationExceptionHandler response.", exception.getCause().getMessage());
        }
    }

    /**
     * When key deserialization fails and the handler itself throws, a {@link StreamsException}
     * whose cause carries the handler's {@code "CRASH"} message is expected.
     */
    @Test
    public void shouldFailWhenDeserializationFailsAndExceptionHandlerThrows() {
        try (Metrics metrics = new Metrics()) {
            final RecordDeserializer recordDeserializer = new RecordDeserializer(
                new TheSourceNode(sourceNodeName, true, false, "key", "value"),
                // A null response reference makes the mock throw RuntimeException("CRASH").
                new DeserializationExceptionHandlerMock(null, rawRecord, sourceNodeName, taskId),
                new LogContext(),
                metrics.sensor("dropped-records")
            );

            final StreamsException exception = Assertions.assertThrows(
                StreamsException.class,
                () -> recordDeserializer.deserialize(context, rawRecord)
            );

            Assertions.assertEquals("Fatal user code error in deserialization error callback", exception.getMessage());
            Assertions.assertEquals("CRASH", exception.getCause().getMessage());
        }
    }
}
