package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.class */
public class MeteredWindowStoreTest {
    private static final String STORE_TYPE = "scope";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String THREAD_ID_TAG_KEY = "thread-id";
    private static final String STORE_NAME = "mocked-store";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
    private static final String VALUE = "value";
    private static final byte[] VALUE_BYTES = VALUE.getBytes();
    private static final int WINDOW_SIZE_MS = 10;
    private static final int RETENTION_PERIOD = 100;
    private static final long TIMESTAMP = 42;
    private InternalMockProcessorContext<?, ?> context;
    private Map<String, String> tags;
    private final String threadId = Thread.currentThread().getName();
    private final WindowStore<Bytes, byte[]> innerStoreMock = (WindowStore) Mockito.mock(WindowStore.class);
    private final MockTime mockTime = new MockTime();
    private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(this.innerStoreMock, 10, STORE_TYPE, this.mockTime, Serdes.String(), new SerdeThatDoesntHandleNull());
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/MeteredWindowStoreTest$CachedWindowStore.class */
    private interface CachedWindowStore extends WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    }

    public MeteredWindowStoreTest() {
        Mockito.when(this.innerStoreMock.name()).thenReturn(STORE_NAME);
    }

    @BeforeEach
    public void setUp() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, "test", "processId", new MockTime());
        this.context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), streamsMetricsImpl, new StreamsConfig(StreamsTestUtils.getStreamsConfig()), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, streamsMetricsImpl), Time.SYSTEM);
        this.tags = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(THREAD_ID_TAG_KEY, this.threadId), Utils.mkEntry("task-id", this.context.taskId().toString()), Utils.mkEntry("scope-state-id", STORE_NAME)});
    }

    @Test
    public void shouldDelegateInit() {
        MeteredWindowStore meteredWindowStore = new MeteredWindowStore(this.innerStoreMock, 10L, STORE_TYPE, new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
        Mockito.when(this.innerStoreMock.name()).thenReturn("store");
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).init(this.context, meteredWindowStore);
        meteredWindowStore.init(this.context, meteredWindowStore);
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        this.context.addChangelogForStore(STORE_NAME, "changelog-topic");
        doShouldPassChangelogTopicNameToStateStoreSerde("changelog-topic");
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        doShouldPassChangelogTopicNameToStateStoreSerde(ProcessorStateManager.storeChangelogTopic(this.context.applicationId(), STORE_NAME, this.context.taskId().topologyName()));
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String str) {
        Serde serde = (Serde) Mockito.mock(Serde.class);
        Serializer serializer = (Serializer) Mockito.mock(Serializer.class);
        Serde serde2 = (Serde) Mockito.mock(Serde.class);
        Deserializer deserializer = (Deserializer) Mockito.mock(Deserializer.class);
        Serializer serializer2 = (Serializer) Mockito.mock(Serializer.class);
        Mockito.when(serde.serializer()).thenReturn(serializer);
        Mockito.when(serializer.serialize(str, KEY)).thenReturn(KEY.getBytes());
        Mockito.when(serde2.deserializer()).thenReturn(deserializer);
        Mockito.when((String) deserializer.deserialize(str, VALUE_BYTES)).thenReturn(VALUE);
        Mockito.when(serde2.serializer()).thenReturn(serializer2);
        Mockito.when(serializer2.serialize(str, VALUE)).thenReturn(VALUE_BYTES);
        Mockito.when((byte[]) this.innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_BYTES);
        this.store = new MeteredWindowStore<>(this.innerStoreMock, 10L, STORE_TYPE, new MockTime(), serde, serde2);
        this.store.init(this.context, this.store);
        this.store.fetch(KEY, TIMESTAMP);
        this.store.put(KEY, VALUE, TIMESTAMP);
    }

    @Test
    public void testMetrics() {
        this.store.init(this.context, this.store);
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.contextChange(new KafkaMetricsContext("kafka.streams"));
        this.metrics.addReporter(jmxReporter);
        Assertions.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", STORE_LEVEL_GROUP, THREAD_ID_TAG_KEY, this.threadId, this.context.taskId().toString(), STORE_TYPE, STORE_NAME)));
    }

    @Test
    public void shouldRecordRestoreLatencyOnInit() {
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).init(this.context, this.store);
        this.store.init(this.context, this.store);
        MatcherAssert.assertThat((Double) metric("restore-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldPutToInnerStoreAndRecordPutMetrics() {
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).put((Bytes) ArgumentMatchers.eq(Bytes.wrap("a".getBytes())), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.eq(this.context.timestamp()));
        this.store.init(this.context, this.store);
        this.store.put("a", "a", this.context.timestamp());
        MatcherAssert.assertThat((Double) metric("put-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldFetchFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyWindowStoreIterator());
        this.store.init(this.context, this.store);
        this.store.fetch("a", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
        Mockito.when(this.innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1L, 101L)).thenReturn(KeyValueIterators.emptyWindowStoreIterator());
        this.store.init(this.context, this.store);
        this.store.fetch("a", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L).plus(100L, (TemporalUnit) ChronoUnit.MILLIS)).close();
    }

    @Test
    public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.fetch((Object) null, Bytes.wrap("b".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.fetch(Bytes.wrap("a".getBytes()), (Object) null, 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.fetch((Object) null, (Object) null, 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        this.store.fetch("a", "b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch((Object) null, "b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch("a", (Object) null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch((Object) null, (Object) null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        this.store.backwardFetch("a", "b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.backwardFetch((Object) null, Bytes.wrap("b".getBytes()), 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), (Object) null, 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        Mockito.when(this.innerStoreMock.backwardFetch((Object) null, (Object) null, 1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        this.store.backwardFetch("a", "b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch((Object) null, "b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch("a", (Object) null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch((Object) null, (Object) null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.fetchAll(1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        this.store.fetchAll(Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when(this.innerStoreMock.backwardFetchAll(1L, 1L)).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        this.store.backwardFetchAll(Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        MatcherAssert.assertThat((Double) metric("fetch-rate").metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldRecordFlushLatency() {
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).flush();
        this.store.init(this.context, this.store);
        this.store.flush();
        Assertions.assertTrue(((Double) metric("flush-rate").metricValue()).doubleValue() > 0.0d);
    }

    @Test
    public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
        Mockito.when((byte[]) this.innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0L)).thenReturn((Object) null);
        this.store.init(this.context, this.store);
        Assertions.assertNull(this.store.fetch("a", 0L));
    }

    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        CachedWindowStore cachedWindowStore = (CachedWindowStore) Mockito.mock(CachedWindowStore.class);
        Mockito.when(Boolean.valueOf(cachedWindowStore.setFlushListener((CacheFlushListener) ArgumentMatchers.any(CacheFlushListener.class), ArgumentMatchers.eq(false)))).thenReturn(true);
        Assertions.assertTrue(new MeteredWindowStore(cachedWindowStore, 10L, STORE_TYPE, new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull()).setFlushListener((CacheFlushListener) null, false));
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assertions.assertFalse(this.store.setFlushListener((CacheFlushListener) null, false));
    }

    @Test
    public void shouldCloseUnderlyingStore() {
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).close();
        this.store.init(this.context, this.store);
        this.store.close();
    }

    @Test
    public void shouldRemoveMetricsOnClose() {
        ((WindowStore) Mockito.doNothing().when(this.innerStoreMock)).close();
        this.store.init(this.context, this.store);
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        this.store.close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
        ((WindowStore) Mockito.doThrow(new Throwable[]{new RuntimeException("Oops!")}).when(this.innerStoreMock)).close();
        this.store.init(this.context, this.store);
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));
        MeteredWindowStore<String, String> meteredWindowStore = this.store;
        Objects.requireNonNull(meteredWindowStore);
        Assertions.assertThrows(RuntimeException.class, meteredWindowStore::close);
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldThrowNullPointerOnPutIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.store.put((Object) null, "a", 1L);
        });
    }

    @Test
    public void shouldThrowNullPointerOnFetchIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.store.fetch((Object) null, 0L, 1L);
        });
    }

    @Test
    public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.store.backwardFetch((Object) null, 0L, 1L);
        });
    }

    @Test
    public void shouldTrackOpenIteratorsMetric() {
        Mockito.when(this.innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        KafkaMetric metric = metric("num-open-iterators");
        MatcherAssert.assertThat(metric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(0L));
        KeyValueIterator all = this.store.all();
        try {
            MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(1L));
            if (all != null) {
                all.close();
            }
            MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(0L));
        } catch (Throwable th) {
            if (all != null) {
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldTimeIteratorDuration() {
        Mockito.when(this.innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        KafkaMetric metric = metric("iterator-duration-avg");
        KafkaMetric metric2 = metric("iterator-duration-max");
        MatcherAssert.assertThat(metric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(metric2, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat((Double) metric.metricValue(), CoreMatchers.equalTo(Double.valueOf(Double.NaN)));
        MatcherAssert.assertThat((Double) metric2.metricValue(), CoreMatchers.equalTo(Double.valueOf(Double.NaN)));
        KeyValueIterator all = this.store.all();
        try {
            this.mockTime.sleep(2L);
            if (all != null) {
                all.close();
            }
            MatcherAssert.assertThat(Double.valueOf(((Double) metric.metricValue()).doubleValue()), CoreMatchers.equalTo(Double.valueOf(2.0d * TimeUnit.MILLISECONDS.toNanos(1L))));
            MatcherAssert.assertThat(Double.valueOf(((Double) metric2.metricValue()).doubleValue()), CoreMatchers.equalTo(Double.valueOf(2.0d * TimeUnit.MILLISECONDS.toNanos(1L))));
            all = this.store.all();
            try {
                this.mockTime.sleep(3L);
                if (all != null) {
                    all.close();
                }
                MatcherAssert.assertThat(Double.valueOf(((Double) metric.metricValue()).doubleValue()), CoreMatchers.equalTo(Double.valueOf(2.5d * TimeUnit.MILLISECONDS.toNanos(1L))));
                MatcherAssert.assertThat(Double.valueOf(((Double) metric2.metricValue()).doubleValue()), CoreMatchers.equalTo(Double.valueOf(3.0d * TimeUnit.MILLISECONDS.toNanos(1L))));
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void shouldTrackOldestOpenIteratorTimestamp() {
        Mockito.when(this.innerStoreMock.all()).thenReturn(KeyValueIterators.emptyIterator());
        this.store.init(this.context, this.store);
        KafkaMetric metric = metric("oldest-iterator-open-since-ms");
        MatcherAssert.assertThat(metric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(metric.metricValue(), CoreMatchers.nullValue());
        KeyValueIterator keyValueIterator = null;
        try {
            KeyValueIterator all = this.store.all();
            try {
                long milliseconds = this.mockTime.milliseconds();
                MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(Long.valueOf(milliseconds)));
                this.mockTime.sleep(100L);
                keyValueIterator = this.store.all();
                long milliseconds2 = this.mockTime.milliseconds();
                MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(Long.valueOf(milliseconds)));
                this.mockTime.sleep(100L);
                if (all != null) {
                    all.close();
                }
                MatcherAssert.assertThat((Long) metric.metricValue(), CoreMatchers.equalTo(Long.valueOf(milliseconds2)));
                if (keyValueIterator != null) {
                    keyValueIterator.close();
                }
                MatcherAssert.assertThat((Integer) metric.metricValue(), CoreMatchers.nullValue());
            } finally {
            }
        } catch (Throwable th) {
            if (keyValueIterator != null) {
                keyValueIterator.close();
            }
            throw th;
        }
    }

    private KafkaMetric metric(String str) {
        return this.metrics.metric(new MetricName(str, STORE_LEVEL_GROUP, "", this.tags));
    }

    private List<MetricName> storeMetrics() {
        return (List) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.group().equals(STORE_LEVEL_GROUP) && metricName.tags().equals(this.tags);
        }).collect(Collectors.toList());
    }
}
