package org.apache.kafka.streams.kstream.internals.suppress;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.class */
public class KTableSuppressProcessorTest {
    private static final long ARBITRARY_LONG = 5;
    private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest$Harness.class */
    private static class Harness<K, V> {
        private final Processor<K, Change<V>, K, Change<V>> processor;
        private final MockInternalProcessorContext<K, Change<V>> context;

        Harness(Suppressed<K> suppressed, Serde<K> serde, Serde<V> serde2) {
            StateStore build = new InMemoryTimeOrderedKeyValueChangeBuffer.Builder("test-store", serde, serde2).withLoggingDisabled().build();
            Processor<K, Change<V>, K, Change<V>> processor = new KTableSuppressProcessorSupplier((SuppressedInternal) suppressed, KTableSuppressProcessorTest.mockBuilderWithName("test-store"), (KTableImpl) Mockito.mock(KTableImpl.class)).get();
            MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = new MockInternalProcessorContext<>();
            mockInternalProcessorContext.setCurrentNode(new ProcessorNode<>("testNode"));
            build.init(mockInternalProcessorContext, build);
            processor.init(mockInternalProcessorContext);
            this.processor = processor;
            this.context = mockInternalProcessorContext;
        }
    }

    @Test
    public void zeroTimeLimitShouldImmediatelyEmit() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(ARBITRARY_LONG);
        Change<Long> change = ARBITRARY_CHANGE;
        harness.processor.process(new Record("hey", change, ARBITRARY_LONG));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record("hey", change, ARBITRARY_LONG)));
    }

    @Test
    public void windowedZeroTimeLimitShouldImmediatelyEmit() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()), timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(ARBITRARY_LONG);
        Windowed windowed = new Windowed("hey", new TimeWindow(0L, 100L));
        Change<Long> change = ARBITRARY_CHANGE;
        harness.processor.process(new Record(windowed, change, ARBITRARY_LONG));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, ARBITRARY_LONG)));
    }

    @Test
    public void intermediateSuppressionShouldBufferAndEmitLater() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofMillis(1L), Suppressed.BufferConfig.unbounded()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        mockInternalProcessorContext.setTimestamp(0L);
        Change change = new Change((Object) null, 1L);
        harness.processor.process(new Record("hey", change, 0L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
        mockInternalProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 1L);
        mockInternalProcessorContext.setTimestamp(1L);
        harness.processor.process(new Record("tick", new Change((Object) null, (Object) null), 1L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record("hey", change, 0L)));
    }

    @Test
    public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
        Harness harness = new Harness(finalResults(Duration.ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L);
        mockInternalProcessorContext.setTimestamp(99L);
        Windowed windowed = new Windowed("hey", new TimeWindow(99L, 100L));
        Change<Long> change = ARBITRARY_CHANGE;
        harness.processor.process(new Record(windowed, change, 99L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
        mockInternalProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 1L);
        mockInternalProcessorContext.setTimestamp(100L);
        harness.processor.process(new Record(new Windowed("dummyKey1", new TimeWindow(100L, 101L)), ARBITRARY_CHANGE, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
        mockInternalProcessorContext.setRecordMetadata(AssignmentTestUtils.TOPIC_PREFIX, 0, 1L);
        mockInternalProcessorContext.setTimestamp(101L);
        harness.processor.process(new Record(new Windowed("dummyKey2", new TimeWindow(101L, 102L)), ARBITRARY_CHANGE, 101L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, 99L)));
    }

    @Test
    public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
        Harness harness = new Harness(finalResults(Duration.ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(ARBITRARY_LONG);
        Windowed windowed = new Windowed("hey", new TimeWindow(0L, 100L));
        Change<Long> change = ARBITRARY_CHANGE;
        harness.processor.process(new Record(windowed, change, ARBITRARY_LONG));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L);
        mockInternalProcessorContext.setTimestamp(100L);
        harness.processor.process(new Record(new Windowed("dummyKey", new TimeWindow(100L, 200L)), ARBITRARY_CHANGE, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, ARBITRARY_LONG)));
    }

    @Test
    public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
        Harness harness = new Harness(finalResults(Duration.ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        Windowed windowed = new Windowed("hey", new TimeWindow(0L, 100L));
        Change<Long> change = ARBITRARY_CHANGE;
        harness.processor.process(new Record(windowed, change, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, 100L)));
    }

    @Test
    public void finalResultsShouldDropTombstonesForTimeWindows() {
        Harness harness = new Harness(finalResults(Duration.ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        harness.processor.process(new Record(new Windowed("hey", new TimeWindow(0L, 100L)), new Change((Object) null, Long.valueOf(ARBITRARY_LONG)), 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
    }

    @Test
    public void finalResultsShouldDropTombstonesForSessionWindows() {
        Harness harness = new Harness(finalResults(Duration.ofMillis(0L)), WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        harness.processor.process(new Record(new Windowed("hey", new SessionWindow(0L, 0L)), new Change((Object) null, Long.valueOf(ARBITRARY_LONG)), 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(0));
    }

    @Test
    public void suppressShouldNotDropTombstonesForTimeWindows() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)), timeWindowedSerdeFrom(String.class, 100L), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        Headers add = new RecordHeaders().add("k", "v".getBytes(StandardCharsets.UTF_8));
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        mockInternalProcessorContext.setHeaders(add);
        Windowed windowed = new Windowed("hey", new TimeWindow(0L, 100L));
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record(windowed, change, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, 100L, add)));
    }

    @Test
    public void suppressShouldNotDropTombstonesForSessionWindows() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)), WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        Windowed windowed = new Windowed("hey", new SessionWindow(0L, 0L));
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record(windowed, change, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record(windowed, change, 100L)));
    }

    @Test
    public void suppressShouldNotDropTombstonesForKTable() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record("hey", change, 100L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record("hey", change, 100L)));
    }

    @Test
    public void suppressShouldEmitWhenOverRecordCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxRecords(1L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record("hey", change, 100L));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L);
        mockInternalProcessorContext.setTimestamp(101L);
        harness.processor.process(new Record("dummyKey", change, 101L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record("hey", change, 100L)));
    }

    @Test
    public void suppressShouldEmitWhenOverByteCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxBytes(60L)), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record("hey", change, 100L));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L);
        mockInternalProcessorContext.setTimestamp(101L);
        harness.processor.process(new Record("dummyKey", change, 101L));
        MatcherAssert.assertThat(mockInternalProcessorContext.forwarded(), hasSize(1));
        MatcherAssert.assertThat(((MockProcessorContext.CapturedForward) mockInternalProcessorContext.forwarded().get(0)).record(), CoreMatchers.is(new Record("hey", change, 100L)));
    }

    @Test
    public void suppressShouldShutDownWhenOverRecordCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxRecords(1L).shutDownWhenFull()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        mockInternalProcessorContext.setCurrentNode(new ProcessorNode<>("testNode"));
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record("hey", change, 100L));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L);
        mockInternalProcessorContext.setTimestamp(100L);
        try {
            harness.processor.process(new Record("dummyKey", change, 100L));
            Assertions.fail("expected an exception");
        } catch (StreamsException e) {
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("buffer exceeded its max capacity"));
        }
    }

    @Test
    public void suppressShouldShutDownWhenOverByteCapacity() {
        Harness harness = new Harness(Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxBytes(60L).shutDownWhenFull()), Serdes.String(), Serdes.Long());
        MockInternalProcessorContext<K, Change<V>> mockInternalProcessorContext = harness.context;
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L);
        mockInternalProcessorContext.setTimestamp(100L);
        mockInternalProcessorContext.setCurrentNode(new ProcessorNode<>("testNode"));
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        harness.processor.process(new Record("hey", change, 100L));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L);
        mockInternalProcessorContext.setTimestamp(1L);
        try {
            harness.processor.process(new Record("dummyKey", change, 100L));
            Assertions.fail("expected an exception");
        } catch (StreamsException e) {
            MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("buffer exceeded its max capacity"));
        }
    }

    private static <K extends Windowed> SuppressedInternal<K> finalResults(Duration duration) {
        return Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).buildFinalResultsSuppression(duration);
    }

    private static <E> Matcher<Collection<E>> hasSize(final int i) {
        return new BaseMatcher<Collection<E>>() { // from class: org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorTest.1
            public void describeTo(Description description) {
                description.appendText("a collection of size " + i);
            }

            public boolean matches(Object obj) {
                return obj != null && ((Collection) obj).size() == i;
            }
        };
    }

    private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(Class<K> cls, long j) {
        Serde serdeFrom = Serdes.serdeFrom(cls);
        return new Serdes.WrapperSerde(new TimeWindowedSerializer(serdeFrom.serializer()), new TimeWindowedDeserializer(serdeFrom.deserializer(), Long.valueOf(j)));
    }

    private static StoreBuilder<?> mockBuilderWithName(String str) {
        StoreBuilder<?> storeBuilder = (StoreBuilder) Mockito.mock(StoreBuilder.class);
        Mockito.when(storeBuilder.name()).thenReturn(str);
        return storeBuilder;
    }
}
