package org.apache.kafka.streams.kstream.internals.graph;

import java.time.Duration;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.class */
public class GraphGraceSearchUtilTest {
    @Test
    public void shouldThrowOnNull() {
        try {
            GraphGraceSearchUtil.findAndVerifyWindowGrace((GraphNode) null);
            Assert.fail("Should have thrown.");
        } catch (TopologyException e) {
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.is("Invalid topology: Window close time is only defined for windowed computations. Got []."));
        }
    }

    @Test
    public void shouldFailIfThereIsNoGraceAncestor() {
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode("stateful", new ProcessorParameters(() -> {
            return new Processor<String, Long, String, Long>() { // from class: org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtilTest.1
                public void process(Record<String, Long> record) {
                }
            };
        }, "dummy"), (StoreFactory) null);
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode("stateless", (ProcessorParameters) null);
        statefulProcessorNode.addChild(processorGraphNode);
        try {
            GraphGraceSearchUtil.findAndVerifyWindowGrace(processorGraphNode);
            Assert.fail("should have thrown.");
        } catch (TopologyException e) {
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
        }
    }

    @Test
    public void shouldExtractGraceFromKStreamWindowAggregateNode() {
        TimeWindows ofSizeAndGrace = TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1234L));
        MatcherAssert.assertThat(Long.valueOf(GraphGraceSearchUtil.findAndVerifyWindowGrace(new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamWindowAggregate(ofSizeAndGrace, "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null), "asdf"), (StoreFactory) null))), CoreMatchers.is(Long.valueOf(ofSizeAndGrace.gracePeriodMs())));
    }

    @Test
    public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
        SessionWindows ofInactivityGapAndGrace = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1234L));
        MatcherAssert.assertThat(Long.valueOf(GraphGraceSearchUtil.findAndVerifyWindowGrace(new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamSessionWindowAggregate(ofInactivityGapAndGrace, "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null, (Merger) null), "asdf"), (StoreFactory) null))), CoreMatchers.is(Long.valueOf(ofInactivityGapAndGrace.gracePeriodMs() + ofInactivityGapAndGrace.inactivityGap())));
    }

    @Test
    public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
        SessionWindows ofInactivityGapAndGrace = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1234L));
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamSessionWindowAggregate(ofInactivityGapAndGrace, "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null, (Merger) null), "asdf"), (StoreFactory) null);
        StatefulProcessorNode statefulProcessorNode2 = new StatefulProcessorNode("stateful", new ProcessorParameters(() -> {
            return new Processor<String, Long, String, Long>() { // from class: org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtilTest.2
                public void process(Record<String, Long> record) {
                }
            };
        }, "dummy"), (StoreFactory) null);
        statefulProcessorNode.addChild(statefulProcessorNode2);
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode("stateless", (ProcessorParameters) null);
        statefulProcessorNode2.addChild(processorGraphNode);
        MatcherAssert.assertThat(Long.valueOf(GraphGraceSearchUtil.findAndVerifyWindowGrace(processorGraphNode)), CoreMatchers.is(Long.valueOf(ofInactivityGapAndGrace.gracePeriodMs() + ofInactivityGapAndGrace.inactivityGap())));
    }

    @Test
    public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
        SessionWindows ofInactivityGapAndGrace = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1234L));
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamSessionWindowAggregate(ofInactivityGapAndGrace, "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null, (Merger) null), "asdf"), (StoreFactory) null);
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode("stateless", (ProcessorParameters) null);
        statefulProcessorNode.addChild(processorGraphNode);
        ProcessorGraphNode processorGraphNode2 = new ProcessorGraphNode("stateless", (ProcessorParameters) null);
        processorGraphNode.addChild(processorGraphNode2);
        MatcherAssert.assertThat(Long.valueOf(GraphGraceSearchUtil.findAndVerifyWindowGrace(processorGraphNode2)), CoreMatchers.is(Long.valueOf(ofInactivityGapAndGrace.gracePeriodMs() + ofInactivityGapAndGrace.inactivityGap())));
    }

    @Test
    public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamSessionWindowAggregate(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1234L)), "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null, (Merger) null), "asdf"), (StoreFactory) null);
        StatefulProcessorNode statefulProcessorNode2 = new StatefulProcessorNode("asdf", new ProcessorParameters(new KStreamWindowAggregate(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(4321L)), "asdf", EmitStrategy.onWindowUpdate(), (Initializer) null, (Aggregator) null), "asdf"), (StoreFactory) null);
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode("stateless", (ProcessorParameters) null);
        statefulProcessorNode.addChild(processorGraphNode);
        statefulProcessorNode2.addChild(processorGraphNode);
        MatcherAssert.assertThat(Long.valueOf(GraphGraceSearchUtil.findAndVerifyWindowGrace(processorGraphNode)), CoreMatchers.is(4321L));
    }
}
