package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/SinkNodeTest.class */
public class SinkNodeTest {
    private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
    private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
    private final RecordCollector recordCollector = new MockRecordCollector();
    private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext<>((StateSerdes<?, ?>) this.anyStateSerde, this.recordCollector);
    private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor("any-output-topic"), this.anySerializer, this.anySerializer, (StreamPartitioner) null);
    private final SinkNode<Object, Object> illTypedSink = this.sink;
    private MockedStatic<WrappingNullableUtils> utilsMock;

    @BeforeEach
    public void setup() {
        this.utilsMock = Mockito.mockStatic(WrappingNullableUtils.class);
    }

    @AfterEach
    public void cleanup() {
        this.utilsMock.close();
    }

    @Test
    public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
        this.sink.init(this.context);
        this.context.setTime(-1L);
        try {
            this.illTypedSink.process(new Record("any key".getBytes(), "any value".getBytes(), -1L));
            Assert.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
        }
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareKeySerializer((Serializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")});
        Throwable assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
            this.sink.init(this.context);
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Failed to initialize key serdes for sink node anyNodeName"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareValueSerializer((Serializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")});
        Throwable assertThrows = Assertions.assertThrows(StreamsException.class, () -> {
            this.sink.init(this.context);
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Failed to initialize value serdes for sink node anyNodeName"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void shouldThrowStreamsExceptionWithExplicitErrorMessage() {
        this.utilsMock.when(() -> {
            WrappingNullableUtils.prepareKeySerializer((Serializer) ArgumentMatchers.any(), (ProcessorContext) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new StreamsException("")});
        MatcherAssert.assertThat(Assertions.assertThrows(StreamsException.class, () -> {
            this.sink.init(this.context);
        }).getMessage(), Matchers.equalTo("Failed to initialize key serdes for sink node anyNodeName"));
    }
}
