package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest.class */
public class ProcessorNodeTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ClassCastProcessor.class */
    private static class ClassCastProcessor extends ExceptionalProcessor {
        private ClassCastProcessor() {
            super();
        }

        @Override // org.apache.kafka.streams.processor.internals.ProcessorNodeTest.ExceptionalProcessor
        public void init(ProcessorContext<Object, Object> processorContext) {
        }

        @Override // org.apache.kafka.streams.processor.internals.ProcessorNodeTest.ExceptionalProcessor
        public void process(Record<Object, Object> record) {
            throw new ClassCastException("Incompatible types simulation exception.");
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ExceptionalProcessor.class */
    private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext<Object, Object> processorContext) {
            throw new RuntimeException();
        }

        public void process(Record<Object, Object> record) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$NoOpProcessor.class */
    private static class NoOpProcessor implements Processor<Object, Object, Object, Object> {
        private NoOpProcessor() {
        }

        public void process(Record<Object, Object> record) {
        }
    }

    @Test
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        ProcessorNode processorNode = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
        Assert.assertThrows(StreamsException.class, () -> {
            processorNode.init((InternalProcessorContext) null);
        });
    }

    @Test
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        ProcessorNode processorNode = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
        Assert.assertThrows(StreamsException.class, () -> {
            processorNode.init((InternalProcessorContext) null);
        });
    }

    @Test
    public void testMetricsWithBuiltInMetricsVersionLatest() {
        Metrics metrics = new Metrics();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(metrics, "test-client", "latest", new MockTime()));
        ProcessorNode processorNode = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
        processorNode.init(internalMockProcessorContext);
        String name = Thread.currentThread().getName();
        String[] strArr = {"process", "punctuate", "create", "destroy"};
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("processor-node-id", processorNode.name());
        linkedHashMap.put("task-id", internalMockProcessorContext.taskId().toString());
        linkedHashMap.put("client-id", name);
        for (String str : strArr) {
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str + "-rate", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str + "-total", "stream-processor-node-metrics", linkedHashMap));
        }
        linkedHashMap.put("processor-node-id", "all");
        for (String str2 : strArr) {
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str2 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str2 + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str2 + "-rate", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str2 + "-total", "stream-processor-node-metrics", linkedHashMap));
        }
    }

    @Test
    public void testTopologyLevelClassCastException() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("streams-plaintext-input").flatMapValues(str -> {
            return Collections.singletonList("");
        });
        Topology build = streamsBuilder.build();
        Properties properties = new Properties();
        properties.put("default.key.serde", Serdes.ByteArraySerde.class);
        properties.put("default.value.serde", Serdes.ByteArraySerde.class);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, properties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer());
                String message = Assert.assertThrows(StreamsException.class, () -> {
                    createInputTopic.pipeInput("a-key", "a value");
                }).getMessage();
                Assert.assertTrue("Error about class cast with serdes", message.contains("ClassCastException"));
                Assert.assertTrue("Error about class cast with serdes", message.contains("Serdes"));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTopologyLevelConfigException() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("streams-plaintext-input").flatMapValues(str -> {
            return Collections.singletonList("");
        });
        Topology build = streamsBuilder.build();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            new TopologyTestDriver(build);
        });
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("Failed to initialize key serdes for source node"));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), CoreMatchers.containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));
    }

    @Test
    public void testTopologyLevelClassCastExceptionDirect() {
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(new Metrics(), "test-client", "latest", new MockTime()));
        ProcessorNode processorNode = new ProcessorNode("pname", new ClassCastProcessor(), Collections.emptySet());
        processorNode.init(internalMockProcessorContext);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            processorNode.process(new Record("aKey", "aValue", 0L));
        });
        MatcherAssert.assertThat(assertThrows.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("default Serdes"));
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("input types"));
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("pname"));
    }
}
