package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingSessionStoreTest.class */
public class CachingSessionStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 600;
    private final StateSerdes<String, Long> serdes = new StateSerdes<>("name", Serdes.String(), Serdes.Long());
    private MockProcessorContext context;
    private RocksDBSegmentedBytesStore underlying;
    private CachingSessionStore<String, Long> cachingStore;
    private ThreadCache cache;
    private static final Long DEFAULT_TIMESTAMP = 10L;

    @Before
    public void setUp() throws Exception {
        SessionKeySchema sessionKeySchema = new SessionKeySchema();
        sessionKeySchema.init("topic");
        this.underlying = new RocksDBSegmentedBytesStore("test", 60000L, 3, sessionKeySchema);
        this.cachingStore = new CachingSessionStore<>(new RocksDBSessionStore(this.underlying, Serdes.Bytes(), Serdes.ByteArray()), Serdes.String(), Serdes.Long(), Segments.segmentInterval(60000L, 3));
        this.cache = new ThreadCache("testCache", 600L, new MockStreamsMetrics(new Metrics()));
        this.context = new MockProcessorContext(TestUtils.tempDirectory(), (Serde<?>) null, (Serde<?>) null, (RecordCollector) null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP.longValue(), 0L, 0, "topic"));
        this.cachingStore.init(this.context, this.cachingStore);
    }

    @After
    public void close() {
        this.context.close();
        this.cachingStore.close();
    }

    @Test
    public void shouldPutFetchFromCache() throws Exception {
        this.cachingStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("aa", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("b", new SessionWindow(0L, 0L)), 1L);
        Assert.assertEquals(3L, this.cache.size());
        KeyValueIterator findSessions = this.cachingStore.findSessions("a", 0L, 0L);
        KeyValueIterator findSessions2 = this.cachingStore.findSessions("b", 0L, 0L);
        Assert.assertEquals(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertEquals(KeyValue.pair(new Windowed("b", new SessionWindow(0L, 0L)), 1L), findSessions2.next());
        Assert.assertFalse(findSessions.hasNext());
        Assert.assertFalse(findSessions2.hasNext());
    }

    @Test
    public void shouldPutFetchAllKeysFromCache() throws Exception {
        this.cachingStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("aa", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("b", new SessionWindow(0L, 0L)), 1L);
        Assert.assertEquals(3L, this.cache.size());
        KeyValueIterator findSessions = this.cachingStore.findSessions("a", "b", 0L, 0L);
        Assert.assertEquals(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertEquals(KeyValue.pair(new Windowed("aa", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertEquals(KeyValue.pair(new Windowed("b", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldPutFetchRangeFromCache() throws Exception {
        this.cachingStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("aa", new SessionWindow(0L, 0L)), 1L);
        this.cachingStore.put(new Windowed("b", new SessionWindow(0L, 0L)), 1L);
        Assert.assertEquals(3L, this.cache.size());
        KeyValueIterator findSessions = this.cachingStore.findSessions("aa", "b", 0L, 0L);
        Assert.assertEquals(KeyValue.pair(new Windowed("aa", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertEquals(KeyValue.pair(new Windowed("b", new SessionWindow(0L, 0L)), 1L), findSessions.next());
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
        List<KeyValue> asList = Arrays.asList(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), 1L), KeyValue.pair(new Windowed("a", new SessionWindow(10L, 10L)), 2L), KeyValue.pair(new Windowed("a", new SessionWindow(100L, 100L)), 3L), KeyValue.pair(new Windowed("a", new SessionWindow(1000L, 1000L)), 4L));
        for (KeyValue keyValue : asList) {
            this.cachingStore.put((Windowed) keyValue.key, keyValue.value);
        }
        this.cachingStore.put(new Windowed("aa", new SessionWindow(0L, 0L)), 5L);
        Assert.assertEquals(asList, RocksDBSessionStoreTest.toList(this.cachingStore.fetch("a")));
    }

    @Test
    public void shouldFlushItemsToStoreOnEviction() throws Exception {
        List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow = addSessionsUntilOverflow("a", "b", "c", "d");
        Assert.assertEquals(addSessionsUntilOverflow.size() - 1, this.cache.size());
        KeyValue keyValue = (KeyValue) this.underlying.fetch(Bytes.wrap(((String) ((Windowed) addSessionsUntilOverflow.get(0).key).key()).getBytes()), 0L, 0L).next();
        Assert.assertEquals(addSessionsUntilOverflow.get(0).key, SessionKeySerde.from(((Bytes) keyValue.key).get(), Serdes.String().deserializer(), "dummy"));
        Assert.assertArrayEquals(this.serdes.rawValue(addSessionsUntilOverflow.get(0).value), (byte[]) keyValue.value);
    }

    @Test
    public void shouldQueryItemsInCacheAndStore() throws Exception {
        Assert.assertEquals(addSessionsUntilOverflow("a"), RocksDBSessionStoreTest.toList(this.cachingStore.findSessions("a", 0L, r0.size() * 10)));
    }

    @Test
    public void shouldRemove() throws Exception {
        Windowed windowed = new Windowed("a", new SessionWindow(0L, 0L));
        Windowed windowed2 = new Windowed("b", new SessionWindow(0L, 0L));
        this.cachingStore.put(windowed, 2L);
        this.cachingStore.put(windowed2, 2L);
        this.cachingStore.flush();
        this.cachingStore.remove(windowed);
        this.cachingStore.flush();
        Assert.assertFalse(this.cachingStore.findSessions("a", 0L, 0L).hasNext());
    }

    @Test
    public void shouldFetchCorrectlyAcrossSegments() throws Exception {
        Windowed windowed = new Windowed("a", new SessionWindow(0L, 0L));
        Windowed windowed2 = new Windowed("a", new SessionWindow(60000L, 60000L));
        Windowed windowed3 = new Windowed("a", new SessionWindow(120000L, 120000L));
        this.cachingStore.put(windowed, 1L);
        this.cachingStore.put(windowed2, 2L);
        this.cachingStore.put(windowed3, 3L);
        this.cachingStore.flush();
        KeyValueIterator findSessions = this.cachingStore.findSessions("a", 0L, 120000L);
        Assert.assertEquals(windowed, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed2, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed3, ((KeyValue) findSessions.next()).key);
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
        Windowed windowed = new Windowed("a", new SessionWindow(0L, 0L));
        Windowed windowed2 = new Windowed("aa", new SessionWindow(0L, 0L));
        Windowed windowed3 = new Windowed("a", new SessionWindow(60000L, 60000L));
        Windowed windowed4 = new Windowed("a", new SessionWindow(120000L, 120000L));
        Windowed windowed5 = new Windowed("aa", new SessionWindow(120000L, 120000L));
        this.cachingStore.put(windowed, 1L);
        this.cachingStore.put(windowed2, 1L);
        this.cachingStore.put(windowed3, 2L);
        this.cachingStore.put(windowed4, 3L);
        this.cachingStore.put(windowed5, 3L);
        this.cachingStore.flush();
        KeyValueIterator findSessions = this.cachingStore.findSessions("a", "aa", 0L, 120000L);
        Assert.assertEquals(windowed, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed2, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed3, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed4, ((KeyValue) findSessions.next()).key);
        Assert.assertEquals(windowed5, ((KeyValue) findSessions.next()).key);
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldForwardChangedValuesDuringFlush() throws Exception {
        Windowed windowed = new Windowed("a", new SessionWindow(0L, 0L));
        final ArrayList arrayList = new ArrayList();
        this.cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>() { // from class: org.apache.kafka.streams.state.internals.CachingSessionStoreTest.1
            public void apply(Windowed<String> windowed2, Long l, Long l2) {
                arrayList.add(KeyValue.pair(windowed2, new Change(l, l2)));
            }
        });
        this.cachingStore.put(windowed, 1L);
        this.cachingStore.flush();
        this.cachingStore.put(windowed, 2L);
        this.cachingStore.flush();
        this.cachingStore.remove(windowed);
        this.cachingStore.flush();
        Assert.assertEquals(arrayList, Arrays.asList(KeyValue.pair(windowed, new Change(1L, (Object) null)), KeyValue.pair(windowed, new Change(2L, 1L)), KeyValue.pair(windowed, new Change((Object) null, 2L))));
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() throws Exception {
        this.cachingStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        Assert.assertEquals(1L, this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals(0L, this.cache.size());
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.fetch("a");
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.findSessions("a", 0L, Long.MAX_VALUE);
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.remove(new Windowed("a", new SessionWindow(0L, 0L)));
    }

    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception {
        this.cachingStore.close();
        this.cachingStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
    }

    private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(String... strArr) {
        Random random = new Random();
        ArrayList arrayList = new ArrayList();
        while (this.cache.size() == arrayList.size()) {
            addSingleSession(strArr[random.nextInt(strArr.length)], arrayList);
        }
        return arrayList;
    }

    private void addSingleSession(String str, List<KeyValue<Windowed<String>, Long>> list) {
        int size = list.size() * 10;
        Windowed windowed = new Windowed(str, new SessionWindow(size, size));
        this.cachingStore.put(windowed, 1L);
        list.add(KeyValue.pair(windowed, 1L));
    }
}
