package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SimpleTimeZone;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.rocksdb.WriteBatch;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.class */
public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends Segment> {
    // Processor context handed to the store in before(); backed by the temporary state directory.
    private InternalMockProcessorContext context;
    // Store under test, obtained from getBytesStore() and initialised in before().
    private AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore;
    // Temporary directory created in before() and passed to the mock processor context.
    private File stateDir;
    // Window starting at 61000; NOTE(review): presumably chosen to land in the segment after windows[0..3] - confirm.
    private Window nextSegmentWindow;
    // Windows positioned near Long.MAX_VALUE, used by the "edge" fetch tests.
    private Window startEdgeWindow;
    private Window endEdgeWindow;
    // Size passed to WindowKeySchema.timeWindowForSize for the time-window fixtures.
    private final long windowSizeForTimeWindow = 500;
    // The four windows written by the basic put/fetch tests; populated in before().
    private final Window[] windows = new Window[4];
    // Equals Long.MAX_VALUE - 700.
    private final long startEdgeTime = 9223372036854775107L;
    // Equals Long.MAX_VALUE - 600.
    private final long endEdgeTime = 9223372036854775207L;
    // NOTE(review): presumably the store retention period in milliseconds - confirm against getBytesStore() implementations.
    final long retention = 1000;
    // NOTE(review): reuses the integration-test timeout constant as the segment interval; this looks like a
    // decompiler constant substitution for a plain literal - confirm the intended value.
    final long segmentInterval = IntegrationTestUtils.DEFAULT_TIMEOUT;
    // NOTE(review): presumably the name given to the store by subclasses - confirm.
    final String storeName = "bytes-store";

    /**
     * Prepares the fixtures for every test: builds the window instances matching the
     * base key schema returned by {@link #getBaseSchema()}, then creates the store under
     * test and initialises it against a mock processor context rooted in a fresh
     * temporary state directory.
     */
    @Before
    public void before() {
        // Resolve the schema once; both branches below inspect the same instance.
        final SegmentedBytesStore.KeySchema baseSchema = getBaseSchema();

        if (baseSchema instanceof PrefixedSessionKeySchemas.TimeFirstSessionKeySchema) {
            windows[0] = new SessionWindow(10L, 10L);
            windows[1] = new SessionWindow(500L, 1000L);
            windows[2] = new SessionWindow(1000L, 1500L);
            // NOTE(review): DEFAULT_TIMEOUT is used here as a window end time; it looks like a
            // decompiler substitution for a numeric literal - confirm the intended value.
            windows[3] = new SessionWindow(30000L, IntegrationTestUtils.DEFAULT_TIMEOUT);
            nextSegmentWindow = new SessionWindow(61000L, 61000L);
            // Edge windows reuse the declared edge timestamps instead of repeating the literals.
            startEdgeWindow = new SessionWindow(0L, startEdgeTime);
            endEdgeWindow = new SessionWindow(endEdgeTime, Long.MAX_VALUE);
        }

        if (baseSchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
            windows[0] = WindowKeySchema.timeWindowForSize(10L, windowSizeForTimeWindow);
            windows[1] = WindowKeySchema.timeWindowForSize(500L, windowSizeForTimeWindow);
            windows[2] = WindowKeySchema.timeWindowForSize(1000L, windowSizeForTimeWindow);
            windows[3] = WindowKeySchema.timeWindowForSize(IntegrationTestUtils.DEFAULT_TIMEOUT, windowSizeForTimeWindow);
            nextSegmentWindow = WindowKeySchema.timeWindowForSize(61000L, windowSizeForTimeWindow);
            startEdgeWindow = WindowKeySchema.timeWindowForSize(startEdgeTime, windowSizeForTimeWindow);
            endEdgeWindow = WindowKeySchema.timeWindowForSize(endEdgeTime, windowSizeForTimeWindow);
        }

        bytesStore = getBytesStore();
        stateDir = TestUtils.tempDirectory();

        // Zero-sized thread cache, exactly as in the original set-up.
        final ThreadCache cache =
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        context = new InternalMockProcessorContext(
            stateDir, Serdes.String(), Serdes.Long(), new MockRecordCollector(), cache);
        bytesStore.init(context, bytesStore);
    }

    /**
     * Closes the store that {@link #before()} opened, once each test has finished.
     */
    @After
    public void close() {
        bytesStore.close();
    }

    /**
     * Supplies the store under test; called once per test from {@code before()}.
     */
    abstract AbstractDualSchemaRocksDBSegmentedBytesStore<S> getBytesStore();

    /**
     * NOTE(review): presumably creates a fresh segments container for the tests that
     * inspect segments directly - not used in the visible part of this class; confirm.
     */
    abstract AbstractSegments<S> newSegments();

    /**
     * Returns the base key schema. {@code before()} inspects its concrete type to decide
     * whether session-window or time-window fixtures are built.
     */
    abstract SegmentedBytesStore.KeySchema getBaseSchema();

    /**
     * Returns the index key schema, or {@code null}; the key-range test compares the
     * result against {@code null} to choose the expected result ordering.
     */
    abstract SegmentedBytesStore.KeySchema getIndexSchema();

    /**
     * Writes four records ("a" in windows[0] and windows[1], "b" in windows[2], "c" in
     * windows[3]) and verifies the results of single-key, key-range and open-ended
     * forward fetches.
     *
     * <p>Each iterator is managed by try-with-resources so it is closed exactly once,
     * whether or not the assertion inside the block fails.
     */
    @Test
    public void shouldPutAndFetch() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), serializeValue(200L));

        // NOTE(review): the first three fetches expect no results; presumably the records for
        // "a" and "b" are already outside retention once the windows[3] record has been
        // written - confirm against the store implementation.

        // Single key "a".
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Key range ["a", "b"].
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Open lower key bound, upper bound "b".
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch((Bytes) null, Bytes.wrap("b".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Lower bound "b", open upper key bound: only the "c" record is expected.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("b".getBytes()), (Bytes) null, 0L, this.windows[3].start())) {
            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L)),
                toList(results));
        }

        // Both key bounds open: still only the "c" record is expected.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch((Bytes) null, (Bytes) null, 0L, this.windows[3].start())) {
            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L)),
                toList(results));
        }
    }

    /**
     * Writes four records ("a" in windows[0] and windows[1], "b" in windows[2], "c" in
     * windows[3]) and verifies the results of single-key, key-range and open-ended
     * backward fetches.
     *
     * <p>Each iterator is managed by try-with-resources so it is closed exactly once,
     * whether or not the assertion inside the block fails.
     */
    @Test
    public void shouldPutAndBackwardFetch() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), serializeValue(200L));

        // NOTE(review): the first three fetches expect no results; presumably the records for
        // "a" and "b" are already outside retention once the windows[3] record has been
        // written - confirm against the store implementation.

        // Single key "a".
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Key range ["a", "b"].
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Open lower key bound, upper bound "b".
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch((Bytes) null, Bytes.wrap("b".getBytes()), 0L, this.windows[2].start())) {
            Assert.assertEquals(Collections.emptyList(), toList(results));
        }

        // Lower bound "b", open upper key bound: only the "c" record is expected.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("b".getBytes()), (Bytes) null, 0L, this.windows[3].start())) {
            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L)),
                toList(results));
        }

        // Both key bounds open: still only the "c" record is expected.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch((Bytes) null, (Bytes) null, 0L, this.windows[3].start())) {
            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L)),
                toList(results));
        }
    }

    /**
     * Writes records for keys "a" and "b" in both edge windows (close to
     * {@code Long.MAX_VALUE}) and verifies that single-key fetches return both records
     * of the key, in start-edge then end-edge order, for the exact edge time range as
     * well as for the full range {@code [0, Long.MAX_VALUE]}.
     *
     * <p>Each iterator is managed by try-with-resources so it is closed exactly once,
     * whether or not the assertion inside the block fails.
     */
    @Test
    public void shouldPutAndFetchEdgeSingleKey() {
        final Bytes startEdgeKeyA = serializeKey(new Windowed<>("a", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes endEdgeKeyA = serializeKey(new Windowed<>("a", this.endEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes startEdgeKeyB = serializeKey(new Windowed<>("b", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes endEdgeKeyB = serializeKey(new Windowed<>("b", this.endEdgeWindow), false, Integer.MAX_VALUE);

        this.bytesStore.put(startEdgeKeyA, serializeValue(10L));
        this.bytesStore.put(endEdgeKeyA, serializeValue(50L));
        this.bytesStore.put(startEdgeKeyB, serializeValue(100L));
        this.bytesStore.put(endEdgeKeyB, serializeValue(150L));

        // Expected contents per key; the same expectation applies to both time ranges.
        final List<KeyValue<Windowed<String>, Long>> expectedForA = Arrays.asList(
            KeyValue.pair(new Windowed<>("a", this.startEdgeWindow), 10L),
            KeyValue.pair(new Windowed<>("a", this.endEdgeWindow), 50L));
        final List<KeyValue<Windowed<String>, Long>> expectedForB = Arrays.asList(
            KeyValue.pair(new Windowed<>("b", this.startEdgeWindow), 100L),
            KeyValue.pair(new Windowed<>("b", this.endEdgeWindow), 150L));

        // Exact edge time range.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), this.startEdgeTime, this.endEdgeTime)) {
            Assert.assertEquals(expectedForA, toList(results));
        }
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("b".getBytes()), this.startEdgeTime, this.endEdgeTime)) {
            Assert.assertEquals(expectedForB, toList(results));
        }

        // Full time range.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedForA, toList(results));
        }
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("b".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedForB, toList(results));
        }
    }

    @Test
    public void shouldPutAndFetchEdgeKeyRange() {
        Bytes serializeKey = serializeKey(new Windowed<>("a", this.startEdgeWindow), false, Integer.MAX_VALUE);
        Bytes serializeKey2 = serializeKey(new Windowed<>("a", this.endEdgeWindow), false, Integer.MAX_VALUE);
        Bytes serializeKey3 = serializeKey(new Windowed<>("b", this.startEdgeWindow), false, Integer.MAX_VALUE);
        Bytes serializeKey4 = serializeKey(new Windowed<>("b", this.endEdgeWindow), false, Integer.MAX_VALUE);
        this.bytesStore.put(serializeKey, serializeValue(10L));
        this.bytesStore.put(serializeKey2, serializeValue(50L));
        this.bytesStore.put(serializeKey3, serializeValue(100L));
        this.bytesStore.put(serializeKey4, serializeValue(150L));
        KeyValueIterator<Bytes, byte[]> fetch = this.bytesStore.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 9223372036854775107L, 9223372036854775207L);
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(getIndexSchema() == null ? Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)) : Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)), toList(fetch));
                if (fetch != null) {
                    if (0 != 0) {
                        try {
                            fetch.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fetch.close();
                    }
                }
                KeyValueIterator<Bytes, byte[]> fetch2 = this.bytesStore.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 0L, Long.MAX_VALUE);
                Throwable th3 = null;
                try {
                    try {
                        Assert.assertEquals(getIndexSchema() == null ? Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)) : Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)), toList(fetch2));
                        if (fetch2 != null) {
                            if (0 != 0) {
                                try {
                                    fetch2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                fetch2.close();
                            }
                        }
                        KeyValueIterator<Bytes, byte[]> fetch3 = this.bytesStore.fetch((Bytes) null, Bytes.wrap("a".getBytes()), 9223372036854775107L, 9223372036854775206L);
                        Throwable th5 = null;
                        try {
                            try {
                                Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L)), toList(fetch3));
                                if (fetch3 != null) {
                                    if (0 != 0) {
                                        try {
                                            fetch3.close();
                                        } catch (Throwable th6) {
                                            th5.addSuppressed(th6);
                                        }
                                    } else {
                                        fetch3.close();
                                    }
                                }
                                KeyValueIterator<Bytes, byte[]> fetch4 = this.bytesStore.fetch(Bytes.wrap("b".getBytes()), (Bytes) null, 9223372036854775108L, 9223372036854775207L);
                                Throwable th7 = null;
                                try {
                                    try {
                                        Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)), toList(fetch4));
                                        if (fetch4 != null) {
                                            if (0 != 0) {
                                                try {
                                                    fetch4.close();
                                                } catch (Throwable th8) {
                                                    th7.addSuppressed(th8);
                                                }
                                            } else {
                                                fetch4.close();
                                            }
                                        }
                                        fetch3 = this.bytesStore.fetch((Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE);
                                        Throwable th9 = null;
                                        try {
                                            try {
                                                Assert.assertEquals(getIndexSchema() == null ? Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)) : Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)), toList(fetch3));
                                                if (fetch3 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            fetch3.close();
                                                        } catch (Throwable th10) {
                                                            th9.addSuppressed(th10);
                                                        }
                                                    } else {
                                                        fetch3.close();
                                                    }
                                                }
                                                fetch = this.bytesStore.fetch((Bytes) null, (Bytes) null, 9223372036854775107L, 9223372036854775207L);
                                                Throwable th11 = null;
                                                try {
                                                    try {
                                                        Assert.assertEquals(getIndexSchema() == null ? Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)) : Arrays.asList(KeyValue.pair(new Windowed("a", this.startEdgeWindow), 10L), KeyValue.pair(new Windowed("a", this.endEdgeWindow), 50L), KeyValue.pair(new Windowed("b", this.startEdgeWindow), 100L), KeyValue.pair(new Windowed("b", this.endEdgeWindow), 150L)), toList(fetch));
                                                        if (fetch != null) {
                                                            if (0 == 0) {
                                                                fetch.close();
                                                                return;
                                                            }
                                                            try {
                                                                fetch.close();
                                                            } catch (Throwable th12) {
                                                                th11.addSuppressed(th12);
                                                            }
                                                        }
                                                    } catch (Throwable th13) {
                                                        th11 = th13;
                                                        throw th13;
                                                    }
                                                } finally {
                                                }
                                            } catch (Throwable th14) {
                                                th9 = th14;
                                                throw th14;
                                            }
                                        } finally {
                                        }
                                    } catch (Throwable th15) {
                                        th7 = th15;
                                        throw th15;
                                    }
                                } finally {
                                    if (fetch4 != null) {
                                        if (th7 != null) {
                                            try {
                                                fetch4.close();
                                            } catch (Throwable th16) {
                                                th7.addSuppressed(th16);
                                            }
                                        } else {
                                            fetch4.close();
                                        }
                                    }
                                }
                            } catch (Throwable th17) {
                                th5 = th17;
                                throw th17;
                            }
                        } finally {
                            if (fetch3 != null) {
                                if (th5 != null) {
                                    try {
                                        fetch3.close();
                                    } catch (Throwable th18) {
                                        th5.addSuppressed(th18);
                                    }
                                } else {
                                    fetch3.close();
                                }
                            }
                        }
                    } catch (Throwable th19) {
                        th3 = th19;
                        throw th19;
                    }
                } finally {
                    if (fetch2 != null) {
                        if (th3 != null) {
                            try {
                                fetch2.close();
                            } catch (Throwable th20) {
                                th3.addSuppressed(th20);
                            }
                        } else {
                            fetch2.close();
                        }
                    }
                }
            } catch (Throwable th21) {
                th = th21;
                throw th21;
            }
        } finally {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th22) {
                        th.addSuppressed(th22);
                    }
                } else {
                    fetch.close();
                }
            }
        }
    }

    /**
     * Stores one value per (key, edge window) combination for keys "a" and "b" and verifies that a
     * single-key {@code backwardFetch} returns that key's end-edge window entry followed by its
     * start-edge window entry, both for the narrow edge time range and for {@code [0, Long.MAX_VALUE]}.
     *
     * <p>Every iterator is opened in try-with-resources so it is closed exactly once, including
     * when an assertion fails.
     */
    @Test
    public void shouldPutAndBackwardFetchEdgeSingleKey() {
        // Time bounds used for the "edge" queries; equal to the literals
        // 9223372036854775107L and 9223372036854775207L.
        final long edgeFrom = Long.MAX_VALUE - 700L;
        final long edgeTo = Long.MAX_VALUE - 600L;

        final Bytes serializedAStart = serializeKey(new Windowed<>("a", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedAEnd = serializeKey(new Windowed<>("a", this.endEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedBStart = serializeKey(new Windowed<>("b", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedBEnd = serializeKey(new Windowed<>("b", this.endEdgeWindow), false, Integer.MAX_VALUE);
        this.bytesStore.put(serializedAStart, serializeValue(10L));
        this.bytesStore.put(serializedAEnd, serializeValue(50L));
        this.bytesStore.put(serializedBStart, serializeValue(100L));
        this.bytesStore.put(serializedBEnd, serializeValue(150L));

        // Expected results list the end-edge window entry before the start-edge window entry.
        final List<KeyValue<Windowed<String>, Long>> expectedForA = Arrays.asList(
            KeyValue.pair(new Windowed<>("a", this.endEdgeWindow), 50L),
            KeyValue.pair(new Windowed<>("a", this.startEdgeWindow), 10L));
        final List<KeyValue<Windowed<String>, Long>> expectedForB = Arrays.asList(
            KeyValue.pair(new Windowed<>("b", this.endEdgeWindow), 150L),
            KeyValue.pair(new Windowed<>("b", this.startEdgeWindow), 100L));

        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), edgeFrom, edgeTo)) {
            Assert.assertEquals(expectedForA, toList(results));
        }

        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("b".getBytes()), edgeFrom, edgeTo)) {
            Assert.assertEquals(expectedForB, toList(results));
        }

        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedForA, toList(results));
        }

        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.backwardFetch(Bytes.wrap("b".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedForB, toList(results));
        }
    }

    /**
     * Stores one value per (key, edge window) combination for keys "a" and "b" and verifies
     * key-range {@code backwardFetch} results for bounded, half-open ({@code null} lower or upper
     * key) and fully open key ranges, over both the edge time range and {@code [0, Long.MAX_VALUE]}.
     *
     * <p>The expected ordering of the four-entry result depends on whether {@link #getIndexSchema()}
     * returns {@code null}. Every iterator is opened in try-with-resources so it is closed exactly
     * once, including when an assertion fails.
     */
    @Test
    public void shouldPutAndBackwardFetchEdgeKeyRange() {
        // Time bounds used for the "edge" queries; equal to the literals
        // 9223372036854775107L and 9223372036854775207L.
        final long edgeFrom = Long.MAX_VALUE - 700L;
        final long edgeTo = Long.MAX_VALUE - 600L;

        final Bytes serializedAStart = serializeKey(new Windowed<>("a", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedAEnd = serializeKey(new Windowed<>("a", this.endEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedBStart = serializeKey(new Windowed<>("b", this.startEdgeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedBEnd = serializeKey(new Windowed<>("b", this.endEdgeWindow), false, Integer.MAX_VALUE);
        this.bytesStore.put(serializedAStart, serializeValue(10L));
        this.bytesStore.put(serializedAEnd, serializeValue(50L));
        this.bytesStore.put(serializedBStart, serializeValue(100L));
        this.bytesStore.put(serializedBEnd, serializeValue(150L));

        // Without an index schema the entries are expected grouped by window (end-edge first);
        // with one they are expected grouped by key ("b" first).
        // NOTE(review): evaluated once here on the assumption that getIndexSchema() is stable
        // for the lifetime of a test instance - confirm.
        final List<KeyValue<Windowed<String>, Long>> expectedAll = getIndexSchema() == null
            ? Arrays.asList(
                KeyValue.pair(new Windowed<>("b", this.endEdgeWindow), 150L),
                KeyValue.pair(new Windowed<>("a", this.endEdgeWindow), 50L),
                KeyValue.pair(new Windowed<>("b", this.startEdgeWindow), 100L),
                KeyValue.pair(new Windowed<>("a", this.startEdgeWindow), 10L))
            : Arrays.asList(
                KeyValue.pair(new Windowed<>("b", this.endEdgeWindow), 150L),
                KeyValue.pair(new Windowed<>("b", this.startEdgeWindow), 100L),
                KeyValue.pair(new Windowed<>("a", this.endEdgeWindow), 50L),
                KeyValue.pair(new Windowed<>("a", this.startEdgeWindow), 10L));
        final List<KeyValue<Windowed<String>, Long>> expectedStartOfA = Arrays.asList(
            KeyValue.pair(new Windowed<>("a", this.startEdgeWindow), 10L));
        final List<KeyValue<Windowed<String>, Long>> expectedEndOfB = Arrays.asList(
            KeyValue.pair(new Windowed<>("b", this.endEdgeWindow), 150L));

        // Bounded key range ["a", "b"] over the edge time range.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), edgeFrom, edgeTo)) {
            Assert.assertEquals(expectedAll, toList(results));
        }

        // Bounded key range ["a", "b"] over the full time range.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedAll, toList(results));
        }

        // No lower key bound, upper bound "a", upper time bound one below edgeTo.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 (Bytes) null, Bytes.wrap("a".getBytes()), edgeFrom, edgeTo - 1L)) {
            Assert.assertEquals(expectedStartOfA, toList(results));
        }

        // Lower key bound "b", no upper bound, lower time bound one above edgeFrom.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 Bytes.wrap("b".getBytes()), (Bytes) null, edgeFrom + 1L, edgeTo)) {
            Assert.assertEquals(expectedEndOfB, toList(results));
        }

        // Fully open key range over the full time range.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 (Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(expectedAll, toList(results));
        }

        // Fully open key range over the edge time range.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(
                 (Bytes) null, (Bytes) null, edgeFrom, edgeTo)) {
            Assert.assertEquals(expectedAll, toList(results));
        }
    }

    /**
     * Verifies forward {@code fetch} for keys that are prefixes of one another ("a", "aa", "aaa")
     * stored in the same time window.
     *
     * <p>The body runs only when {@link #getBaseSchema()} is a
     * {@code PrefixedWindowKeySchemas.TimeFirstWindowKeySchema}; for any other base schema the
     * test returns immediately. Every iterator is opened in try-with-resources so it is closed
     * exactly once, including when an assertion fails.
     */
    @Test
    public void shouldPutAndFetchWithPrefixKey() {
        if (!(getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema)) {
            return;
        }

        // Window [Long.MAX_VALUE - 1, Long.MAX_VALUE); the start equals 9223372036854775806L.
        final TimeWindow timeWindow = new TimeWindow(Long.MAX_VALUE - 1L, Long.MAX_VALUE);
        final Bytes serializedA = serializeKey(new Windowed<>("a", timeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedAa = serializeKey(new Windowed<>("aa", timeWindow), false, Integer.MAX_VALUE);
        final Bytes serializedAaa = serializeKey(new Windowed<>("aaa", timeWindow), false, Integer.MAX_VALUE);

        // Precondition of the scenario: the serialized form of the shorter key compares greater
        // than that of the longer key it prefixes.
        Assert.assertTrue(serializedA.compareTo(serializedAa) > 0);
        Assert.assertTrue(serializedAa.compareTo(serializedAaa) > 0);

        this.bytesStore.put(serializedA, serializeValue(10L));
        this.bytesStore.put(serializedAa, serializeValue(50L));
        this.bytesStore.put(serializedAaa, serializeValue(100L));

        final KeyValue<Windowed<String>, Long> entryA = KeyValue.pair(new Windowed<>("a", timeWindow), 10L);
        final KeyValue<Windowed<String>, Long> entryAa = KeyValue.pair(new Windowed<>("aa", timeWindow), 50L);
        final KeyValue<Windowed<String>, Long> entryAaa = KeyValue.pair(new Windowed<>("aaa", timeWindow), 100L);

        // Single key "a": only the exact key is expected, not the keys it prefixes.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryA), toList(results));
        }

        // Key range ["a", "aa"].
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(
                 Bytes.wrap("a".getBytes()), Bytes.wrap("aa".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryAa, entryA), toList(results));
        }

        // No lower key bound, upper bound "aa".
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(
                 (Bytes) null, Bytes.wrap("aa".getBytes()), 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryAa, entryA), toList(results));
        }

        // Lower key bound "aa", no upper bound.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(
                 Bytes.wrap("aa".getBytes()), (Bytes) null, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryAaa, entryAa), toList(results));
        }

        // Fully open key range.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(
                 (Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryAaa, entryAa, entryA), toList(results));
        }
    }

    /**
     * Verifies {@code backwardFetch} by key and by key range when the base schema is the
     * time-first window key schema; for any other base schema the test body is skipped.
     *
     * <p>Three keys ({@code "a"}, {@code "aa"}, {@code "aaa"}) are written into the same window and
     * every fetch variant is checked against the entries it is expected to return. Each iterator
     * is scoped by its own try-with-resources so it is closed exactly once, whether or not the
     * assertion inside passes.
     */
    @Test
    public void shouldPutAndBackwardFetchWithPrefix() {
        if (!(getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema)) {
            return;
        }

        final TimeWindow maxWindow = new TimeWindow(Long.MAX_VALUE - 1, Long.MAX_VALUE);
        final Bytes storedA = serializeKey(new Windowed<>("a", maxWindow), false, Integer.MAX_VALUE);
        final Bytes storedAa = serializeKey(new Windowed<>("aa", maxWindow), false, Integer.MAX_VALUE);
        final Bytes storedAaa = serializeKey(new Windowed<>("aaa", maxWindow), false, Integer.MAX_VALUE);

        // Precondition of the scenario: the serialized keys must compare as "a" > "aa" > "aaa".
        Assert.assertTrue(storedA.compareTo(storedAa) > 0);
        Assert.assertTrue(storedAa.compareTo(storedAaa) > 0);

        this.bytesStore.put(storedA, serializeValue(10L));
        this.bytesStore.put(storedAa, serializeValue(50L));
        this.bytesStore.put(storedAaa, serializeValue(100L));

        final KeyValue<Windowed<String>, Long> entryA = KeyValue.pair(new Windowed<>("a", maxWindow), 10L);
        final KeyValue<Windowed<String>, Long> entryAa = KeyValue.pair(new Windowed<>("aa", maxWindow), 50L);
        final KeyValue<Windowed<String>, Long> entryAaa = KeyValue.pair(new Windowed<>("aaa", maxWindow), 100L);
        final Bytes keyA = Bytes.wrap("a".getBytes());
        final Bytes keyAa = Bytes.wrap("aa".getBytes());

        // Single key.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(keyA, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryA), toList(results));
        }

        // Both range bounds given.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(keyA, keyAa, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryA, entryAa), toList(results));
        }

        // Open lower bound.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch((Bytes) null, keyAa, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryA, entryAa), toList(results));
        }

        // Open upper bound.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch(keyAa, (Bytes) null, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryAa, entryAaa), toList(results));
        }

        // Both bounds open.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardFetch((Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE)) {
            Assert.assertEquals(Arrays.asList(entryA, entryAa, entryAaa), toList(results));
        }
    }

    /**
     * Verifies {@code fetchSession} lookups by key and exact session bounds when the base schema
     * is the time-first session key schema; for any other base schema the test body is skipped.
     *
     * <p>Four records are written. The lookups for the records in {@code windows[0..2]} are
     * expected to return {@code null}; only the record for key {@code "c"} in {@code windows[3]}
     * is expected back, and a lookup for {@code "c"} with different bounds returns {@code null}.
     * NOTE(review): presumably the earlier records are no longer retained once the
     * {@code windows[3]} record is written -- confirm against the store's retention setup.
     */
    @Test
    public void shouldFetchSessionForSingleKey() {
        if (!(getBaseSchema() instanceof PrefixedSessionKeySchemas.TimeFirstSessionKeySchema)) {
            return;
        }

        final StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
        final Bytes keyA = Bytes.wrap(serdes.keySerializer().serialize("dummy", "a"));
        final Bytes keyB = Bytes.wrap(serdes.keySerializer().serialize("dummy", "b"));
        final Bytes keyC = Bytes.wrap(serdes.keySerializer().serialize("dummy", "c"));

        final byte[] firstValueForA = serializeValue(10L);
        final byte[] secondValueForA = serializeValue(50L);
        final byte[] valueForB = serializeValue(100L);
        final byte[] valueForC = serializeValue(200L);

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), firstValueForA);
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), secondValueForA);
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), valueForB);
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), valueForC);

        Assert.assertNull(this.bytesStore.fetchSession(keyA, this.windows[0].start(), this.windows[0].end()));
        Assert.assertNull(this.bytesStore.fetchSession(keyA, this.windows[1].start(), this.windows[1].end()));
        Assert.assertNull(this.bytesStore.fetchSession(keyB, this.windows[2].start(), this.windows[2].end()));

        // Expected value first, actual second, so a failure message reads the right way round.
        Assert.assertEquals(
            Bytes.wrap(valueForC),
            Bytes.wrap(this.bytesStore.fetchSession(keyC, this.windows[3].start(), this.windows[3].end())));

        // Same key, but bounds that match no stored session.
        Assert.assertNull(this.bytesStore.fetchSession(keyC, 2000L, 3000L));
    }

    /**
     * Verifies {@code fetchSessions(long, long)} for a series of time ranges when the base schema
     * is the time-first session key schema; for any other base schema the test body is skipped.
     *
     * <p>Three sessions are stored: {@code "a"} in [100, 100], {@code "b"} in [50, 200] and
     * {@code "c"} in [200, 300]. Each range below lists the sessions the store is expected to
     * return for it. Every iterator is closed exactly once via try-with-resources.
     */
    @Test
    public void shouldFetchSessionForTimeRange() {
        if (!(getBaseSchema() instanceof PrefixedSessionKeySchemas.TimeFirstSessionKeySchema)) {
            return;
        }

        final Window sessionA = new SessionWindow(100L, 100L);
        final Window sessionB = new SessionWindow(50L, 200L);
        final Window sessionC = new SessionWindow(200L, 300L);

        this.bytesStore.put(serializeKey(new Windowed<>("a", sessionA)), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", sessionB)), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("c", sessionC)), serializeValue(200L));

        final KeyValue<Windowed<String>, Long> entryA = KeyValue.pair(new Windowed<>("a", sessionA), 10L);
        final KeyValue<Windowed<String>, Long> entryB = KeyValue.pair(new Windowed<>("b", sessionB), 100L);
        final KeyValue<Windowed<String>, Long> entryC = KeyValue.pair(new Windowed<>("c", sessionC), 200L);

        assertFetchedSessions(100L, 100L, Collections.singletonList(entryA));
        assertFetchedSessions(100L, 200L, Arrays.asList(entryA, entryB));
        assertFetchedSessions(99L, 201L, Arrays.asList(entryA, entryB));

        // This range is expected to match none of the stored sessions.
        try (KeyValueIterator<Bytes, byte[]> sessions = this.bytesStore.fetchSessions(101L, 199L)) {
            Assert.assertTrue(toList(sessions).isEmpty());
        }

        assertFetchedSessions(100L, 300L, Arrays.asList(entryA, entryB, entryC));
        assertFetchedSessions(99L, 301L, Arrays.asList(entryA, entryB, entryC));
        assertFetchedSessions(101L, 299L, Collections.singletonList(entryB));
    }

    /**
     * Calls {@code bytesStore.fetchSessions(lower, upper)}, asserts that the decoded result equals
     * {@code expected}, and closes the iterator.
     *
     * @param lower first argument passed to {@code fetchSessions}
     * @param upper second argument passed to {@code fetchSessions}
     * @param expected the entries the fetch must return, in order
     */
    private void assertFetchedSessions(final long lower,
                                       final long upper,
                                       final List<KeyValue<Windowed<String>, Long>> expected) {
        try (KeyValueIterator<Bytes, byte[]> sessions = this.bytesStore.fetchSessions(lower, upper)) {
            Assert.assertEquals(expected, toList(sessions));
        }
    }

    /**
     * Verifies that an index entry with no matching base record is skipped by a fetch and is gone
     * from the index afterwards.
     *
     * <p>When there is no index schema, the test only checks that {@code putIndex} throws
     * {@link IllegalStateException}. Otherwise an index entry for {@code "a"} in
     * {@code windows[1]} is written without a base record, two unrelated base records are
     * written, and a ranged fetch is run. The expected fetch result depends on the base schema.
     */
    @Test
    public void shouldSkipAndRemoveDanglingIndex() {
        if (getIndexSchema() == null) {
            Assertions.assertThrows(IllegalStateException.class, () -> {
                this.bytesStore.putIndex(Bytes.wrap("a".getBytes()), new byte[0]);
            });
            return;
        }

        // Index entry without a corresponding base-store record.
        final Bytes danglingIndexKey = serializeKeyForIndex(new Windowed<>("a", this.windows[1]));
        this.bytesStore.putIndex(danglingIndexKey, new byte[0]);
        MatcherAssert.assertThat(Bytes.wrap(this.bytesStore.getIndex(danglingIndexKey)), CoreMatchers.is(Bytes.wrap(new byte[0])));

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(20L));

        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1L, 2000L)) {
            final List<KeyValue<Windowed<String>, Long>> expected;
            if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
                expected = Arrays.asList(
                    KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L),
                    KeyValue.pair(new Windowed<>("b", this.windows[2]), 20L));
            } else {
                expected = Collections.singletonList(KeyValue.pair(new Windowed<>("b", this.windows[2]), 20L));
            }
            Assert.assertEquals(expected, toList(results));
        }

        // Checked after the iterator is closed: the dangling entry must no longer be in the index.
        MatcherAssert.assertThat(this.bytesStore.getIndex(danglingIndexKey), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Verifies a single-key {@code fetch} over the time range [1, 999].
     *
     * <p>Three records for key {@code "a"} are written, one per window in {@code windows[0..2]}.
     * With the time-first window key schema the records in {@code windows[0]} and
     * {@code windows[1]} are expected; with any other base schema only the {@code windows[1]}
     * record is expected. The iterator is closed exactly once via try-with-resources.
     */
    @Test
    public void shouldFindValuesWithinRange() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[2])), serializeValue(100L));

        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 1L, 999L)) {
            final List<KeyValue<Windowed<String>, Long>> expected;
            if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
                expected = Arrays.asList(
                    KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L),
                    KeyValue.pair(new Windowed<>("a", this.windows[1]), 50L));
            } else {
                expected = Collections.singletonList(KeyValue.pair(new Windowed<>("a", this.windows[1]), 50L));
            }
            Assert.assertEquals(expected, toList(results));
        }
    }

    /**
     * Verifies that {@code remove} deletes a record from the store and, when an index schema is
     * present, its index entry as well.
     *
     * <p>Two records for key {@code "a"} are written, the one in {@code windows[0]} is removed,
     * and a fetch over [0, 100] must then be empty. The iterator is closed exactly once via
     * try-with-resources before the index is inspected.
     */
    @Test
    public void shouldRemove() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(30L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));

        this.bytesStore.remove(serializeKey(new Windowed<>("a", this.windows[0])));

        try (KeyValueIterator<Bytes, byte[]> remaining = this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, 100L)) {
            Assert.assertFalse(remaining.hasNext());
        }

        // The index is only checked for stores that have one.
        if (getIndexSchema() != null) {
            MatcherAssert.assertThat(
                this.bytesStore.getIndex(serializeKeyForIndex(new Windowed<>("a", this.windows[0]))),
                CoreMatchers.is(CoreMatchers.nullValue()));
        }
    }

    /**
     * Checks that a write into {@code windows[3]} adds a second segment directory, and that
     * subsequent fetches for key {@code "a"} return nothing for [0, 1500] and only the
     * {@code windows[3]} record for the later range.
     */
    @Test
    public void shouldRollSegments() {
        // Separate segments instance; in this test it is only asked for segment names.
        final AbstractSegments<S> namingSegments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[2])), serializeValue(500L));

        final Set<String> firstSegmentOnly = Collections.singleton(namingSegments.segmentName(0L));
        Assert.assertEquals(firstSegmentOnly, segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[3])), serializeValue(1000L));

        final Set<String> firstAndSecondSegment = Utils.mkSet(namingSegments.segmentName(0L), namingSegments.segmentName(1L));
        Assert.assertEquals(firstAndSecondSegment, segmentDirs());

        Assert.assertEquals(
            Collections.emptyList(),
            toList(this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, 1500L)));

        // NOTE(review): DEFAULT_TIMEOUT serves as the upper time bound of the fetch here, not as a
        // timeout -- confirm its value is the intended bound.
        final KeyValue<Windowed<String>, Long> latestEntry = KeyValue.pair(new Windowed<>("a", this.windows[3]), 1000L);
        Assert.assertEquals(
            Collections.singletonList(latestEntry),
            toList(this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 59000L, IntegrationTestUtils.DEFAULT_TIMEOUT)));

        namingSegments.close();
    }

    /**
     * Checks that after writes into {@code windows[0]} and {@code windows[3]} two segment
     * directories exist, and that {@code all()} returns only the {@code windows[3]} record.
     */
    @Test
    public void shouldGetAllSegments() {
        // Separate segments instance; in this test it is only asked for segment names.
        final AbstractSegments<S> namingSegments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        final Set<String> firstSegmentOnly = Collections.singleton(namingSegments.segmentName(0L));
        Assert.assertEquals(firstSegmentOnly, segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[3])), serializeValue(100L));
        final Set<String> firstAndSecondSegment = Utils.mkSet(namingSegments.segmentName(0L), namingSegments.segmentName(1L));
        Assert.assertEquals(firstAndSecondSegment, segmentDirs());

        final KeyValue<Windowed<String>, Long> latestEntry = KeyValue.pair(new Windowed<>("b", this.windows[3]), 100L);
        Assert.assertEquals(Collections.singletonList(latestEntry), toList(this.bytesStore.all()));

        namingSegments.close();
    }

    /**
     * Verifies what {@code backwardAll()} returns after records were written into two segments.
     *
     * <p>A record for "a" in {@code windows[0]} creates segment 0 and a record for "b" in
     * {@code windows[3]} adds segment 1; the test asserts that {@code backwardAll()} then yields
     * only the {@code windows[3]} record.
     */
    @Test
    public void shouldGetAllBackwards() {
        AbstractSegments<S> segments = newSegments();
        try {
            this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
            Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

            this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[3])), serializeValue(100L));
            Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>("b", this.windows[3]), 100L)),
                toList(this.bytesStore.backwardAll()));
        } finally {
            // Close the helper even when an assertion above fails, so it is never left open.
            segments.close();
        }
    }

    /**
     * Verifies what {@code fetchAll(0, DEFAULT_TIMEOUT)} returns after writes into two segments.
     *
     * <p>Two records for "a" are written, one in {@code windows[0]} (segment 0) and one in
     * {@code windows[3]} (segment 1); the test asserts that the range query yields only the
     * {@code windows[3]} record.
     */
    @Test
    public void shouldFetchAllSegments() {
        AbstractSegments<S> segments = newSegments();
        try {
            String key = "a";
            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[0])), serializeValue(50L));
            Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[3])), serializeValue(100L));
            Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

            Assert.assertEquals(
                Collections.singletonList(KeyValue.pair(new Windowed<>(key, this.windows[3]), 100L)),
                toList(this.bytesStore.fetchAll(0L, IntegrationTestUtils.DEFAULT_TIMEOUT)));
        } finally {
            // Close the helper even when an assertion above fails, so it is never left open.
            segments.close();
        }
    }

    /**
     * Verifies that a segment directory renamed to {@code <prefix>-<yyyyMMddHHmm>} is still loaded.
     *
     * <p>The test writes two records, closes the store, renames the directory of segment 0 from
     * its current {@code <prefix>.<number>} name to the date-formatted name (UTC), re-creates and
     * re-initialises the store, and then expects both records to be returned by a fetch.
     */
    @Test
    public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
        AbstractSegments<S> segments = newSegments();
        try {
            String key = "a";
            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[0])), serializeValue(50L));
            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[3])), serializeValue(100L));
            this.bytesStore.close();

            // Current name is "<prefix>.<number>"; rebuild it as "<prefix>-<formatted date>".
            String currentName = segments.segmentName(0L);
            String[] nameParts = currentName.split("\\.");
            long segmentNumber = Long.parseLong(nameParts[1]);
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
            formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
            String formattedDate = formatter.format(new Date(segmentNumber * IntegrationTestUtils.DEFAULT_TIMEOUT));

            File storeDir = new File(this.stateDir, "bytes-store");
            File oldStyleDir = new File(storeDir, nameParts[0] + "-" + formattedDate);
            Assert.assertTrue(new File(storeDir, currentName).renameTo(oldStyleDir));

            this.bytesStore = getBytesStore();
            this.bytesStore.init(this.context, this.bytesStore);
            MatcherAssert.assertThat(
                toList(this.bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, IntegrationTestUtils.DEFAULT_TIMEOUT)),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(new Windowed<>(key, this.windows[0]), 50L),
                    KeyValue.pair(new Windowed<>(key, this.windows[3]), 100L))));
        } finally {
            // Close the helper even when an assertion above fails, so it is never left open.
            segments.close();
        }
    }

    /**
     * Verifies that a segment directory renamed to {@code <prefix>:<number>} is still loaded.
     *
     * <p>The test writes two records, closes the store, renames the directory of segment 0 from
     * its current {@code <prefix>.<number>} name to the colon-separated name, re-creates and
     * re-initialises the store, and then expects both records to be returned by a fetch.
     */
    @Test
    public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
        AbstractSegments<S> segments = newSegments();
        try {
            String key = "a";
            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[0])), serializeValue(50L));
            this.bytesStore.put(serializeKey(new Windowed<>(key, this.windows[3])), serializeValue(100L));
            this.bytesStore.close();

            // Current name is "<prefix>.<number>"; rebuild it as "<prefix>:<number>".
            String currentName = segments.segmentName(0L);
            String[] nameParts = currentName.split("\\.");
            File storeDir = new File(this.stateDir, "bytes-store");
            File oldStyleDir = new File(storeDir, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
            Assert.assertTrue(new File(storeDir, currentName).renameTo(oldStyleDir));

            this.bytesStore = getBytesStore();
            this.bytesStore.init(this.context, this.bytesStore);
            MatcherAssert.assertThat(
                toList(this.bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, IntegrationTestUtils.DEFAULT_TIMEOUT)),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(new Windowed<>(key, this.windows[0]), 50L),
                    KeyValue.pair(new Windowed<>(key, this.windows[3]), 100L))));
        } finally {
            // Close the helper even when an assertion above fails, so it is never left open.
            segments.close();
        }
    }

    /**
     * Writes a record, closes the store, initialises it again and writes a second record.
     * The test passes as long as none of these calls throws.
     */
    @Test
    public void shouldBeAbleToWriteToReInitializedStore() {
        String key = "a";

        Bytes keyBeforeClose = serializeKey(new Windowed<>(key, this.windows[0]));
        this.bytesStore.put(keyBeforeClose, serializeValue(50L));

        // Close and re-open the very same store instance.
        this.bytesStore.close();
        this.bytesStore.init(this.context, this.bytesStore);

        Bytes keyAfterReInit = serializeKey(new Windowed<>(key, this.windows[1]));
        this.bytesStore.put(keyAfterReInit, serializeValue(100L));
    }

    /**
     * Builds two changelog records (windows[0] and windows[3]) and checks the write batches the
     * store derives from them: two batches are expected, each holding one entry when there is no
     * index schema and two entries otherwise.
     */
    @Test
    public void shouldCreateWriteBatches() {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        byte[] firstKey = serializeKey(new Windowed<>("a", this.windows[0]), true).get();
        records.add(new ConsumerRecord<>("", 0, 0L, firstKey, serializeValue(50L)));
        byte[] secondKey = serializeKey(new Windowed<>("a", this.windows[3]), true).get();
        records.add(new ConsumerRecord<>("", 0, 0L, secondKey, serializeValue(100L)));

        Map<?, ?> batches = this.bytesStore.getWriteBatches(records);
        Assert.assertEquals(2L, batches.size());

        int expectedEntriesPerBatch;
        if (getIndexSchema() == null) {
            expectedEntriesPerBatch = 1;
        } else {
            expectedEntriesPerBatch = 2;
        }
        for (Object batch : batches.values()) {
            Assert.assertEquals(expectedEntriesPerBatch, ((WriteBatch) batch).count());
        }
    }

    /**
     * Runs the shared restore scenario with {@link Task.TaskType#ACTIVE}.
     */
    @Test
    public void shouldRestoreToByteStoreForActiveTask() {
        Task.TaskType activeTask = Task.TaskType.ACTIVE;
        shouldRestoreToByteStore(activeTask);
    }

    /**
     * Transitions the context to standby (passing {@code null}) and then runs the shared restore
     * scenario with {@link Task.TaskType#STANDBY}.
     */
    @Test
    public void shouldRestoreToByteStoreForStandbyTask() {
        this.context.transitionToStandby(null);

        Task.TaskType standbyTask = Task.TaskType.STANDBY;
        shouldRestoreToByteStore(standbyTask);
    }

    /**
     * Shared restore scenario: restores two changelog records (windows[0] and windows[3]) into a
     * freshly initialised store and expects two segments afterwards, with {@code all()} returning
     * only the windows[3] record.
     *
     * @param taskType the task type the calling test is exercising; it is not read by this method
     */
    private void shouldRestoreToByteStore(Task.TaskType taskType) {
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        List<ConsumerRecord<byte[], byte[]>> changelog = new ArrayList<>();
        byte[] firstKey = serializeKey(new Windowed<>("a", this.windows[0]), true).get();
        changelog.add(new ConsumerRecord<>("", 0, 0L, firstKey, serializeValue(50L)));
        byte[] secondKey = serializeKey(new Windowed<>("a", this.windows[3]), true).get();
        changelog.add(new ConsumerRecord<>("", 0, 0L, secondKey, serializeValue(100L)));

        this.bytesStore.restoreAllInternal(changelog);
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 100L));
        Assert.assertEquals(expected, toList(this.bytesStore.all()));
    }

    /**
     * Performs four puts, each preceded by a record context for topic "" / partition 0 carrying
     * offsets 1 through 4, and expects the store position to end up as {"" -> {0 -> 4}}.
     */
    @Test
    public void shouldMatchPositionAfterPut() {
        this.bytesStore.init(this.context, this.bytesStore);

        ProcessorRecordContext firstContext = new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders());
        this.context.setRecordContext(firstContext);
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));

        ProcessorRecordContext secondContext = new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders());
        this.context.setRecordContext(secondContext);
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));

        ProcessorRecordContext thirdContext = new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders());
        this.context.setRecordContext(thirdContext);
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(100L));

        ProcessorRecordContext fourthContext = new ProcessorRecordContext(0L, 4L, 0, "", new RecordHeaders());
        this.context.setRecordContext(fourthContext);
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), serializeValue(200L));

        Map partitionOffsets = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(0, 4L)});
        Map topicOffsets = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("", partitionOffsets)});
        Position expectedPosition = Position.fromMap(topicOffsets);
        Assert.assertEquals(expectedPosition, this.bytesStore.getPosition());
    }

    /**
     * Restores the single-topic changelog fixture with the consistency offset vector enabled and
     * checks the outcome: two segments, {@code all()} returning only the windows[3] record, and a
     * position whose entry for topic "" maps partition 0 to offset 3.
     */
    @Test
    public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() {
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("__iq.consistency.offset.vector.enabled__", true);

        File stateDirectory = TestUtils.tempDirectory();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime());
        StreamsConfig config = new StreamsConfig(props);
        ThreadCache cache = new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(
            stateDirectory, Serdes.String(), Serdes.String(), streamsMetrics, config, MockRecordCollector::new, cache, Time.SYSTEM);

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecords());
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 200L));
        Assert.assertEquals(expected, toList(this.bytesStore.all()));

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions(""),
            Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions(""),
            Matchers.hasEntry(0, 3L));
    }

    /**
     * Restores the multi-topic changelog fixture with the consistency offset vector enabled and
     * checks the outcome: two segments, {@code all()} returning only the windows[3] record, and a
     * position mapping partition 0 to offset 3 for topic "A" and to offset 2 for topic "B".
     */
    @Test
    public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() {
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("__iq.consistency.offset.vector.enabled__", true);

        File stateDirectory = TestUtils.tempDirectory();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime());
        StreamsConfig config = new StreamsConfig(props);
        ThreadCache cache = new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(
            stateDirectory, Serdes.String(), Serdes.String(), streamsMetrics, config, MockRecordCollector::new, cache, Time.SYSTEM);

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecordsMultipleTopics());
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 200L));
        Assert.assertEquals(expected, toList(this.bytesStore.all()));

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions("A"),
            Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions("A"),
            Matchers.hasEntry(0, 3L));
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions("B"),
            Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions("B"),
            Matchers.hasEntry(0, 2L));
    }

    /**
     * Restores the tombstone changelog fixture with the consistency offset vector enabled.
     *
     * <p>One segment is expected afterwards. The expected content of {@code all()} is the
     * windows[0] record when the base schema is a {@code TimeFirstWindowKeySchema} and empty
     * otherwise. The position for topic "A" must map partition 0 to offset 2.
     */
    @Test
    public void shouldHandleTombstoneRecords() {
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("__iq.consistency.offset.vector.enabled__", true);

        File stateDirectory = TestUtils.tempDirectory();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime());
        StreamsConfig config = new StreamsConfig(props);
        ThreadCache cache = new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(
            stateDirectory, Serdes.String(), Serdes.String(), streamsMetrics, config, MockRecordCollector::new, cache, Time.SYSTEM);

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecordsWithTombstones());
        Assert.assertEquals(1L, this.bytesStore.getSegments().size());

        List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        boolean windowSchema = getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
        if (windowSchema) {
            expected.add(new KeyValue<>(new Windowed<>("a", this.windows[0]), 50L));
        }
        Assert.assertEquals(expected, toList(this.bytesStore.all()));

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(
            this.bytesStore.getPosition().getPartitionPositions("A"),
            Matchers.hasEntry(0, 2L));
    }

    /**
     * Restores a changelog record that carries no headers while the consistency offset vector is
     * enabled, and expects the store position to remain the empty position.
     */
    @Test
    public void shouldNotThrowWhenRestoringOnMissingHeaders() {
        Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("__iq.consistency.offset.vector.enabled__", true);

        File stateDirectory = TestUtils.tempDirectory();
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime());
        StreamsConfig config = new StreamsConfig(props);
        ThreadCache cache = new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(
            stateDirectory, Serdes.String(), Serdes.String(), streamsMetrics, config, MockRecordCollector::new, cache, Time.SYSTEM);

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);

        this.bytesStore.restoreAllInternal(getChangelogRecordsWithoutHeaders());

        Position emptyPosition = Position.emptyPosition();
        MatcherAssert.assertThat(this.bytesStore.getPosition(), CoreMatchers.is(emptyPosition));
    }

    /**
     * Builds three changelog records for key "a" (windows[0], windows[2], windows[3] with values
     * 50, 100, 200) whose position header "c" is advanced for topic "" / partition 0 from offset 1
     * to 2 to 3.
     *
     * <p>NOTE: a single {@link RecordHeaders} instance is mutated between records and passed to
     * every {@link ConsumerRecord}, so the statement order below matters.
     *
     * @return the three records in insertion order
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        RecordHeaders headers = new RecordHeaders();

        // Record 1: position {"" -> {0 -> 1}}.
        Position positionOne = Position.emptyPosition().withComponent("", 0, 1L);
        headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionOne).array()));
        byte[] keyOne = serializeKey(new Windowed<>("a", this.windows[0]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyOne, serializeValue(50L), headers, Optional.empty()));

        // Record 2: replace the position header with offset 2.
        headers.remove("c");
        Position positionTwo = positionOne.withComponent("", 0, 2L);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionTwo).array()));
        byte[] keyTwo = serializeKey(new Windowed<>("a", this.windows[2]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyTwo, serializeValue(100L), headers, Optional.empty()));

        // Record 3: replace the position header with offset 3.
        headers.remove("c");
        Position positionThree = positionTwo.withComponent("", 0, 3L);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionThree).array()));
        byte[] keyThree = serializeKey(new Windowed<>("a", this.windows[3]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyThree, serializeValue(200L), headers, Optional.empty()));

        return records;
    }

    /**
     * Builds three changelog records for key "a" (windows[0], windows[2], windows[3] with values
     * 50, 100, 200) whose position header "c" is extended step by step: topic "A" offset 1, then
     * topic "B" offset 2, then topic "A" offset 3 (all for partition 0).
     *
     * <p>NOTE: a single {@link RecordHeaders} instance is mutated between records and passed to
     * every {@link ConsumerRecord}, so the statement order below matters.
     *
     * @return the three records in insertion order
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsMultipleTopics() {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        RecordHeaders headers = new RecordHeaders();

        // Record 1: position {"A" -> {0 -> 1}}.
        Position positionOne = Position.emptyPosition().withComponent("A", 0, 1L);
        headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionOne).array()));
        byte[] keyOne = serializeKey(new Windowed<>("a", this.windows[0]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyOne, serializeValue(50L), headers, Optional.empty()));

        // Record 2: add topic "B" at offset 2.
        headers.remove("c");
        Position positionTwo = positionOne.withComponent("B", 0, 2L);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionTwo).array()));
        byte[] keyTwo = serializeKey(new Windowed<>("a", this.windows[2]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyTwo, serializeValue(100L), headers, Optional.empty()));

        // Record 3: move topic "A" to offset 3.
        headers.remove("c");
        Position positionThree = positionTwo.withComponent("A", 0, 3L);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionThree).array()));
        byte[] keyThree = serializeKey(new Windowed<>("a", this.windows[3]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, keyThree, serializeValue(200L), headers, Optional.empty()));

        return records;
    }

    /**
     * Builds two changelog records for key "a": a regular record in windows[0] with value 50 and
     * a record in windows[2] whose value is {@code null}.
     *
     * <p>NOTE: both records share one {@link RecordHeaders} instance. Before the second record
     * the version header and a second "c" header (topic "A", partition 0, offset 2) are appended
     * without removing the earlier ones.
     *
     * @return the two records in insertion order
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithTombstones() {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        RecordHeaders headers = new RecordHeaders();

        // Regular record: position {"A" -> {0 -> 1}}.
        Position positionOne = Position.emptyPosition().withComponent("A", 0, 1L);
        headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionOne).array()));
        byte[] liveKey = serializeKey(new Windowed<>("a", this.windows[0]), true).get();
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, liveKey, serializeValue(50L), headers, Optional.empty()));

        // Null-valued record: headers are appended again, position offset becomes 2.
        Position positionTwo = positionOne.withComponent("A", 0, 2L);
        headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        headers.add(new RecordHeader("c", PositionSerde.serialize(positionTwo).array()));
        byte[] tombstoneKey = serializeKey(new Windowed<>("a", this.windows[2]), true).get();
        byte[] tombstoneValue = null;
        records.add(new ConsumerRecord<>(
            "", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, tombstoneKey, tombstoneValue, headers, Optional.empty()));

        return records;
    }

    /**
     * Builds a single changelog record for key "a" in windows[2] with value 50, created through
     * the five-argument {@link ConsumerRecord} constructor, i.e. without passing any headers.
     *
     * @return a list holding that one record
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithoutHeaders() {
        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        byte[] key = serializeKey(new Windowed<>("a", this.windows[2])).get();
        byte[] value = serializeValue(50L);
        records.add(new ConsumerRecord<>("", 0, 0L, key, value));
        return records;
    }

    /**
     * Verifies that a put which is skipped for an expired segment is both logged and reflected in
     * the task-level dropped-records metrics.
     *
     * <p>The log appender is scoped to the two puts and the log assertion only, so it is closed
     * exactly once regardless of the outcome, and the locally created store is closed even when an
     * assertion fails.
     */
    @Test
    public void shouldLogAndMeasureExpiredRecords() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();
        InternalMockProcessorContext mockContext =
            new InternalMockProcessorContext(TestUtils.tempDirectory(), new StreamsConfig(streamsConfig));
        mockContext.setSystemTimeMs(new SystemTime().milliseconds());
        bytesStore.init(mockContext, bytesStore);

        try {
            try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
                // Write into nextSegmentWindow first; presumably this advances the observed stream
                // time far enough that the following windows[0] record is treated as expired.
                bytesStore.put(serializeKey(new Windowed<>("dummy", this.nextSegmentWindow)), serializeValue(0L));
                bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(5L));

                MatcherAssert.assertThat(
                    appender.getMessages(),
                    CoreMatchers.hasItem("Skipping record for expired segment."));
            }

            Map<?, ?> metrics = mockContext.metrics().metrics();
            String threadId = Thread.currentThread().getName();
            Metric droppedTotal = (Metric) metrics.get(new MetricName(
                "dropped-records-total",
                "stream-task-metrics",
                "",
                Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"))));
            Metric droppedRate = (Metric) metrics.get(new MetricName(
                "dropped-records-rate",
                "stream-task-metrics",
                "",
                Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"))));

            Assert.assertEquals(Double.valueOf(1.0d), droppedTotal.metricValue());
            Assert.assertNotEquals(Double.valueOf(0.0d), droppedRate.metricValue());
        } finally {
            // This store is local to the test, so it has to be released here.
            bytesStore.close();
        }
    }

    /**
     * Lists the entries currently present in the {@code bytes-store} directory under the state
     * directory.
     *
     * @return the entry names as a set
     * @throws NullPointerException if the directory listing is {@code null}, which
     *         {@link File#list()} returns when the path is not a directory or cannot be read
     */
    private Set<String> segmentDirs() {
        File storeDir = new File(this.stateDir, "bytes-store");
        // Pass the String[] straight through: an Object[] cast would make the result a
        // Set<Object>, which does not match the declared return type.
        String[] entries = Objects.requireNonNull(storeDir.list());
        return Utils.mkSet(entries);
    }

    /**
     * Serializes a windowed key with the changelog flag set to {@code false}.
     *
     * @param windowed the windowed key to serialize
     * @return the serialized key
     */
    private Bytes serializeKey(Windowed<String> windowed) {
        boolean changelogFormat = false;
        return serializeKey(windowed, changelogFormat);
    }

    /**
     * Serializes a windowed key using 0 as the third argument of the three-argument overload.
     *
     * @param windowed the windowed key to serialize
     * @param z        forwarded unchanged to the three-argument overload
     * @return the serialized key
     */
    private Bytes serializeKey(Windowed<String> windowed, boolean z) {
        int defaultSeqnum = 0;
        return serializeKey(windowed, z, defaultSeqnum);
    }

    /**
     * Serializes a windowed key according to the store's base schema.
     *
     * <p>For a time-first window schema the key is produced by {@code WindowKeySchema} when
     * {@code z} is true and by {@code TimeFirstWindowKeySchema} otherwise. For a time-first
     * session schema it is produced by {@code SessionKeySchema} when {@code z} is true and by
     * {@code TimeFirstSessionKeySchema} otherwise.
     *
     * @param windowed the windowed key to serialize
     * @param z        selects the non-prefixed schema ({@code true}) or the time-first schema
     * @param i        passed to the window schemas' {@code toStoreKeyBinary}; not used for session
     *                 schemas
     * @return the serialized key
     * @throws IllegalStateException if the base schema is neither of the two recognised types
     */
    private Bytes serializeKey(Windowed<String> windowed, boolean z, int i) {
        StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);

        if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
            if (z) {
                return WindowKeySchema.toStoreKeyBinary(windowed, i, serdes);
            }
            return PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(windowed, i, serdes);
        }

        if (!(getBaseSchema() instanceof PrefixedSessionKeySchemas.TimeFirstSessionKeySchema)) {
            throw new IllegalStateException("Unrecognized serde schema");
        }

        if (z) {
            return Bytes.wrap(SessionKeySchema.toBinary(windowed, serdes.keySerializer(), "dummy"));
        }
        return Bytes.wrap(PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.toBinary(windowed, serdes.keySerializer(), "dummy"));
    }

    /**
     * Serializes a windowed key according to the store's index schema, using the key-first window
     * or key-first session format.
     *
     * @param windowed the windowed key to serialize
     * @return the serialized index key
     * @throws IllegalStateException if the index schema is neither of the two recognised types
     */
    private Bytes serializeKeyForIndex(Windowed<String> windowed) {
        StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);

        if (getIndexSchema() instanceof PrefixedWindowKeySchemas.KeyFirstWindowKeySchema) {
            return PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.toStoreKeyBinary(windowed, 0, serdes);
        }

        if (!(getIndexSchema() instanceof PrefixedSessionKeySchemas.KeyFirstSessionKeySchema)) {
            throw new IllegalStateException("Unrecognized serde schema");
        }

        byte[] rawKey = PrefixedSessionKeySchemas.KeyFirstSessionKeySchema.toBinary(windowed, serdes.keySerializer(), "dummy");
        return Bytes.wrap(rawKey);
    }

    /**
     * Serializes a long value with the serializer of {@code Serdes.Long()}, using "" as topic.
     *
     * @param j the value to serialize
     * @return the serialized bytes
     */
    private byte[] serializeValue(long j) {
        Long boxedValue = Long.valueOf(j);
        return Serdes.Long().serializer().serialize("", boxedValue);
    }

    /**
     * Drains the given iterator into a list of deserialized entries and closes the iterator.
     *
     * <p>Keys are decoded with the time-first window schema (window size 500) or the time-first
     * session schema, depending on the store's base schema; values are decoded as {@code Long}.
     * The iterator is closed before returning, including when decoding fails, because callers in
     * this class pass freshly opened store iterators inline and never close them themselves.
     *
     * @param keyValueIterator the iterator to drain; it is closed by this method
     * @return the decoded entries in iteration order
     * @throws IllegalStateException if an entry is encountered while the base schema is neither of
     *         the two recognised types
     */
    private List<KeyValue<Windowed<String>, Long>> toList(KeyValueIterator<Bytes, byte[]> keyValueIterator) {
        List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
        StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);

        try (KeyValueIterator<Bytes, byte[]> iterator = keyValueIterator) {
            while (iterator.hasNext()) {
                KeyValue<Bytes, byte[]> entry = iterator.next();

                Windowed<String> windowedKey;
                if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
                    windowedKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.fromStoreKey(
                        entry.key.get(), 500L, serdes.keyDeserializer(), serdes.topic());
                } else if (getBaseSchema() instanceof PrefixedSessionKeySchemas.TimeFirstSessionKeySchema) {
                    windowedKey = PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.from(
                        entry.key.get(), serdes.keyDeserializer(), "dummy");
                } else {
                    throw new IllegalStateException("Unrecognized serde schema");
                }

                Long value = serdes.valueDeserializer().deserialize("dummy", entry.value);
                results.add(KeyValue.pair(windowedKey, value));
            }
        }
        return results;
    }
}
