package org.apache.kafka.streams.processor.internals;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.SensorAccessor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/SourceNodeTest.class */
public class SourceNodeTest {
    private MockedStatic<WrappingNullableUtils> utilsMock;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/SourceNodeTest$TheDeserializer.class */
    public static class TheDeserializer implements Deserializer<String> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m138deserialize(String str, Headers headers, byte[] bArr) {
            return str + headers + new String(bArr, StandardCharsets.UTF_8);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m139deserialize(String str, byte[] bArr) {
            return m138deserialize(str, (Headers) null, bArr);
        }
    }

    @BeforeEach
    public void setup() {
        this.utilsMock = Mockito.mockStatic(WrappingNullableUtils.class);
    }

    @AfterEach
    public void cleanup() {
        this.utilsMock.close();
    }

    @Test
    public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
        MockSourceNode mockSourceNode = new MockSourceNode(new TheDeserializer(), new TheDeserializer());
        RecordHeaders recordHeaders = new RecordHeaders();
        MatcherAssert.assertThat((String) mockSourceNode.deserializeKey(AssignmentTestUtils.TOPIC_PREFIX, recordHeaders, "data".getBytes(StandardCharsets.UTF_8)), CoreMatchers.is(AssignmentTestUtils.TOPIC_PREFIX + recordHeaders + "data"));
    }

    @Test
    public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
        MockSourceNode mockSourceNode = new MockSourceNode(new TheDeserializer(), new TheDeserializer());
        RecordHeaders recordHeaders = new RecordHeaders();
        MatcherAssert.assertThat((String) mockSourceNode.deserializeValue(AssignmentTestUtils.TOPIC_PREFIX, recordHeaders, "data".getBytes(StandardCharsets.UTF_8)), CoreMatchers.is(AssignmentTestUtils.TOPIC_PREFIX + recordHeaders + "data"));
    }

    @Test
    public void shouldExposeProcessMetrics() {
        Metrics metrics = new Metrics();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(metrics, "test-client", "latest", new MockTime()));
        SourceNode sourceNode = new SourceNode(internalMockProcessorContext.currentNode().name(), new TheDeserializer(), new TheDeserializer());
        sourceNode.init(internalMockProcessorContext);
        String name = Thread.currentThread().getName();
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", name), Utils.mkEntry("task-id", internalMockProcessorContext.taskId().toString()), Utils.mkEntry("processor-node-id", sourceNode.name())});
        Assertions.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-processor-node-metrics", mkMap));
        Assertions.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", "stream-processor-node-metrics", mkMap));
        mkMap.remove("processor-node-id");
        Assertions.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-task-metrics", mkMap));
        Assertions.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", "stream-task-metrics", mkMap));
        String str = "internal." + name + ".task." + internalMockProcessorContext.taskId().toString();
        MatcherAssert.assertThat(new SensorAccessor(metrics.getSensor(str + ".node." + internalMockProcessorContext.currentNode().name() + ".s.process")).parents().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList()), Matchers.contains(new String[]{str + ".s.process"}));
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext();
        SourceNode sourceNode = new SourceNode(internalMockProcessorContext.currentNode().name(), new TheDeserializer(), new TheDeserializer());
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareKeyDeserializer((Deserializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")});
        Throwable assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
            sourceNode.init(internalMockProcessorContext);
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Failed to initialize key serdes for source node TESTING_NODE"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext();
        SourceNode sourceNode = new SourceNode(internalMockProcessorContext.currentNode().name(), new TheDeserializer(), new TheDeserializer());
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareValueDeserializer((Deserializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")});
        Throwable assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
            sourceNode.init(internalMockProcessorContext);
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Failed to initialize value serdes for source node TESTING_NODE"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionWithExplicitErrorMessage() {
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext();
        SourceNode sourceNode = new SourceNode(internalMockProcessorContext.currentNode().name(), new TheDeserializer(), new TheDeserializer());
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareKeyDeserializer((Deserializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new StreamsException("")});
        MatcherAssert.assertThat(Assertions.assertThrows(StreamsException.class, () -> {
            sourceNode.init(internalMockProcessorContext);
        }).getMessage(), Matchers.equalTo("Failed to initialize key serdes for source node TESTING_NODE"));
    }
}
