package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.class */
public class LogicalKeyValueSegmentsTest {
    private static final long SEGMENT_INTERVAL = 100;
    private static final long RETENTION_PERIOD = 400;
    private static final String STORE_NAME = "logical-segments";
    private static final String METRICS_SCOPE = "metrics-scope";
    private static final String DB_FILE_DIR = "rocksdb";
    private InternalMockProcessorContext context;
    private LogicalKeyValueSegments segments;

    @BeforeEach
    public void setUp() {
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), new MockRecordCollector(), new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())));
        this.segments = new LogicalKeyValueSegments(STORE_NAME, DB_FILE_DIR, RETENTION_PERIOD, SEGMENT_INTERVAL, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME));
        this.segments.setPosition(Position.emptyPosition());
        this.segments.openExisting(this.context, 0L);
    }

    @AfterEach
    public void tearDown() {
        this.segments.close();
    }

    @Test
    public void shouldGetSegmentIdsFromTimestamp() {
        Assertions.assertEquals(0L, this.segments.segmentId(0L));
        Assertions.assertEquals(1L, this.segments.segmentId(SEGMENT_INTERVAL));
        Assertions.assertEquals(2L, this.segments.segmentId(200L));
        Assertions.assertEquals(3L, this.segments.segmentId(300L));
    }

    @Test
    public void shouldCreateSegments() {
        LogicalKeyValueSegment orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(0L, this.context, 0L);
        LogicalKeyValueSegment orCreateSegmentIfLive2 = this.segments.getOrCreateSegmentIfLive(1L, this.context, SEGMENT_INTERVAL);
        LogicalKeyValueSegment orCreateSegmentIfLive3 = this.segments.getOrCreateSegmentIfLive(2L, this.context, 200L);
        Assertions.assertTrue(new File(new File(this.context.stateDir(), DB_FILE_DIR), STORE_NAME).isDirectory());
        Assertions.assertTrue(orCreateSegmentIfLive.isOpen());
        Assertions.assertTrue(orCreateSegmentIfLive2.isOpen());
        Assertions.assertTrue(orCreateSegmentIfLive3.isOpen());
    }

    @Test
    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
        Assertions.assertNull(this.segments.getOrCreateSegmentIfLive(0L, this.context, updateStreamTimeAndCreateSegment(7)));
    }

    @Test
    public void shouldCreateReservedSegments() {
        LogicalKeyValueSegment createReservedSegment = this.segments.createReservedSegment(-1L, "reserved-1");
        LogicalKeyValueSegment createReservedSegment2 = this.segments.createReservedSegment(-2L, "reserved-2");
        Assertions.assertTrue(new File(new File(this.context.stateDir(), DB_FILE_DIR), STORE_NAME).isDirectory());
        Assertions.assertTrue(createReservedSegment.isOpen());
        Assertions.assertTrue(createReservedSegment2.isOpen());
    }

    @Test
    public void shouldNotCreateReservedSegmentWithNonNegativeId() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.segments.createReservedSegment(0L, "reserved");
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.segments.createReservedSegment(1L, "reserved");
        });
    }

    @Test
    public void shouldNotCreateReservedSegmentFromRegularMethod() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.segments.getOrCreateSegmentIfLive(-1L, this.context, 0L);
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.segments.getOrCreateSegment(-1L, this.context);
        });
    }

    @Test
    public void shouldCleanupSegmentsThatHaveExpired() {
        this.segments.getOrCreateSegmentIfLive(0L, this.context, 0L);
        this.segments.getOrCreateSegmentIfLive(2L, this.context, 200L);
        LogicalKeyValueSegment orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(3L, this.context, 300L);
        LogicalKeyValueSegment orCreateSegmentIfLive2 = this.segments.getOrCreateSegmentIfLive(7L, this.context, 700L);
        this.segments.cleanupExpiredSegments(700L);
        List allSegments = this.segments.allSegments(true);
        Assertions.assertEquals(2, allSegments.size());
        Assertions.assertEquals(orCreateSegmentIfLive, allSegments.get(0));
        Assertions.assertEquals(orCreateSegmentIfLive2, allSegments.get(1));
    }

    @Test
    public void shouldNotCleanUpReservedSegments() {
        LogicalKeyValueSegment createReservedSegment = this.segments.createReservedSegment(-1L, "reserved");
        this.segments.getOrCreateSegmentIfLive(1L, this.context, SEGMENT_INTERVAL);
        LogicalKeyValueSegment orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(2L, this.context, 200L);
        this.segments.cleanupExpiredSegments(600L);
        List allSegments = this.segments.allSegments(true);
        Assertions.assertEquals(1, allSegments.size());
        Assertions.assertEquals(orCreateSegmentIfLive, allSegments.get(0));
        Assertions.assertEquals(createReservedSegment, this.segments.getReservedSegment(-1L));
    }

    @Test
    public void shouldGetSegmentForTimestamp() {
        LogicalKeyValueSegment orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(0L, this.context, 0L);
        LogicalKeyValueSegment orCreateSegmentIfLive2 = this.segments.getOrCreateSegmentIfLive(1L, this.context, SEGMENT_INTERVAL);
        Assertions.assertEquals(orCreateSegmentIfLive, this.segments.segmentForTimestamp(0L));
        Assertions.assertEquals(orCreateSegmentIfLive, this.segments.segmentForTimestamp(99L));
        Assertions.assertEquals(orCreateSegmentIfLive2, this.segments.segmentForTimestamp(SEGMENT_INTERVAL));
        Assertions.assertEquals(orCreateSegmentIfLive2, this.segments.segmentForTimestamp(199L));
    }

    @Test
    public void shouldGetSegmentsWithinTimeRange() {
        this.segments.createReservedSegment(-1L, "reserved");
        long updateStreamTimeAndCreateSegment = updateStreamTimeAndCreateSegment(4);
        this.segments.getOrCreateSegmentIfLive(0L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(2L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(1L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(3L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(4L, this.context, updateStreamTimeAndCreateSegment);
        List segments = this.segments.segments(0L, 200L, true);
        Assertions.assertEquals(3, segments.size());
        Assertions.assertEquals(0L, ((LogicalKeyValueSegment) segments.get(0)).id());
        Assertions.assertEquals(1L, ((LogicalKeyValueSegment) segments.get(1)).id());
        Assertions.assertEquals(2L, ((LogicalKeyValueSegment) segments.get(2)).id());
    }

    @Test
    public void shouldGetSegmentsWithinBackwardTimeRange() {
        this.segments.createReservedSegment(-1L, "reserved");
        long updateStreamTimeAndCreateSegment = updateStreamTimeAndCreateSegment(4);
        this.segments.getOrCreateSegmentIfLive(0L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(2L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(1L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(3L, this.context, updateStreamTimeAndCreateSegment);
        this.segments.getOrCreateSegmentIfLive(4L, this.context, updateStreamTimeAndCreateSegment);
        List segments = this.segments.segments(0L, 200L, false);
        Assertions.assertEquals(3, segments.size());
        Assertions.assertEquals(2L, ((LogicalKeyValueSegment) segments.get(0)).id());
        Assertions.assertEquals(1L, ((LogicalKeyValueSegment) segments.get(1)).id());
        Assertions.assertEquals(0L, ((LogicalKeyValueSegment) segments.get(2)).id());
    }

    @Test
    public void shouldClearSegmentsOnClose() {
        LogicalKeyValueSegment orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(0L, this.context, 0L);
        LogicalKeyValueSegment createReservedSegment = this.segments.createReservedSegment(-1L, "reserved");
        orCreateSegmentIfLive.put(new Bytes("k".getBytes()), "v".getBytes());
        createReservedSegment.put(new Bytes("k".getBytes()), "v".getBytes());
        KeyValueIterator all = orCreateSegmentIfLive.all();
        KeyValueIterator all2 = createReservedSegment.all();
        Assertions.assertTrue(all.hasNext());
        Assertions.assertTrue(all2.hasNext());
        this.segments.close();
        MatcherAssert.assertThat(this.segments.segmentForTimestamp(0L), CoreMatchers.is(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(this.segments.getReservedSegment(-1L), CoreMatchers.is(CoreMatchers.nullValue()));
        Objects.requireNonNull(all);
        Assertions.assertThrows(InvalidStateStoreException.class, all::hasNext);
        Objects.requireNonNull(all2);
        Assertions.assertThrows(InvalidStateStoreException.class, all2::hasNext);
    }

    private long updateStreamTimeAndCreateSegment(int i) {
        long j = SEGMENT_INTERVAL * i;
        this.segments.getOrCreateSegmentIfLive(i, this.context, j);
        return j;
    }
}
