package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Set;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/PunctuationQueueTest.class */
public class PunctuationQueueTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/PunctuationQueueTest$TestProcessor.class */
    private static class TestProcessor extends AbstractProcessor<String, String> {
        public final ArrayList<Long> punctuatedAt;

        private TestProcessor() {
            this.punctuatedAt = new ArrayList<>();
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(String str, String str2) {
        }

        public void punctuate(long j) {
            this.punctuatedAt.add(Long.valueOf(j));
        }

        public void close() {
        }
    }

    @Test
    public void testPunctuationInterval() {
        final ProcessorNode processorNode = new ProcessorNode("test", new TestProcessor(), (Set) null);
        PunctuationQueue punctuationQueue = new PunctuationQueue();
        PunctuationSchedule punctuationSchedule = new PunctuationSchedule(processorNode, 0L, 100L, new Punctuator() { // from class: org.apache.kafka.streams.processor.internals.PunctuationQueueTest.1
            public void punctuate(long j) {
                processorNode.processor().punctuate(j);
            }
        });
        long j = punctuationSchedule.timestamp - 100;
        punctuationQueue.schedule(punctuationSchedule);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() { // from class: org.apache.kafka.streams.processor.internals.PunctuationQueueTest.2
            public void punctuate(ProcessorNode processorNode2, long j2, PunctuationType punctuationType, Punctuator punctuator) {
                punctuator.punctuate(j2);
            }
        };
        punctuationQueue.mayPunctuate(j, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(0L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 99, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(0L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 100, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(1L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 199, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(1L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 200, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(2L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1001, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(3L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1002, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(3L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1100, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(4L, r0.punctuatedAt.size());
    }

    @Test
    public void testPunctuationIntervalCustomAlignment() {
        final ProcessorNode processorNode = new ProcessorNode("test", new TestProcessor(), (Set) null);
        PunctuationQueue punctuationQueue = new PunctuationQueue();
        PunctuationSchedule punctuationSchedule = new PunctuationSchedule(processorNode, 50L, 100L, new Punctuator() { // from class: org.apache.kafka.streams.processor.internals.PunctuationQueueTest.3
            public void punctuate(long j) {
                processorNode.processor().punctuate(j);
            }
        });
        long j = punctuationSchedule.timestamp - 50;
        punctuationQueue.schedule(punctuationSchedule);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() { // from class: org.apache.kafka.streams.processor.internals.PunctuationQueueTest.4
            public void punctuate(ProcessorNode processorNode2, long j2, PunctuationType punctuationType, Punctuator punctuator) {
                punctuator.punctuate(j2);
            }
        };
        punctuationQueue.mayPunctuate(j, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(0L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 49, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(0L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 50, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(1L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 149, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(1L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 150, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(2L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1051, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(3L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1052, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(3L, r0.punctuatedAt.size());
        punctuationQueue.mayPunctuate(j + 1150, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals(4L, r0.punctuatedAt.size());
    }
}
