package org.apache.kafka.streams.state.internals.metrics;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.rocksdb.Cache;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.class */
public class RocksDBBlockCacheMetricsTest {
    private static final String STORE_NAME = "test";
    private static final String METRICS_SCOPE = "test-scope";
    private static final TaskId TASK_ID = new TaskId(0, 0);

    public static Stream<Arguments> stores() {
        File tempDirectory = TestUtils.tempDirectory("state");
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{new RocksDBStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, tempDirectory)}), Arguments.of(new Object[]{new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, tempDirectory)})});
    }

    static void withStore(RocksDBStore rocksDBStore, StateStoreContext stateStoreContext, Runnable runnable) {
        rocksDBStore.init(stateStoreContext, rocksDBStore);
        try {
            runnable.run();
            rocksDBStore.close();
            try {
                Utils.delete(stateStoreContext.stateDir());
            } catch (IOException e) {
            }
        } catch (Throwable th) {
            rocksDBStore.close();
            try {
                Utils.delete(stateStoreContext.stateDir());
            } catch (IOException e2) {
            }
            throw th;
        }
    }

    @MethodSource({"stores"})
    @ParameterizedTest
    public void shouldRecordCorrectBlockCacheCapacity(RocksDBStore rocksDBStore, StateStoreContext stateStoreContext) {
        withStore(rocksDBStore, stateStoreContext, () -> {
            assertMetric(stateStoreContext, "stream-state-metrics", "block-cache-capacity", BigInteger.valueOf(52428800L));
        });
    }

    @MethodSource({"stores"})
    @ParameterizedTest
    public void shouldRecordCorrectBlockCacheUsage(RocksDBStore rocksDBStore, StateStoreContext stateStoreContext) {
        withStore(rocksDBStore, stateStoreContext, () -> {
            Cache blockCache = rocksDBStore.getOptions().tableFormatConfig().blockCache();
            try {
                assertMetric(stateStoreContext, "stream-state-metrics", "block-cache-usage", BigInteger.valueOf(blockCache.getUsage()));
                if (blockCache != null) {
                    blockCache.close();
                }
            } catch (Throwable th) {
                if (blockCache != null) {
                    try {
                        blockCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    @MethodSource({"stores"})
    @ParameterizedTest
    public void shouldRecordCorrectBlockCachePinnedUsage(RocksDBStore rocksDBStore, StateStoreContext stateStoreContext) {
        withStore(rocksDBStore, stateStoreContext, () -> {
            Cache blockCache = rocksDBStore.getOptions().tableFormatConfig().blockCache();
            try {
                assertMetric(stateStoreContext, "stream-state-metrics", "block-cache-pinned-usage", BigInteger.valueOf(blockCache.getPinnedUsage()));
                if (blockCache != null) {
                    blockCache.close();
                }
            } catch (Throwable th) {
                if (blockCache != null) {
                    try {
                        blockCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    public <T> void assertMetric(StateStoreContext stateStoreContext, String str, String str2, T t) {
        StreamsMetricsImpl metricsImpl = ProcessorContextUtils.metricsImpl(stateStoreContext);
        Assertions.assertEquals(t, ((KafkaMetric) metricsImpl.metrics().get(metricsImpl.metricsRegistry().metricName(str2, str, "Ignored", storeLevelTagMap(TASK_ID.toString(), METRICS_SCOPE, STORE_NAME)))).metricValue(), String.format("Value for metric '%s-%s' was incorrect", str, str2));
    }

    public Map<String, String> threadLevelTagMap(String str) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("thread-id", str);
        return linkedHashMap;
    }

    public Map<String, String> taskLevelTagMap(String str, String str2) {
        Map<String, String> threadLevelTagMap = threadLevelTagMap(str);
        threadLevelTagMap.put("task-id", str2);
        return threadLevelTagMap;
    }

    public Map<String, String> storeLevelTagMap(String str, String str2, String str3) {
        Map<String, String> taskLevelTagMap = taskLevelTagMap(Thread.currentThread().getName(), str);
        taskLevelTagMap.put(str2 + "-state-id", str3);
        return taskLevelTagMap;
    }
}
