package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

/**
 * Tests for {@link RocksDBTimeOrderedKeyValueBuffer}.
 *
 * <p>Every test builds a fresh buffer through {@link #createBuffer(Duration, Serde)}, feeds it
 * records through {@link #pipeRecord(String, String, long)} and then checks the record count,
 * minimum timestamp and buffer size the buffer reports. Tests that pass {@code null} serdes to
 * the buffer stub the mocked {@link SerdeGetter} so the buffer can pick its serdes up from there.
 *
 * <p>The size values asserted throughout (42 per buffered record) are simply what the buffer
 * reports for the single-character string keys and values used here.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class RocksDBTimeOrderedKeyValueBufferTest {
    public RocksDBTimeOrderedKeyValueBuffer<String, String> buffer;

    @Mock
    public SerdeGetter serdeGetter;
    public InternalProcessorContext<String, String> context;
    public StreamsMetricsImpl streamsMetrics;

    @Mock
    public Sensor sensor;
    public long offset;

    /** Resets the running offset and creates fresh metrics and a fresh mock processor context. */
    @BeforeEach
    public void setUp() {
        Metrics metrics = new Metrics();
        offset = 0L;
        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
        // NOTE(review): constructed as a raw type, as before; switch to the diamond operator if
        // MockInternalProcessorContext is declared generic.
        context = new MockInternalProcessorContext(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
    }

    /**
     * Closes the buffer created by the test, if any, so it is not left open once the test ends.
     *
     * <p>NOTE(review): assumes closing the buffer also closes the bytes store it wraps - confirm.
     */
    @AfterEach
    public void tearDown() {
        if (buffer != null) {
            buffer.close();
        }
    }

    /**
     * Creates and initialises the buffer under test on top of a new time-ordered bytes store.
     *
     * @param grace duration handed to the buffer constructor (the test names call it the grace period)
     * @param serde serde used for both keys and values; when {@code null} the tests rely on
     *              {@code setSerdesIfNull} with the mocked {@link SerdeGetter}
     */
    private void createBuffer(Duration grace, Serde<String> serde) {
        RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
        buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false);
        buffer.setSerdesIfNull(serdeGetter);
        buffer.init(context, store);
    }

    /**
     * Installs a record context for the given timestamp (consuming the next offset) and puts the
     * record into the buffer.
     *
     * @param key       record key
     * @param value     record value
     * @param timestamp record timestamp, also used as the time passed to {@code put}
     * @return whatever {@code buffer.put} returns for this record
     */
    private boolean pipeRecord(String key, String value, long timestamp) {
        Record<String, String> record = new Record<>(key, value, timestamp);
        context.setRecordContext(new ProcessorRecordContext(timestamp, offset++, 0, "testing", new RecordHeaders()));
        return buffer.put(timestamp, record, context.recordContext());
    }

    /** Stubs the mocked {@link SerdeGetter} to hand out string serdes for keys and values. */
    private void stubStringSerdes() {
        Mockito.when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
        Mockito.when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
    }

    /**
     * Runs {@code evictWhile} with a predicate that holds while the buffer reports more than
     * {@code threshold} records, bumping {@code evictions} once per eviction callback.
     *
     * @param threshold record count the buffer must exceed for the predicate to hold
     * @param evictions counter incremented for every evicted record
     */
    private void evictWhileMoreThan(int threshold, AtomicInteger evictions) {
        buffer.evictWhile(() -> buffer.numRecords() > threshold, eviction -> evictions.getAndIncrement());
    }

    @Test
    public void shouldReturnIfRecordWasAdded() {
        stubStringSerdes();
        createBuffer(Duration.ofMillis(1L), null);
        MatcherAssert.assertThat(pipeRecord("K", "V", 2L), Matchers.equalTo(true));
        MatcherAssert.assertThat(pipeRecord("K", "V", 0L), Matchers.equalTo(false));
    }

    @Test
    public void shouldPutInBufferAndUpdateFields() {
        createBuffer(Duration.ofMinutes(1L), Serdes.String());
        assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0L);
        pipeRecord("1", "0", 0L);
        assertNumSizeAndTimestamp(buffer, 1, 0L, 42L);
        pipeRecord("3", "0", 2L);
        assertNumSizeAndTimestamp(buffer, 2, 0L, 84L);
    }

    @Test
    public void shouldAddAndEvictRecord() {
        createBuffer(Duration.ZERO, Serdes.String());
        AtomicInteger evictions = new AtomicInteger(0);
        pipeRecord("1", "0", 0L);
        assertNumSizeAndTimestamp(buffer, 1, 0L, 42L);
        evictWhileMoreThan(0, evictions);
        assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0L);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(1));
    }

    @Test
    public void shouldAddAndEvictRecordTwice() {
        stubStringSerdes();
        createBuffer(Duration.ZERO, null);
        AtomicInteger evictions = new AtomicInteger(0);

        pipeRecord("1", "0", 0L);
        assertNumSizeAndTimestamp(buffer, 1, 0L, 42L);
        evictWhileMoreThan(0, evictions);
        assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0L);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(1));

        pipeRecord("2", "0", 1L);
        assertNumSizeAndTimestamp(buffer, 1, 1L, 42L);
        evictWhileMoreThan(0, evictions);
        assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0L);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(2));
    }

    @Test
    public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {
        createBuffer(Duration.ofMillis(1L), Serdes.String());
        AtomicInteger evictions = new AtomicInteger(0);

        // The first record is expected to stay buffered: nothing is evicted yet.
        pipeRecord("1", "0", 0L);
        evictWhileMoreThan(0, evictions);
        assertNumSizeAndTimestamp(buffer, 1, 0L, 42L);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(0));

        // After the second record only the first one is expected to be evicted.
        pipeRecord("2", "0", 1L);
        evictWhileMoreThan(0, evictions);
        assertNumSizeAndTimestamp(buffer, 1, 1L, 42L);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(1));
    }

    @Test
    public void shouldAddRecordsTwiceAndEvictRecordsOnce() {
        stubStringSerdes();
        createBuffer(Duration.ZERO, null);
        AtomicInteger evictions = new AtomicInteger(0);

        // With a single record the "more than one" predicate never holds, so nothing is evicted.
        pipeRecord("1", "0", 0L);
        evictWhileMoreThan(1, evictions);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(0));

        pipeRecord("2", "0", 1L);
        evictWhileMoreThan(0, evictions);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(2));
    }

    @Test
    public void shouldDropLateRecords() {
        stubStringSerdes();
        createBuffer(Duration.ZERO, null);
        pipeRecord("1", "0", 1L);
        assertNumSizeAndTimestamp(buffer, 1, 1L, 42L);
        // A record older than the one already seen must leave the buffer unchanged.
        pipeRecord("2", "0", 0L);
        assertNumSizeAndTimestamp(buffer, 1, 1L, 42L);
    }

    @Test
    public void shouldDropLateRecordsWithNonZeroGrace() {
        createBuffer(Duration.ofMillis(1L), Serdes.String());
        pipeRecord("1", "0", 2L);
        assertNumSizeAndTimestamp(buffer, 1, 2L, 42L);
        // One millisecond behind is still accepted ...
        pipeRecord("2", "0", 1L);
        assertNumSizeAndTimestamp(buffer, 2, 1L, 84L);
        // ... two milliseconds behind is not.
        pipeRecord("3", "0", 0L);
        assertNumSizeAndTimestamp(buffer, 2, 1L, 84L);
    }

    @Test
    public void shouldHandleCollidingKeys() {
        stubStringSerdes();
        createBuffer(Duration.ofMillis(1L), null);
        AtomicInteger evictions = new AtomicInteger(0);

        pipeRecord("2", "0", 0L);
        evictWhileMoreThan(0, evictions);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(0));
        assertNumSizeAndTimestamp(buffer, 1, 0L, 42L);

        // Same key and timestamp as the first record: both are expected to be kept.
        pipeRecord("2", "2", 0L);
        evictWhileMoreThan(0, evictions);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(0));
        assertNumSizeAndTimestamp(buffer, 2, 0L, 84L);

        pipeRecord("1", "0", 7L);
        assertNumSizeAndTimestamp(buffer, 3, 0L, 126L);
        evictWhileMoreThan(0, evictions);
        MatcherAssert.assertThat(evictions.get(), Matchers.equalTo(2));
        assertNumSizeAndTimestamp(buffer, 1, 7L, 42L);
    }

    /**
     * Asserts the three figures the buffer reports about its contents.
     *
     * @param timeOrderedKeyValueBuffer buffer to inspect
     * @param expectedNumRecords        expected {@code numRecords()}
     * @param expectedMinTimestamp      expected {@code minTimestamp()}
     * @param expectedBufferSize        expected {@code bufferSize()}
     */
    private void assertNumSizeAndTimestamp(TimeOrderedKeyValueBuffer<String, String, String> timeOrderedKeyValueBuffer,
                                           int expectedNumRecords,
                                           long expectedMinTimestamp,
                                           long expectedBufferSize) {
        MatcherAssert.assertThat(timeOrderedKeyValueBuffer.numRecords(), Matchers.equalTo(expectedNumRecords));
        MatcherAssert.assertThat(timeOrderedKeyValueBuffer.minTimestamp(), Matchers.equalTo(expectedMinTimestamp));
        MatcherAssert.assertThat(timeOrderedKeyValueBuffer.bufferSize(), Matchers.equalTo(expectedBufferSize));
    }
}
