package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.class */
public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
    private static final String STORE_NAME = "rocksDB window store";
    private static final String METRICS_SCOPE = "test-state-id";
    private final KeyValueSegments segments = new KeyValueSegments(STORE_NAME, METRICS_SCOPE, 120000, 60000);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest$StoreType.class */
    public enum StoreType {
        RocksDBWindowStore,
        RocksDBTimeOrderedWindowStoreWithIndex,
        RocksDBTimeOrderedWindowStoreWithoutIndex
    }

    abstract StoreType storeType();

    @Override // org.apache.kafka.streams.state.internals.AbstractWindowBytesStoreTest
    <K, V> WindowStore<K, V> buildWindowStore(long j, long j2, boolean z, Serde<K> serde, Serde<V> serde2) {
        switch (storeType()) {
            case RocksDBWindowStore:
                return Stores.windowStoreBuilder(Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(j), Duration.ofMillis(j2), z), serde, serde2).build();
            case RocksDBTimeOrderedWindowStoreWithIndex:
                return Stores.windowStoreBuilder(new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, j, Math.max(j / 2, 60000L), j2, z, true), serde, serde2).build();
            case RocksDBTimeOrderedWindowStoreWithoutIndex:
                return Stores.windowStoreBuilder(new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(STORE_NAME, j, Math.max(j / 2, 60000L), j2, z, false), serde, serde2).build();
            default:
                throw new IllegalStateException("Unknown StoreType: " + String.valueOf(storeType()));
        }
    }

    @Test
    public void shouldOnlyIterateOpenSegments() {
        this.windowStore.put(1, "one", 0L);
        long j = 0 + 60000;
        this.windowStore.put(1, "two", j);
        long j2 = j + 60000;
        this.windowStore.put(1, "three", j2);
        WindowStoreIterator fetch = this.windowStore.fetch(1, 0L, j2);
        try {
            this.windowStore.put(1, "four", j2 + 60000);
            Assertions.assertEquals(new KeyValue(60000L, "two"), fetch.next());
            Assertions.assertEquals(new KeyValue(120000L, "three"), fetch.next());
            Assertions.assertFalse(fetch.hasNext());
            if (fetch != null) {
                fetch.close();
            }
        } catch (Throwable th) {
            if (fetch != null) {
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testRolling() {
        this.windowStore.put(0, "zero", 120000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(2L)), segmentDirs(this.baseDir));
        this.windowStore.put(1, "one", 150000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(2L)), segmentDirs(this.baseDir));
        this.windowStore.put(2, "two", 180000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(2L), this.segments.segmentName(3L)), segmentDirs(this.baseDir));
        this.windowStore.put(4, "four", 240000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)), segmentDirs(this.baseDir));
        this.windowStore.put(5, "five", 270000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)), segmentDirs(this.baseDir));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        if (storeType() == StoreType.RocksDBWindowStore) {
            Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        } else {
            Assertions.assertEquals(new HashSet(Collections.singletonList("one")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        }
        Assertions.assertEquals(new HashSet(Collections.singletonList("two")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        this.windowStore.put(6, "six", 300000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)), segmentDirs(this.baseDir));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        if (storeType() == StoreType.RocksDBWindowStore) {
            Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        } else {
            Assertions.assertEquals(new HashSet(Collections.singletonList("two")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        }
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        this.windowStore.put(7, "seven", 330000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)), segmentDirs(this.baseDir));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        this.windowStore.put(8, "eight", 360000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), segmentDirs(this.baseDir));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        if (storeType() == StoreType.RocksDBWindowStore) {
            Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        } else {
            Assertions.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        }
        Assertions.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("eight")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        this.windowStore.flush();
        Assertions.assertEquals(Set.of(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), segmentDirs(this.baseDir));
    }

    @Test
    public void testSegmentMaintenance() {
        this.windowStore.close();
        this.windowStore = buildWindowStore(120000L, 3L, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        this.context.setTime(0L);
        this.windowStore.put(0, "v", 0L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(0L)), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 59999L);
        this.windowStore.put(0, "v", 59999L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(0L)), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 60000L);
        Assertions.assertEquals(Set.of(this.segments.segmentName(0L), this.segments.segmentName(1L)), segmentDirs(this.baseDir));
        WindowStoreIterator fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L));
        int i = 0;
        while (fetch.hasNext()) {
            try {
                fetch.next();
                i++;
            } finally {
            }
        }
        if (fetch != null) {
            fetch.close();
        }
        Assertions.assertEquals(4, i);
        Assertions.assertEquals(Set.of(this.segments.segmentName(0L), this.segments.segmentName(1L)), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 180000L);
        fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L));
        int i2 = 0;
        while (fetch.hasNext()) {
            try {
                fetch.next();
                i2++;
            } finally {
            }
        }
        if (fetch != null) {
            fetch.close();
        }
        if (storeType() == StoreType.RocksDBWindowStore) {
            Assertions.assertEquals(1, i2);
        } else {
            Assertions.assertEquals(2, i2);
        }
        Assertions.assertEquals(Set.of(this.segments.segmentName(1L), this.segments.segmentName(3L)), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 300000L);
        fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(240000L), Instant.ofEpochMilli(600000L));
        int i3 = 0;
        while (fetch.hasNext()) {
            try {
                fetch.next();
                i3++;
            } finally {
                if (fetch != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th) {
                        th.addSuppressed(th);
                    }
                }
            }
        }
        if (fetch != null) {
            fetch.close();
        }
        Assertions.assertEquals(1, i3);
        Assertions.assertEquals(Set.of(this.segments.segmentName(3L), this.segments.segmentName(5L)), segmentDirs(this.baseDir));
    }

    @Test
    public void testInitialLoading() {
        File file = new File(this.baseDir, STORE_NAME);
        new File(file, this.segments.segmentName(0L)).mkdir();
        new File(file, this.segments.segmentName(1L)).mkdir();
        new File(file, this.segments.segmentName(2L)).mkdir();
        new File(file, this.segments.segmentName(3L)).mkdir();
        new File(file, this.segments.segmentName(4L)).mkdir();
        new File(file, this.segments.segmentName(5L)).mkdir();
        new File(file, this.segments.segmentName(6L)).mkdir();
        this.windowStore.close();
        this.windowStore = buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        this.windowStore.put(1, "v", 360000L);
        List asList = Arrays.asList(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L));
        asList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        List list = Utils.toList(segmentDirs(this.baseDir).iterator());
        list.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assertions.assertEquals(asList, list);
        WindowStoreIterator fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000000L));
        while (fetch.hasNext()) {
            try {
                fetch.next();
            } catch (Throwable th) {
                if (fetch != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (fetch != null) {
            fetch.close();
        }
        Assertions.assertEquals(Set.of(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), segmentDirs(this.baseDir));
    }

    @Test
    public void testRestore() throws Exception {
        this.windowStore.put(0, "zero", 120000L);
        this.windowStore.put(1, "one", 150000L);
        this.windowStore.put(2, "two", 180000L);
        this.windowStore.put(3, "three", 210000L);
        this.windowStore.put(4, "four", 240000L);
        this.windowStore.put(5, "five", 270000L);
        this.windowStore.put(6, "six", 300000L);
        this.windowStore.put(7, "seven", 330000L);
        this.windowStore.put(8, "eight", 360000L);
        this.windowStore.flush();
        this.windowStore.close();
        Utils.delete(this.baseDir);
        this.windowStore = buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        ArrayList arrayList = new ArrayList();
        for (ProducerRecord<Object, Object> producerRecord : this.recordCollector.collected()) {
            arrayList.add(new KeyValue(((Bytes) producerRecord.key()).get(), (byte[]) producerRecord.value()));
        }
        this.context.restore(STORE_NAME, arrayList);
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        if (storeType() == StoreType.RocksDBWindowStore) {
            Assertions.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        } else {
            Assertions.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        }
        Assertions.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assertions.assertEquals(new HashSet(Collections.singletonList("eight")), StreamsTestUtils.valuesToSetAndCloseIterator(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        this.windowStore.flush();
        Assertions.assertEquals(Set.of(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), segmentDirs(this.baseDir));
    }

    @Test
    public void shouldMatchPositionAfterPut() {
        WrappedStateStore wrapped = this.windowStore.wrapped().wrapped();
        this.context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        this.windowStore.put(0, "0", 60000L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        this.windowStore.put(1, "1", 60000L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));
        this.windowStore.put(2, "2", 60000L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 4L, 0, "", new RecordHeaders()));
        this.windowStore.put(3, "3", 60000L);
        Assertions.assertEquals(Position.fromMap(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("", Utils.mkMap(new Map.Entry[]{Utils.mkEntry(0, 4L)}))})), wrapped.getPosition());
    }

    private Set<String> segmentDirs(File file) {
        return new HashSet(Arrays.asList((String[]) Objects.requireNonNull(new File(file, this.windowStore.name()).list())));
    }
}
