package org.apache.kafka.streams.state.internals;

import java.util.Collections;
import java.util.Iterator;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.class */
public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
    private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { // from class: org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.1
        public long segmentId(Bytes bytes) {
            return 0L;
        }
    };
    private static final int WINDOW_SIZE = 10;
    private Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs;
    private StoreKeySerializer<String> storeKeySerializer;
    private MergedSortedCacheWindowStoreKeyValueIterator.StoreKeyToWindowKey storeKeyToWindowKey;
    private MergedSortedCacheWindowStoreKeyValueIterator.WindowKeyToBytes windowKeyToBytes;
    private final String storeKey = "a";
    private final String cacheKey = "b";
    private final TimeWindow storeWindow = new TimeWindow(0, 1);
    private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(KeyValue.pair(new Windowed(Bytes.wrap("a".getBytes()), this.storeWindow), "a".getBytes())).iterator();
    private final TimeWindow cacheWindow = new TimeWindow(10, 20);
    private final Deserializer<String> deserializer = Serdes.String().deserializer();

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest$SchemaType.class */
    private enum SchemaType {
        WINDOW_KEY_SCHEMA,
        KEY_FIRST_SCHEMA,
        TIME_FIRST_SCHEMA
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest$StoreKeySerializer.class */
    public interface StoreKeySerializer<K> {
        Bytes serialize(Windowed<K> windowed, int i, StateSerdes<K, ?> stateSerdes);
    }

    public void setUp(SchemaType schemaType) {
        switch (schemaType) {
            case WINDOW_KEY_SCHEMA:
                this.storeKeySerializer = WindowKeySchema::toStoreKeyBinary;
                this.storeKeyToWindowKey = WindowKeySchema::fromStoreKey;
                this.windowKeyToBytes = WindowKeySchema::toStoreKeyBinary;
                break;
            case KEY_FIRST_SCHEMA:
                this.storeKeySerializer = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema::toStoreKeyBinary;
                this.storeKeyToWindowKey = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema::fromStoreKey;
                this.windowKeyToBytes = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema::toStoreKeyBinary;
                break;
            case TIME_FIRST_SCHEMA:
                this.storeKeySerializer = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema::toStoreKeyBinary;
                this.storeKeyToWindowKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema::fromStoreKey;
                this.windowKeyToBytes = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema::toStoreKeyBinary;
                break;
            default:
                throw new IllegalStateException("Unknown schemaType: " + String.valueOf(schemaType));
        }
        this.cacheKvs = Collections.singleton(KeyValue.pair(SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(this.storeKeySerializer.serialize(new Windowed<>("b", this.cacheWindow), 0, new StateSerdes<>("dummy", Serdes.String(), Serdes.ByteArray()))), new LRUCacheEntry("b".getBytes()))).iterator();
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldHaveNextFromStore(SchemaType schemaType) {
        setUp(schemaType);
        Assertions.assertTrue(createIterator(this.storeKvs, Collections.emptyIterator(), false).hasNext());
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldHaveNextFromReverseStore(SchemaType schemaType) {
        setUp(schemaType);
        Assertions.assertTrue(createIterator(this.storeKvs, Collections.emptyIterator(), true).hasNext());
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldGetNextFromStore(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator(this.storeKvs, Collections.emptyIterator(), false).next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("a", this.storeWindow), "a")));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldGetNextFromReverseStore(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator(this.storeKvs, Collections.emptyIterator(), true).next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("a", this.storeWindow), "a")));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldPeekNextKeyFromStore(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertWindowedKey((Windowed) createIterator(this.storeKvs, Collections.emptyIterator(), false).peekNextKey()), CoreMatchers.equalTo(new Windowed("a", this.storeWindow)));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldPeekNextKeyFromReverseStore(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertWindowedKey((Windowed) createIterator(this.storeKvs, Collections.emptyIterator(), true).peekNextKey()), CoreMatchers.equalTo(new Windowed("a", this.storeWindow)));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldHaveNextFromCache(SchemaType schemaType) {
        setUp(schemaType);
        Assertions.assertTrue(createIterator(Collections.emptyIterator(), this.cacheKvs, false).hasNext());
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldHaveNextFromReverseCache(SchemaType schemaType) {
        setUp(schemaType);
        Assertions.assertTrue(createIterator(Collections.emptyIterator(), this.cacheKvs, true).hasNext());
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldGetNextFromCache(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator(Collections.emptyIterator(), this.cacheKvs, false).next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("b", this.cacheWindow), "b")));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldGetNextFromReverseCache(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator(Collections.emptyIterator(), this.cacheKvs, true).next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("b", this.cacheWindow), "b")));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldPeekNextKeyFromCache(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertWindowedKey((Windowed) createIterator(Collections.emptyIterator(), this.cacheKvs, false).peekNextKey()), CoreMatchers.equalTo(new Windowed("b", this.cacheWindow)));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldPeekNextKeyFromReverseCache(SchemaType schemaType) {
        setUp(schemaType);
        MatcherAssert.assertThat(convertWindowedKey((Windowed) createIterator(Collections.emptyIterator(), this.cacheKvs, true).peekNextKey()), CoreMatchers.equalTo(new Windowed("b", this.cacheWindow)));
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldIterateBothStoreAndCache(SchemaType schemaType) {
        setUp(schemaType);
        MergedSortedCacheWindowStoreKeyValueIterator createIterator = createIterator(this.storeKvs, this.cacheKvs, true);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator.next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("a", this.storeWindow), "a")));
        MatcherAssert.assertThat(convertKeyValuePair(createIterator.next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("b", this.cacheWindow), "b")));
        Assertions.assertFalse(createIterator.hasNext());
    }

    @EnumSource(SchemaType.class)
    @ParameterizedTest
    public void shouldReverseIterateBothStoreAndCache(SchemaType schemaType) {
        setUp(schemaType);
        MergedSortedCacheWindowStoreKeyValueIterator createIterator = createIterator(this.storeKvs, this.cacheKvs, false);
        MatcherAssert.assertThat(convertKeyValuePair(createIterator.next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("b", this.cacheWindow), "b")));
        MatcherAssert.assertThat(convertKeyValuePair(createIterator.next()), CoreMatchers.equalTo(KeyValue.pair(new Windowed("a", this.storeWindow), "a")));
        Assertions.assertFalse(createIterator.hasNext());
    }

    private KeyValue<Windowed<String>, String> convertKeyValuePair(KeyValue<Windowed<Bytes>, byte[]> keyValue) {
        return KeyValue.pair(convertWindowedKey((Windowed) keyValue.key), (String) this.deserializer.deserialize("", (byte[]) keyValue.value));
    }

    private Windowed<String> convertWindowedKey(Windowed<Bytes> windowed) {
        return new Windowed<>((String) this.deserializer.deserialize("", ((Bytes) windowed.key()).get()), windowed.window());
    }

    private MergedSortedCacheWindowStoreKeyValueIterator createIterator(Iterator<KeyValue<Windowed<Bytes>, byte[]>> it, Iterator<KeyValue<Bytes, LRUCacheEntry>> it2, boolean z) {
        return new MergedSortedCacheWindowStoreKeyValueIterator(new DelegatingPeekingKeyValueIterator("cache", new KeyValueIteratorStub(it2)), new DelegatingPeekingKeyValueIterator("store", new KeyValueIteratorStub(it)), new StateSerdes("name", Serdes.Bytes(), Serdes.ByteArray()), 10L, SINGLE_SEGMENT_CACHE_FUNCTION, z, this.storeKeyToWindowKey, this.windowKeyToBytes);
    }
}
