package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.class */
public class RocksDBWindowStoreTest {
    // Serde handed to MockProcessorContext as both key and value serde, and used to build the
    // MockProducer serializers in every test below.
    private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
    // NOTE(review): windowName, numSegments, segmentSize, retentionPeriod and windowSize are not
    // referenced in the part of the file visible to this review; the same values appear there as
    // inlined literals ("window", 3, 120000L, the +/- 3 fetch offsets). Confirm whether the
    // remainder of the file uses them before removing or renaming anything.
    private final String windowName = "window";
    private final int numSegments = 3;
    // Presumably milliseconds, like the timestamps the tests pass to setTime() -- TODO confirm.
    private final long segmentSize = 60000;
    private final long retentionPeriod = 120000;
    private final long windowSize = 3;
    // Key and value serdes passed to RocksDBWindowStoreSupplier in createWindowStore().
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    // Combined Integer/String serdes built with an empty name argument; not used in the visible
    // tests (presumably consumed by helpers further down the file -- verify).
    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", this.intSerde, this.stringSerde);

    /**
     * Creates the window store under test and initialises it against the given context.
     *
     * <p>The supplier is always configured with the {@code Integer} key serde and the
     * {@code String} value serde of this class, whatever {@code K} and {@code V} the caller
     * asks for; the tests in this class use the result as a store of Integer keys and String
     * values.
     *
     * @param processorContext context the new store is initialised with
     * @param <K> key type the caller wants to view the store as
     * @param <V> value type the caller wants to view the store as
     * @return the initialised store; the caller is responsible for closing it
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext processorContext) {
        // The supplier is used raw, so get() hands back an erased type. The explicit cast makes
        // the unchecked narrowing to WindowStore<K, V> visible instead of implicit.
        // NOTE(review): the literals equal the windowName / retentionPeriod / numSegments fields
        // of this class; the meaning of the boolean flag is not visible here -- confirm against
        // the supplier's constructor.
        WindowStore<K, V> store = (WindowStore<K, V>) new RocksDBWindowStoreSupplier("window", 120000L, 3, true, this.intSerde, this.stringSerde).get();
        store.init(processorContext, store);
        return store;
    }

    /**
     * Puts one value each for keys 0, 1, 2, 4 and 5 at consecutive timestamps (key 3 is skipped
     * on purpose), then six more values for key 2, and checks what {@code fetch} returns for
     * ranges of the form {@code [t - 3, t + 3]}. The expected lists treat both range bounds as
     * inclusive. Finally the records captured from the record collector are checked via
     * {@code entriesByKey}.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testPutAndFetch() throws IOException {
        // 59996 is 4 below the segmentSize field value (60000); presumably chosen so the puts
        // straddle a segment boundary -- TODO confirm.
        final long startTime = 59996L;
        final long window = 3L;
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            // Collector that records every sent record as serialized key/value bytes.
            RecordCollector collector = new RecordCollector(new MockProducer<>(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext((KStreamTestDriver) null, baseDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, collector);
            WindowStore<Integer, String> store = createWindowStore(context);
            try {
                context.setTime(startTime);
                store.put(0, "zero");
                context.setTime(startTime + 1L);
                store.put(1, "one");
                context.setTime(startTime + 2L);
                store.put(2, "two");
                // Time passes through startTime + 3 without a put, so key 3 stays absent.
                context.setTime(startTime + 3L);
                context.setTime(startTime + 4L);
                store.put(4, "four");
                context.setTime(startTime + 5L);
                store.put(5, "five");

                Assert.assertEquals(Utils.mkList(new String[]{"zero"}), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"one"}), toList(store.fetch(1, startTime + 1L - window, startTime + 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + 2L - window, startTime + 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + 3L - window, startTime + 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + 4L - window, startTime + 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + 5L - window, startTime + 5L + window)));

                context.setTime(startTime + 3L);
                store.put(2, "two+1");
                context.setTime(startTime + 4L);
                store.put(2, "two+2");
                context.setTime(startTime + 5L);
                store.put(2, "two+3");
                context.setTime(startTime + 6L);
                store.put(2, "two+4");
                context.setTime(startTime + 7L);
                store.put(2, "two+5");
                context.setTime(startTime + 8L);
                store.put(2, "two+6");

                // Slide the centre of the [t - 3, t + 3] range from startTime - 2 to startTime + 12.
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime - 2L - window, startTime - 2L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime - 1L - window, startTime - 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1"}), toList(store.fetch(2, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2"}), toList(store.fetch(2, startTime + 1L - window, startTime + 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3"}), toList(store.fetch(2, startTime + 2L - window, startTime + 2L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3", "two+4"}), toList(store.fetch(2, startTime + 3L - window, startTime + 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3", "two+4", "two+5"}), toList(store.fetch(2, startTime + 4L - window, startTime + 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 5L - window, startTime + 5L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+1", "two+2", "two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 6L - window, startTime + 6L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+2", "two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 7L - window, startTime + 7L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 8L - window, startTime + 8L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 9L - window, startTime + 9L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+5", "two+6"}), toList(store.fetch(2, startTime + 10L - window, startTime + 10L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+6"}), toList(store.fetch(2, startTime + 11L - window, startTime + 11L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 12L - window, startTime + 12L + window)));

                store.flush();
                Map<Integer, Set<String>> entries = entriesByKey(changeLog, startTime);
                Assert.assertEquals(Utils.mkSet(new String[]{"zero@0"}), entries.get(0));
                Assert.assertEquals(Utils.mkSet(new String[]{"one@1"}), entries.get(1));
                Assert.assertEquals(Utils.mkSet(new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entries.get(2));
                Assert.assertNull(entries.get(3));
                Assert.assertEquals(Utils.mkSet(new String[]{"four@4"}), entries.get(4));
                Assert.assertEquals(Utils.mkSet(new String[]{"five@5"}), entries.get(5));
                Assert.assertNull(entries.get(6));
            } finally {
                // Single close on every path; the previous catch-and-rethrow form closed twice
                // when the first close() itself failed.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Same data set as {@code testPutAndFetch}, but every fetch uses a range that only looks
     * backwards: {@code [t - 3, t]}. The expected lists treat both range bounds as inclusive.
     * Finally the records captured from the record collector are checked via
     * {@code entriesByKey}.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testPutAndFetchBefore() throws IOException {
        // 59996 is 4 below the segmentSize field value (60000); presumably chosen so the puts
        // straddle a segment boundary -- TODO confirm.
        final long startTime = 59996L;
        final long window = 3L;
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            // Collector that records every sent record as serialized key/value bytes.
            RecordCollector collector = new RecordCollector(new MockProducer<>(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext((KStreamTestDriver) null, baseDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, collector);
            WindowStore<Integer, String> store = createWindowStore(context);
            try {
                context.setTime(startTime);
                store.put(0, "zero");
                context.setTime(startTime + 1L);
                store.put(1, "one");
                context.setTime(startTime + 2L);
                store.put(2, "two");
                // Time passes through startTime + 3 without a put, so key 3 stays absent.
                context.setTime(startTime + 3L);
                context.setTime(startTime + 4L);
                store.put(4, "four");
                context.setTime(startTime + 5L);
                store.put(5, "five");

                Assert.assertEquals(Utils.mkList(new String[]{"zero"}), toList(store.fetch(0, startTime - window, startTime)));
                Assert.assertEquals(Utils.mkList(new String[]{"one"}), toList(store.fetch(1, startTime + 1L - window, startTime + 1L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + 2L - window, startTime + 2L)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + 3L - window, startTime + 3L)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + 4L - window, startTime + 4L)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + 5L - window, startTime + 5L)));

                context.setTime(startTime + 3L);
                store.put(2, "two+1");
                context.setTime(startTime + 4L);
                store.put(2, "two+2");
                context.setTime(startTime + 5L);
                store.put(2, "two+3");
                context.setTime(startTime + 6L);
                store.put(2, "two+4");
                context.setTime(startTime + 7L);
                store.put(2, "two+5");
                context.setTime(startTime + 8L);
                store.put(2, "two+6");

                // Slide the upper bound of the [t - 3, t] range from startTime - 1 to startTime + 13.
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime - 1L - window, startTime - 1L)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime - window, startTime)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 1L - window, startTime + 1L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + 2L - window, startTime + 2L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1"}), toList(store.fetch(2, startTime + 3L - window, startTime + 3L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2"}), toList(store.fetch(2, startTime + 4L - window, startTime + 4L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3"}), toList(store.fetch(2, startTime + 5L - window, startTime + 5L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+1", "two+2", "two+3", "two+4"}), toList(store.fetch(2, startTime + 6L - window, startTime + 6L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+2", "two+3", "two+4", "two+5"}), toList(store.fetch(2, startTime + 7L - window, startTime + 7L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 8L - window, startTime + 8L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 9L - window, startTime + 9L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+5", "two+6"}), toList(store.fetch(2, startTime + 10L - window, startTime + 10L)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+6"}), toList(store.fetch(2, startTime + 11L - window, startTime + 11L)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 12L - window, startTime + 12L)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 13L - window, startTime + 13L)));

                store.flush();
                Map<Integer, Set<String>> entries = entriesByKey(changeLog, startTime);
                Assert.assertEquals(Utils.mkSet(new String[]{"zero@0"}), entries.get(0));
                Assert.assertEquals(Utils.mkSet(new String[]{"one@1"}), entries.get(1));
                Assert.assertEquals(Utils.mkSet(new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entries.get(2));
                Assert.assertNull(entries.get(3));
                Assert.assertEquals(Utils.mkSet(new String[]{"four@4"}), entries.get(4));
                Assert.assertEquals(Utils.mkSet(new String[]{"five@5"}), entries.get(5));
                Assert.assertNull(entries.get(6));
            } finally {
                // Single close on every path; the previous catch-and-rethrow form closed twice
                // when the first close() itself failed.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Same data set as {@code testPutAndFetch}, but every fetch uses a range that only looks
     * forwards: {@code [t, t + 3]}. The expected lists treat both range bounds as inclusive.
     * Finally the records captured from the record collector are checked via
     * {@code entriesByKey}.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testPutAndFetchAfter() throws IOException {
        // 59996 is 4 below the segmentSize field value (60000); presumably chosen so the puts
        // straddle a segment boundary -- TODO confirm.
        final long startTime = 59996L;
        final long window = 3L;
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            // Collector that records every sent record as serialized key/value bytes.
            RecordCollector collector = new RecordCollector(new MockProducer<>(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext((KStreamTestDriver) null, baseDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, collector);
            WindowStore<Integer, String> store = createWindowStore(context);
            try {
                context.setTime(startTime);
                store.put(0, "zero");
                context.setTime(startTime + 1L);
                store.put(1, "one");
                context.setTime(startTime + 2L);
                store.put(2, "two");
                // Time passes through startTime + 3 without a put, so key 3 stays absent.
                context.setTime(startTime + 3L);
                context.setTime(startTime + 4L);
                store.put(4, "four");
                context.setTime(startTime + 5L);
                store.put(5, "five");

                Assert.assertEquals(Utils.mkList(new String[]{"zero"}), toList(store.fetch(0, startTime, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"one"}), toList(store.fetch(1, startTime + 1L, startTime + 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + 2L, startTime + 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + 3L, startTime + 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + 4L, startTime + 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + 5L, startTime + 5L + window)));

                context.setTime(startTime + 3L);
                store.put(2, "two+1");
                context.setTime(startTime + 4L);
                store.put(2, "two+2");
                context.setTime(startTime + 5L);
                store.put(2, "two+3");
                context.setTime(startTime + 6L);
                store.put(2, "two+4");
                context.setTime(startTime + 7L);
                store.put(2, "two+5");
                context.setTime(startTime + 8L);
                store.put(2, "two+6");

                // Slide the lower bound of the [t, t + 3] range from startTime - 2 to startTime + 12.
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime - 2L, startTime - 2L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime - 1L, startTime - 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1"}), toList(store.fetch(2, startTime, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2"}), toList(store.fetch(2, startTime + 1L, startTime + 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two", "two+1", "two+2", "two+3"}), toList(store.fetch(2, startTime + 2L, startTime + 2L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+1", "two+2", "two+3", "two+4"}), toList(store.fetch(2, startTime + 3L, startTime + 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+2", "two+3", "two+4", "two+5"}), toList(store.fetch(2, startTime + 4L, startTime + 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+3", "two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 5L, startTime + 5L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+4", "two+5", "two+6"}), toList(store.fetch(2, startTime + 6L, startTime + 6L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+5", "two+6"}), toList(store.fetch(2, startTime + 7L, startTime + 7L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two+6"}), toList(store.fetch(2, startTime + 8L, startTime + 8L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 9L, startTime + 9L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 10L, startTime + 10L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 11L, startTime + 11L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + 12L, startTime + 12L + window)));

                store.flush();
                Map<Integer, Set<String>> entries = entriesByKey(changeLog, startTime);
                Assert.assertEquals(Utils.mkSet(new String[]{"zero@0"}), entries.get(0));
                Assert.assertEquals(Utils.mkSet(new String[]{"one@1"}), entries.get(1));
                Assert.assertEquals(Utils.mkSet(new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entries.get(2));
                Assert.assertNull(entries.get(3));
                Assert.assertEquals(Utils.mkSet(new String[]{"four@4"}), entries.get(4));
                Assert.assertEquals(Utils.mkSet(new String[]{"five@5"}), entries.get(5));
                Assert.assertNull(entries.get(6));
            } finally {
                // Single close on every path; the previous catch-and-rethrow form closed twice
                // when the first close() itself failed.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Puts four values for the same key at the same timestamp and checks that {@code fetch}
     * returns all of them, in put order, for every {@code [t - 3, t + 3]} range that still
     * contains that timestamp, and nothing once the range has moved past it. Finally the
     * records captured from the record collector are checked via {@code entriesByKey}.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testPutSameKeyTimestamp() throws IOException {
        // 59996 is 4 below the segmentSize field value (60000) -- presumably deliberate; confirm.
        final long startTime = 59996L;
        final long window = 3L;
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            // Collector that records every sent record as serialized key/value bytes.
            RecordCollector collector = new RecordCollector(new MockProducer<>(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext((KStreamTestDriver) null, baseDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, collector);
            WindowStore<Integer, String> store = createWindowStore(context);
            try {
                context.setTime(startTime);
                store.put(0, "zero");
                Assert.assertEquals(Utils.mkList(new String[]{"zero"}), toList(store.fetch(0, startTime - window, startTime + window)));

                // Three more puts for the same key at the very same timestamp.
                context.setTime(startTime);
                store.put(0, "zero");
                context.setTime(startTime);
                store.put(0, "zero+");
                context.setTime(startTime);
                store.put(0, "zero++");

                Assert.assertEquals(Utils.mkList(new String[]{"zero", "zero", "zero+", "zero++"}), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"zero", "zero", "zero+", "zero++"}), toList(store.fetch(0, startTime + 1L - window, startTime + 1L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"zero", "zero", "zero+", "zero++"}), toList(store.fetch(0, startTime + 2L - window, startTime + 2L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"zero", "zero", "zero+", "zero++"}), toList(store.fetch(0, startTime + 3L - window, startTime + 3L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(0, startTime + 4L - window, startTime + 4L + window)));

                store.flush();
                Assert.assertEquals(Utils.mkSet(new String[]{"zero@0", "zero@0", "zero+@0", "zero++@0"}), entriesByKey(changeLog, startTime).get(0));
            } finally {
                // Single close on every path; the previous catch-and-rethrow form closed twice
                // when the first close() itself failed.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Puts one value per key at timestamps 30000 apart, starting at 120000, and after each step
     * checks the set of segment ids reported by the inner RocksDB store as well as which
     * earlier values can still be fetched. At the end the segment directories found under the
     * state directory (via {@code segmentDirs}) are compared with the names of segments 4, 5
     * and 6.
     *
     * @throws IOException if the temporary state directory cannot be created
     */
    @Test
    public void testRolling() throws IOException {
        // 120000 and 30000 are twice and half the segmentSize field value (60000).
        final long startTime = 120000L;
        final long increment = 30000L;
        final long window = 3L;
        File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
            // Collector that records every sent record as serialized key/value bytes.
            RecordCollector collector = new RecordCollector(new MockProducer<>(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                    changeLog.add(new KeyValue<>(
                            keySerializer.serialize(record.topic(), record.key()),
                            valueSerializer.serialize(record.topic(), record.value())));
                }
            };
            MockProcessorContext context = new MockProcessorContext((KStreamTestDriver) null, baseDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, collector);
            WindowStore<Integer, String> store = createWindowStore(context);
            // createWindowStore() is declared to return WindowStore, so reaching the metered
            // wrapper and the RocksDB store inside it needs explicit downcasts.
            // NOTE(review): raw types are used because the generic signatures of these two
            // classes are not visible here; parameterize them if they are generic.
            RocksDBWindowStore inner = (RocksDBWindowStore) ((MeteredWindowStore) store).inner();
            try {
                context.setTime(startTime);
                store.put(0, "zero");
                Assert.assertEquals(Utils.mkSet(new Long[]{2L}), inner.segmentIds());

                context.setTime(startTime + increment);
                store.put(1, "one");
                Assert.assertEquals(Utils.mkSet(new Long[]{2L}), inner.segmentIds());

                context.setTime(startTime + increment * 2L);
                store.put(2, "two");
                Assert.assertEquals(Utils.mkSet(new Long[]{2L, 3L}), inner.segmentIds());

                // Time advances without a put for key 3; the segment set is expected to stay as is.
                context.setTime(startTime + increment * 3L);
                Assert.assertEquals(Utils.mkSet(new Long[]{2L, 3L}), inner.segmentIds());

                context.setTime(startTime + increment * 4L);
                store.put(4, "four");
                Assert.assertEquals(Utils.mkSet(new Long[]{2L, 3L, 4L}), inner.segmentIds());

                context.setTime(startTime + increment * 5L);
                store.put(5, "five");
                Assert.assertEquals(Utils.mkSet(new Long[]{2L, 3L, 4L}), inner.segmentIds());

                Assert.assertEquals(Utils.mkList(new String[]{"zero"}), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"one"}), toList(store.fetch(1, startTime + increment - window, startTime + increment + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + increment * 2L - window, startTime + increment * 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + increment * 3L - window, startTime + increment * 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + increment * 4L - window, startTime + increment * 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + increment * 5L - window, startTime + increment * 5L + window)));

