package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamThreadStateStoreProvider}.
 *
 * <p>Each test runs against a fixture built in {@link #before()}: a topology with one source,
 * one processor and two state stores (an in-memory key-value store and a persistent windowed
 * store), materialised as two {@link StreamTask}s (task ids {@code 0_0} and {@code 0_1}). The
 * tasks are exposed to the provider through a stubbed {@link StreamThread}.
 */
public class StreamThreadStateStoreProviderTest {
    private static final String APPLICATION_ID = "applicationId";
    private static final String TOPIC_NAME = "topic";
    private static final String SOURCE_NAME = "the-source";
    private static final String PROCESSOR_NAME = "the-processor";
    private static final String KV_STORE = "kv-store";
    private static final String WINDOW_STORE = "window-store";

    private StreamTask taskOne;
    private StreamTask taskTwo;
    private StreamThreadStateStoreProvider provider;
    private StateDirectory stateDirectory;
    private File stateDir;
    /** Value reported by the stubbed thread's {@code isInitialized()}; tests flip it to simulate unavailable stores. */
    private boolean storesAvailable;

    /**
     * Builds the topology, the two stream tasks and the provider under test.
     *
     * @throws IOException declared for the fixture set-up (temp directory / state directory creation)
     */
    @Before
    public void before() throws IOException {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.addSource(SOURCE_NAME, TOPIC_NAME);
        builder.addProcessor(PROCESSOR_NAME, new MockProcessorSupplier(), SOURCE_NAME);
        builder.addStateStore(
                Stores.create(KV_STORE).withStringKeys().withStringValues().inMemory().build(),
                PROCESSOR_NAME);
        builder.addStateStore(
                Stores.create(WINDOW_STORE).withStringKeys().withStringValues().persistent()
                        .windowed(10L, 10L, 2, false).build(),
                PROCESSOR_NAME);

        stateDir = TestUtils.tempDirectory();
        final String stateConfigDir = stateDir.getPath();

        final Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateConfigDir);
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        final MockClientSupplier clientSupplier = new MockClientSupplier();
        configureRestoreConsumer(clientSupplier, changelogTopicFor(KV_STORE));
        configureRestoreConsumer(clientSupplier, changelogTopicFor(WINDOW_STORE));

        builder.setApplicationId(APPLICATION_ID);
        final ProcessorTopology topology = builder.build((Integer) null);

        // stateDirectory must be assigned before createStreamsTask(...) is called: the helper reads it.
        stateDirectory = new StateDirectory(APPLICATION_ID, stateConfigDir, new MockTime());

        final Map<TaskId, StreamTask> tasks = new HashMap<>();
        taskOne = createStreamsTask(APPLICATION_ID, streamsConfig, clientSupplier, topology, new TaskId(0, 0));
        tasks.put(new TaskId(0, 0), taskOne);
        taskTwo = createStreamsTask(APPLICATION_ID, streamsConfig, clientSupplier, topology, new TaskId(0, 1));
        tasks.put(new TaskId(0, 1), taskTwo);

        storesAvailable = true;

        // The thread is never started; it only serves the fixture's tasks and the availability flag.
        final StreamThread stubbedThread = new StreamThread(
                builder,
                streamsConfig,
                clientSupplier,
                APPLICATION_ID,
                "clientId",
                UUID.randomUUID(),
                new Metrics(),
                Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                0L) {
            @Override
            public Map<TaskId, StreamTask> tasks() {
                return tasks;
            }

            @Override
            public boolean isInitialized() {
                return storesAvailable;
            }
        };
        provider = new StreamThreadStateStoreProvider(stubbedThread);
    }

    /**
     * Removes the temporary state directory created in {@link #before()}.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void cleanUp() throws IOException {
        Utils.delete(stateDir);
    }

    /** One key-value store per task is expected, i.e. two in total. */
    @Test
    public void shouldFindKeyValueStores() throws Exception {
        Assert.assertEquals(2L, provider.stores(KV_STORE, QueryableStoreTypes.keyValueStore()).size());
    }

    /** One window store per task is expected, i.e. two in total. */
    @Test
    public void shouldFindWindowStores() throws Exception {
        Assert.assertEquals(2L, provider.stores(WINDOW_STORE, QueryableStoreTypes.windowStore()).size());
    }

    /** Closing the window store of a single task must make the lookup fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() throws Exception {
        taskOne.getStore(WINDOW_STORE).close();
        provider.stores(WINDOW_STORE, QueryableStoreTypes.windowStore());
    }

    /** Closing the key-value store of a single task must make the lookup fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() throws Exception {
        taskOne.getStore(KV_STORE).close();
        provider.stores(KV_STORE, QueryableStoreTypes.keyValueStore());
    }

    /** An unknown store name yields an empty list rather than an exception. */
    @Test
    public void shouldReturnEmptyListIfNoStoresFoundWithName() throws Exception {
        Assert.assertEquals(
                Collections.emptyList(),
                provider.stores("not-a-store", QueryableStoreTypes.keyValueStore()));
    }

    /** Asking for a key-value view of the window store yields an empty list. */
    @Test
    public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() throws Exception {
        Assert.assertEquals(
                Collections.emptyList(),
                provider.stores(WINDOW_STORE, QueryableStoreTypes.keyValueStore()));
    }

    /** When the stubbed thread reports it is not initialized, the lookup must fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() throws Exception {
        storesAvailable = false;
        provider.stores(KV_STORE, QueryableStoreTypes.keyValueStore());
    }

    /**
     * Creates a {@link StreamTask} for the given task id, assigned the partition of
     * {@code TOPIC_NAME} whose number equals {@code taskId.partition}.
     *
     * <p>Reads the {@code stateDirectory} field, so that field must be set before calling this.
     *
     * @param str                the application id handed to the task
     * @param streamsConfig      the streams configuration
     * @param mockClientSupplier supplies the mock consumer, restore consumer and producer
     * @param processorTopology  the topology the task runs
     * @param taskId             the id of the task to create
     * @return the newly created task
     */
    private StreamTask createStreamsTask(String str, StreamsConfig streamsConfig, MockClientSupplier mockClientSupplier, ProcessorTopology processorTopology, TaskId taskId) {
        return new StreamTask(
                taskId,
                str,
                Collections.singletonList(new TopicPartition(TOPIC_NAME, taskId.partition)),
                processorTopology,
                mockClientSupplier.consumer,
                new StoreChangelogReader(mockClientSupplier.restoreConsumer, Time.SYSTEM, 5000L),
                streamsConfig,
                new MockStreamsMetrics(new Metrics()),
                stateDirectory,
                null,
                new MockTime(),
                mockClientSupplier.getProducer(new HashMap<String, Object>())) {
            @Override
            protected void updateOffsetLimits() {
                // Intentionally a no-op for this fixture.
                // NOTE(review): presumably avoids querying committed offsets on the mock consumer - confirm.
            }
        };
    }

    /**
     * Prepares the mock restore consumer for a changelog topic with two partitions (0 and 1):
     * registers the partitions, assigns them, and sets both beginning and end offsets to 0.
     *
     * @param mockClientSupplier supplier whose restore consumer is configured
     * @param str                name of the changelog topic
     */
    private void configureRestoreConsumer(MockClientSupplier mockClientSupplier, String str) {
        mockClientSupplier.restoreConsumer.updatePartitions(
                str,
                Arrays.asList(
                        new PartitionInfo(str, 0, (Node) null, (Node[]) null, (Node[]) null),
                        new PartitionInfo(str, 1, (Node) null, (Node[]) null, (Node[]) null)));

        final TopicPartition partitionZero = new TopicPartition(str, 0);
        final TopicPartition partitionOne = new TopicPartition(str, 1);
        mockClientSupplier.restoreConsumer.assign(Arrays.asList(partitionZero, partitionOne));

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(partitionZero, 0L);
        offsets.put(partitionOne, 0L);
        mockClientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        mockClientSupplier.restoreConsumer.updateEndOffsets(offsets);
    }

    /** Returns the changelog topic name this fixture uses for the given store. */
    private static String changelogTopicFor(String storeName) {
        return APPLICATION_ID + "-" + storeName + "-changelog";
    }
}
