package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest.class */
public class ProcessorNodeTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ExceptionalProcessor.class */
    private static class ExceptionalProcessor implements Processor {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext processorContext) {
            throw new RuntimeException();
        }

        public void process(Object obj, Object obj2) {
            throw new RuntimeException();
        }

        public void punctuate(long j) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$NoOpProcessor.class */
    private static class NoOpProcessor implements Processor {
        private NoOpProcessor() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(Object obj, Object obj2) {
        }

        public void punctuate(long j) {
        }

        public void close() {
        }
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() throws Exception {
        new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet()).init((ProcessorContext) null);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() throws Exception {
        new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet()).close();
    }

    @Test
    public void testMetrics() {
        MockProcessorContext mockProcessorContext = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), new RecordCollectorImpl((Producer) null, (String) null));
        ProcessorNode processorNode = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
        processorNode.init(mockProcessorContext);
        Metrics baseMetrics = mockProcessorContext.baseMetrics();
        String str = "task." + mockProcessorContext.taskId() + "." + processorNode.name();
        String[] strArr = {"all", str};
        String[] strArr2 = {"process", "punctuate", "create", "destroy"};
        Map singletonMap = Collections.singletonMap("processor-node-id", processorNode.name());
        for (String str2 : strArr2) {
            Assert.assertNotNull(baseMetrics.getSensor(str2));
            Assert.assertNotNull(baseMetrics.getSensor(str + "-" + str2));
        }
        Assert.assertNotNull(baseMetrics.getSensor("forward"));
        for (String str3 : strArr) {
            for (String str4 : strArr2) {
                Assert.assertNotNull(baseMetrics.metrics().get(baseMetrics.metricName(str3 + "-" + str4 + "-latency-avg", "stream-processor-node-metrics", "The average latency in milliseconds of " + str3 + " " + str4 + " operation.", singletonMap)));
                Assert.assertNotNull(baseMetrics.metrics().get(baseMetrics.metricName(str3 + "-" + str4 + "-latency-max", "stream-processor-node-metrics", "The max latency in milliseconds of " + str3 + " " + str4 + " operation.", singletonMap)));
                Assert.assertNotNull(baseMetrics.metrics().get(baseMetrics.metricName(str3 + "-" + str4 + "-rate", "stream-processor-node-metrics", "The average number of occurrence of " + str3 + " " + str4 + " operation per second.", singletonMap)));
            }
            Assert.assertNotNull(baseMetrics.metrics().get(baseMetrics.metricName(str3 + "-forward-rate", "stream-processor-node-metrics", "The average number of occurrence of " + str3 + " forward operation per second.", singletonMap)));
        }
        mockProcessorContext.close();
    }
}