                context.setTime(startTime + increment * 6L);
                store.put(6, "six");
                // Segment 2 is expected to be gone now, and keys 0 and 1 with it.
                Assert.assertEquals(Utils.mkSet(new Long[]{3L, 4L, 5L}), inner.segmentIds());

                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(1, startTime + increment - window, startTime + increment + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + increment * 2L - window, startTime + increment * 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + increment * 3L - window, startTime + increment * 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + increment * 4L - window, startTime + increment * 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + increment * 5L - window, startTime + increment * 5L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"six"}), toList(store.fetch(6, startTime + increment * 6L - window, startTime + increment * 6L + window)));

                context.setTime(startTime + increment * 7L);
                store.put(7, "seven");
                Assert.assertEquals(Utils.mkSet(new Long[]{3L, 4L, 5L}), inner.segmentIds());

                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(1, startTime + increment - window, startTime + increment + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"two"}), toList(store.fetch(2, startTime + increment * 2L - window, startTime + increment * 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + increment * 3L - window, startTime + increment * 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + increment * 4L - window, startTime + increment * 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + increment * 5L - window, startTime + increment * 5L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"six"}), toList(store.fetch(6, startTime + increment * 6L - window, startTime + increment * 6L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"seven"}), toList(store.fetch(7, startTime + increment * 7L - window, startTime + increment * 7L + window)));

                context.setTime(startTime + increment * 8L);
                store.put(8, "eight");
                // Segment 3 is expected to be gone now, and key 2 with it.
                Assert.assertEquals(Utils.mkSet(new Long[]{4L, 5L, 6L}), inner.segmentIds());

                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(0, startTime - window, startTime + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(1, startTime + increment - window, startTime + increment + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(2, startTime + increment * 2L - window, startTime + increment * 2L + window)));
                Assert.assertEquals(Utils.mkList(new Object[0]), toList(store.fetch(3, startTime + increment * 3L - window, startTime + increment * 3L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"four"}), toList(store.fetch(4, startTime + increment * 4L - window, startTime + increment * 4L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"five"}), toList(store.fetch(5, startTime + increment * 5L - window, startTime + increment * 5L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"six"}), toList(store.fetch(6, startTime + increment * 6L - window, startTime + increment * 6L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"seven"}), toList(store.fetch(7, startTime + increment * 7L - window, startTime + increment * 7L + window)));
                Assert.assertEquals(Utils.mkList(new String[]{"eight"}), toList(store.fetch(8, startTime + increment * 8L - window, startTime + increment * 8L + window)));

                store.flush();
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)}), segmentDirs(baseDir));
            } finally {
                // Single close on every path; the previous catch-and-rethrow form closed twice
                // when the first close() itself failed.
                store.close();
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Verifies that a window store can be rebuilt from a captured changelog.
     *
     * <p>Phase one writes nine records (keys 0-8, one every 30000 ms starting at 120000) into a store whose
     * record collector copies every serialized record it is asked to send into an in-memory list. Phase two
     * opens a fresh store in a different directory, replays that list through
     * {@code MockProcessorContext.restore}, and checks that only segments 4-6 exist and that only keys 4-8
     * can still be fetched.
     *
     * <p>Each temporary directory is deleted and each store is closed in a {@code finally} block, so a
     * failing assertion leaves neither open stores nor stray directories behind.
     *
     * @throws IOException if a temporary directory cannot be created or deleted
     */
    @Test
    public void testRestore() throws IOException {
        final long startTime = 120000L;
        final long increment = 30000L;
        final long window = 3L;
        final String[] values = {"zero", "one", "two", "three", "four", "five", "six", "seven", "eight"};
        // Serialized key/value pairs captured from the collectors; replayed into the second store below.
        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();

        // Phase 1: populate a store and capture its changelog.
        File file = Files.createTempDirectory("test").toFile();
        try {
            MockProcessorContext mockProcessorContext = new MockProcessorContext((KStreamTestDriver) null, file, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, new RecordCollector(new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> producerRecord, Serializer<K1> serializer, Serializer<V1> serializer2) {
                    changeLog.add(new KeyValue<>(serializer.serialize(producerRecord.topic(), producerRecord.key()), serializer2.serialize(producerRecord.topic(), producerRecord.value())));
                }
            });
            MeteredWindowStore createWindowStore = createWindowStore(mockProcessorContext);
            try {
                for (int key = 0; key < values.length; key++) {
                    mockProcessorContext.setTime(startTime + increment * key);
                    createWindowStore.put(key, values[key]);
                }
                createWindowStore.flush();
            } finally {
                createWindowStore.close();
            }
        } finally {
            // Previously this directory was leaked because the variable was reassigned before any delete.
            Utils.delete(file);
        }

        // Phase 2: restore a brand-new store from the captured changelog and verify its contents.
        File restoreDir = Files.createTempDirectory("test").toFile();
        try {
            MockProcessorContext mockProcessorContext2 = new MockProcessorContext((KStreamTestDriver) null, restoreDir, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, new RecordCollector(new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> producerRecord, Serializer<K1> serializer, Serializer<V1> serializer2) {
                    changeLog.add(new KeyValue<>(serializer.serialize(producerRecord.topic(), producerRecord.key()), serializer2.serialize(producerRecord.topic(), producerRecord.value())));
                }
            });
            MeteredWindowStore restoredStore = createWindowStore(mockProcessorContext2);
            try {
                RocksDBWindowStore inner = restoredStore.inner();
                mockProcessorContext2.restore("window", changeLog);

                Assert.assertEquals(Utils.mkSet(new Long[]{4L, 5L, 6L}), inner.segmentIds());

                // Keys 0-3 must no longer be retrievable after the restore; keys 4-8 must return their value.
                // NOTE(review): presumably keys 0-3 fall into segments older than 4 and are dropped - confirm.
                for (int key = 0; key < values.length; key++) {
                    long timestamp = startTime + increment * key;
                    String[] expected = key < 4 ? new String[0] : new String[]{values[key]};
                    Assert.assertEquals("fetch for key " + key, Utils.mkList(expected), toList(restoredStore.fetch(key, timestamp - window, timestamp + window)));
                }

                restoredStore.flush();
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)}), segmentDirs(restoreDir));
            } finally {
                restoredStore.close();
            }
        } finally {
            Utils.delete(restoreDir);
        }
    }

    /**
     * Verifies which segment directories exist on disk as the stream time advances, and that fetching
     * does not create or remove segment directories.
     *
     * <p>All records use key 0 and value {@code "v"}; only the context time changes between puts. After
     * each step the test compares the directory names under the store directory with the expected
     * segment names and, where relevant, counts the records returned by a range fetch.
     *
     * <p>The store is closed in a {@code finally} block so that it is closed exactly once, whether the
     * test body succeeds or fails.
     *
     * @throws IOException if the temporary directory cannot be created or deleted
     */
    @Test
    public void testSegmentMaintenance() throws IOException {
        File file = Files.createTempDirectory("test").toFile();
        try {
            MockProcessorContext mockProcessorContext = new MockProcessorContext((KStreamTestDriver) null, file, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, new RecordCollector(new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> producerRecord, Serializer<K1> serializer, Serializer<V1> serializer2) {
                    // Changelog output is not inspected by this test, so records are discarded.
                }
            });
            MeteredWindowStore store = createWindowStore(mockProcessorContext);
            try {
                RocksDBWindowStore inner = store.inner();

                // The first put at time 0 yields exactly one segment directory (segment 0).
                mockProcessorContext.setTime(0L);
                store.put(0, "v");
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(0L)}), segmentDirs(file));

                // Two more puts at time 59999 still leave only segment 0 on disk.
                mockProcessorContext.setTime(59999L);
                store.put(0, "v");
                mockProcessorContext.setTime(59999L);
                store.put(0, "v");
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(0L)}), segmentDirs(file));

                // A put at time 60000 adds segment 1.
                mockProcessorContext.setTime(60000L);
                store.put(0, "v");
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(0L), inner.segmentName(1L)}), segmentDirs(file));

                // All four records are visible, and fetching leaves the directories unchanged.
                WindowStoreIterator allSoFar = store.fetch(0, 0L, 240000L);
                int fetchedBeforeRoll = 0;
                while (allSoFar.hasNext()) {
                    allSoFar.next();
                    fetchedBeforeRoll++;
                }
                Assert.assertEquals(4L, fetchedBeforeRoll);
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(0L), inner.segmentName(1L)}), segmentDirs(file));

                // Advancing to 180000: segment 0 is gone, segments 1-3 exist, two records remain fetchable.
                mockProcessorContext.setTime(180000L);
                store.put(0, "v");
                WindowStoreIterator afterFirstRoll = store.fetch(0, 0L, 240000L);
                int fetchedAfterFirstRoll = 0;
                while (afterFirstRoll.hasNext()) {
                    afterFirstRoll.next();
                    fetchedAfterFirstRoll++;
                }
                Assert.assertEquals(2L, fetchedAfterFirstRoll);
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)}), segmentDirs(file));

                // Advancing to 300000: segments 3-5 exist and only the newest record is in range.
                mockProcessorContext.setTime(300000L);
                store.put(0, "v");
                WindowStoreIterator afterSecondRoll = store.fetch(0, 240000L, 1000000L);
                int fetchedAfterSecondRoll = 0;
                while (afterSecondRoll.hasNext()) {
                    afterSecondRoll.next();
                    fetchedAfterSecondRoll++;
                }
                Assert.assertEquals(1L, fetchedAfterSecondRoll);
                Assert.assertEquals(Utils.mkSet(new String[]{inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)}), segmentDirs(file));
            } finally {
                store.close();
            }
        } finally {
            Utils.delete(file);
        }
    }

    /**
     * Verifies which segment directories survive when a store is opened over a directory that already
     * contains segment directories.
     *
     * <p>A first store instance is opened only to obtain segment names; directories for segments 0-6 are
     * created by hand and the store is closed. A second store is then opened on the same context, and the
     * test asserts that only the directories for segments 4-6 are present, both before and after a full
     * range fetch.
     *
     * <p>Both store instances are closed in {@code finally} blocks so that a failing assertion does not
     * leave a store open while the temporary directory is being deleted.
     *
     * @throws IOException if the temporary directory cannot be created or deleted
     */
    @Test
    public void testInitialLoading() throws IOException {
        File file = Files.createTempDirectory("test").toFile();
        try {
            MockProcessorContext mockProcessorContext = new MockProcessorContext((KStreamTestDriver) null, file, (Serde<?>) this.byteArraySerde, (Serde<?>) this.byteArraySerde, new RecordCollector(new MockProducer(true, this.byteArraySerde.serializer(), this.byteArraySerde.serializer())) {
                public <K1, V1> void send(ProducerRecord<K1, V1> producerRecord, Serializer<K1> serializer, Serializer<V1> serializer2) {
                    // Changelog output is not inspected by this test, so records are discarded.
                }
            });
            File storeDir = new File(file, "window");

            // First instance: used only to name and pre-create the directories for segments 0-6.
            MeteredWindowStore store = createWindowStore(mockProcessorContext);
            try {
                RocksDBWindowStore inner = store.inner();
                for (long segmentId = 0L; segmentId <= 6L; segmentId++) {
                    new File(storeDir, inner.segmentName(segmentId)).mkdir();
                }
            } finally {
                store.close();
            }

            // Second instance: opened over the pre-populated directory.
            store = createWindowStore(mockProcessorContext);
            try {
                RocksDBWindowStore inner = store.inner();
                Set<String> expectedDirs = Utils.mkSet(new String[]{inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)});
                Assert.assertEquals(expectedDirs, segmentDirs(file));

                // Drain a fetch over the whole time range; it must not change the directories on disk.
                WindowStoreIterator everything = store.fetch(0, 0L, 1000000L);
                while (everything.hasNext()) {
                    everything.next();
                }
                Assert.assertEquals(expectedDirs, segmentDirs(file));
            } finally {
                store.close();
            }
        } finally {
            Utils.delete(file);
        }
    }

    /**
     * Drains the given iterator and collects the {@code value} field of every element, in iteration order.
     *
     * @param windowStoreIterator the iterator to consume; it is exhausted when this method returns
     * @return a new list holding the value of each element the iterator produced
     */
    @SuppressWarnings("unchecked")
    private <E> List<E> toList(WindowStoreIterator<E> windowStoreIterator) {
        List<E> values = new ArrayList<>();
        while (windowStoreIterator.hasNext()) {
            KeyValue<?, ?> entry = (KeyValue<?, ?>) windowStoreIterator.next();
            // Unchecked: the element value is trusted to be of the iterator's element type E.
            values.add((E) entry.value);
        }
        return values;
    }

    /**
     * Lists the names of the entries inside the {@code "window"} sub-directory of the given base directory.
     *
     * @param file the base directory that contains the {@code "window"} directory
     * @return the entry names as a new set; empty when the {@code "window"} directory does not exist or
     *         cannot be listed
     */
    private Set<String> segmentDirs(File file) {
        String[] names = new File(file, "window").list();
        if (names == null) {
            // File.list() returns null instead of throwing when the path is not a listable directory;
            // report "no entries" so callers get a readable assertion failure rather than an NPE.
            return new HashSet<>();
        }
        return new HashSet<>(Arrays.asList(names));
    }

    /**
     * Groups serialized changelog records by their deserialized key.
     *
     * <p>Each record contributes one string of the form {@code value@offset}, where {@code value} is the
     * deserialized value (or {@code null} when the record carries no value) and {@code offset} is the
     * timestamp extracted from the binary key minus {@code j}.
     *
     * @param list the serialized key/value records to group
     * @param j    the base timestamp subtracted from every record timestamp
     * @return a map from deserialized key to the set of {@code value@offset} strings seen for that key
     */
    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> list, long j) {
        Map<Integer, Set<String>> grouped = new HashMap<>();
        for (KeyValue<byte[], byte[]> record : list) {
            byte[] binaryKey = record.key;
            byte[] binaryValue = record.value;

            long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
            Integer key = (Integer) WindowStoreUtils.keyFromBinaryKey(binaryKey, this.serdes);

            String value;
            if (binaryValue == null) {
                value = null;
            } else {
                value = (String) this.serdes.valueFrom(binaryValue);
            }

            Set<String> entries = grouped.get(key);
            if (entries == null) {
                entries = new HashSet<>();
                grouped.put(key, entries);
            }
            entries.add(value + "@" + (timestamp - j));
        }
        return grouped;
    }
}
