package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.class */
public class ProcessorContextImplTest {
    private ProcessorContextImpl context;

    @Mock
    private RecordCollector recordCollector;

    @Mock
    private ProcessorStateManager stateManager;
    private static final long STREAM_TIME = 50;
    private static final String STORE_NAME = "underlying-store";
    private static final String REGISTERED_STORE_NAME = "registered-store";
    private boolean putWithTimestampExecuted;

    @Mock
    private KeyValueIterator<String, Long> rangeIter;

    @Mock
    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedRangeIter;

    @Mock
    private KeyValueIterator<String, Long> allIter;

    @Mock
    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedAllIter;

    @Mock
    private WindowStoreIterator windowStoreIter;
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
    private static final long VALUE = 42;
    private static final byte[] VALUE_BYTES = String.valueOf(VALUE).getBytes();
    private static final long TIMESTAMP = 21;
    private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(Long.valueOf(VALUE), TIMESTAMP);
    private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);
    private final StreamsConfig streamsConfig = streamsConfigMock();
    private boolean flushExecuted = false;
    private boolean putExecuted = false;
    private boolean putIfAbsentExecuted = false;
    private boolean putAllExecuted = false;
    private boolean deleteExecuted = false;
    private boolean removeExecuted = false;
    private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList(7);
    private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList(7);

    private void foreachSetUp() {
        for (int i = 0; i < 7; i++) {
            this.iters.add(i, (KeyValueIterator) Mockito.mock(KeyValueIterator.class));
            this.timestampedIters.add(i, (KeyValueIterator) Mockito.mock(KeyValueIterator.class));
        }
    }

    private ProcessorContextImpl getStandbyContext() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        Mockito.when(processorStateManager.taskType()).thenReturn(Task.TaskType.STANDBY);
        return buildProcessorContextImpl(this.streamsConfig, processorStateManager);
    }

    @Test
    public void globalKeyValueStoreShouldBeReadOnly() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        KeyValueStore keyValueStore = (KeyValueStore) Mockito.mock(KeyValueStore.class);
        Mockito.when(this.stateManager.globalStore("GlobalKeyValueStore")).thenAnswer(invocationOnMock -> {
            return keyValueStoreMock(keyValueStore);
        });
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("GlobalKeyValueStore", keyValueStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(keyValueStore2);
            Objects.requireNonNull(keyValueStore2);
            checkThrowsUnsupportedOperation(keyValueStore2::flush, "flush()");
            checkThrowsUnsupportedOperation(() -> {
                keyValueStore2.put("1", 1L);
            }, "put()");
            checkThrowsUnsupportedOperation(() -> {
                keyValueStore2.putIfAbsent("1", 1L);
            }, "putIfAbsent()");
            checkThrowsUnsupportedOperation(() -> {
                keyValueStore2.putAll(Collections.emptyList());
            }, "putAll()");
            checkThrowsUnsupportedOperation(() -> {
                keyValueStore2.delete("1");
            }, "delete()");
            Assertions.assertEquals(Long.valueOf(VALUE), (Long) keyValueStore2.get(KEY));
            Assertions.assertEquals(this.rangeIter, keyValueStore2.range("one", "two"));
            Assertions.assertEquals(this.allIter, keyValueStore2.all());
            Assertions.assertEquals(VALUE, keyValueStore2.approximateNumEntries());
        });
    }

    @Test
    public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        TimestampedKeyValueStore timestampedKeyValueStore = (TimestampedKeyValueStore) Mockito.mock(TimestampedKeyValueStore.class);
        Mockito.when(this.stateManager.globalStore("GlobalTimestampedKeyValueStore")).thenAnswer(invocationOnMock -> {
            return timestampedKeyValueStoreMock(timestampedKeyValueStore);
        });
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("GlobalTimestampedKeyValueStore", timestampedKeyValueStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(timestampedKeyValueStore2);
            Objects.requireNonNull(timestampedKeyValueStore2);
            checkThrowsUnsupportedOperation(timestampedKeyValueStore2::flush, "flush()");
            checkThrowsUnsupportedOperation(() -> {
                timestampedKeyValueStore2.put("1", ValueAndTimestamp.make(1L, 2L));
            }, "put()");
            checkThrowsUnsupportedOperation(() -> {
                timestampedKeyValueStore2.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L));
            }, "putIfAbsent()");
            checkThrowsUnsupportedOperation(() -> {
                timestampedKeyValueStore2.putAll(Collections.emptyList());
            }, "putAll()");
            checkThrowsUnsupportedOperation(() -> {
                timestampedKeyValueStore2.delete("1");
            }, "delete()");
            Assertions.assertEquals(VALUE_AND_TIMESTAMP, timestampedKeyValueStore2.get(KEY));
            Assertions.assertEquals(this.timestampedRangeIter, timestampedKeyValueStore2.range("one", "two"));
            Assertions.assertEquals(this.timestampedAllIter, timestampedKeyValueStore2.all());
            Assertions.assertEquals(VALUE, timestampedKeyValueStore2.approximateNumEntries());
        });
    }

    @Test
    public void globalWindowStoreShouldBeReadOnly() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        WindowStore windowStore = (WindowStore) Mockito.mock(WindowStore.class);
        Mockito.when(this.stateManager.globalStore("GlobalWindowStore")).thenAnswer(invocationOnMock -> {
            return windowStoreMock(windowStore);
        });
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("GlobalWindowStore", windowStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(windowStore2);
            Objects.requireNonNull(windowStore2);
            checkThrowsUnsupportedOperation(windowStore2::flush, "flush()");
            checkThrowsUnsupportedOperation(() -> {
                windowStore2.put("1", 1L, 1L);
            }, "put()");
            Assertions.assertEquals(this.iters.get(0), windowStore2.fetchAll(0L, 0L));
            Assertions.assertEquals(this.windowStoreIter, windowStore2.fetch(KEY, 0L, 1L));
            Assertions.assertEquals(this.iters.get(1), windowStore2.fetch(KEY, KEY, 0L, 1L));
            Assertions.assertEquals(Long.valueOf(VALUE), (Long) windowStore2.fetch(KEY, 1L));
            Assertions.assertEquals(this.iters.get(2), windowStore2.all());
        });
    }

    @Test
    public void globalTimestampedWindowStoreShouldBeReadOnly() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        TimestampedWindowStore timestampedWindowStore = (TimestampedWindowStore) Mockito.mock(TimestampedWindowStore.class);
        Mockito.when(this.stateManager.globalStore("GlobalTimestampedWindowStore")).thenAnswer(invocationOnMock -> {
            return timestampedWindowStoreMock(timestampedWindowStore);
        });
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("GlobalTimestampedWindowStore", timestampedWindowStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(timestampedWindowStore2);
            Objects.requireNonNull(timestampedWindowStore2);
            checkThrowsUnsupportedOperation(timestampedWindowStore2::flush, "flush()");
            checkThrowsUnsupportedOperation(() -> {
                timestampedWindowStore2.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
            }, "put() [with timestamp]");
            Assertions.assertEquals(this.timestampedIters.get(0), timestampedWindowStore2.fetchAll(0L, 0L));
            Assertions.assertEquals(this.windowStoreIter, timestampedWindowStore2.fetch(KEY, 0L, 1L));
            Assertions.assertEquals(this.timestampedIters.get(1), timestampedWindowStore2.fetch(KEY, KEY, 0L, 1L));
            Assertions.assertEquals(VALUE_AND_TIMESTAMP, timestampedWindowStore2.fetch(KEY, 1L));
            Assertions.assertEquals(this.timestampedIters.get(2), timestampedWindowStore2.all());
        });
    }

    @Test
    public void globalSessionStoreShouldBeReadOnly() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        SessionStore sessionStore = (SessionStore) Mockito.mock(SessionStore.class);
        Mockito.when(this.stateManager.globalStore("GlobalSessionStore")).thenAnswer(invocationOnMock -> {
            return sessionStoreMock(sessionStore);
        });
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("GlobalSessionStore", sessionStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(sessionStore2);
            Objects.requireNonNull(sessionStore2);
            checkThrowsUnsupportedOperation(sessionStore2::flush, "flush()");
            checkThrowsUnsupportedOperation(() -> {
                sessionStore2.remove((Windowed) null);
            }, "remove()");
            checkThrowsUnsupportedOperation(() -> {
                sessionStore2.put((Windowed) null, (Object) null);
            }, "put()");
            Assertions.assertEquals(this.iters.get(3), sessionStore2.findSessions(KEY, 1L, 2L));
            Assertions.assertEquals(this.iters.get(4), sessionStore2.findSessions(KEY, KEY, 1L, 2L));
            Assertions.assertEquals(this.iters.get(5), sessionStore2.fetch(KEY));
            Assertions.assertEquals(this.iters.get(6), sessionStore2.fetch(KEY, KEY));
        });
    }

    @Test
    public void localKeyValueStoreShouldNotAllowInitOrClose() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        KeyValueStore keyValueStore = (KeyValueStore) Mockito.mock(KeyValueStore.class);
        Mockito.when(this.stateManager.store("LocalKeyValueStore")).thenAnswer(invocationOnMock -> {
            return keyValueStoreMock(keyValueStore);
        });
        mockStateStoreFlush(keyValueStore);
        mockKeyValueStoreOperation(keyValueStore);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("LocalKeyValueStore", keyValueStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(keyValueStore2);
            keyValueStore2.flush();
            Assertions.assertTrue(this.flushExecuted);
            keyValueStore2.put("1", 1L);
            Assertions.assertTrue(this.putExecuted);
            keyValueStore2.putIfAbsent("1", 1L);
            Assertions.assertTrue(this.putIfAbsentExecuted);
            keyValueStore2.putAll(Collections.emptyList());
            Assertions.assertTrue(this.putAllExecuted);
            keyValueStore2.delete("1");
            Assertions.assertTrue(this.deleteExecuted);
            Assertions.assertEquals(Long.valueOf(VALUE), (Long) keyValueStore2.get(KEY));
            Assertions.assertEquals(this.rangeIter, keyValueStore2.range("one", "two"));
            Assertions.assertEquals(this.allIter, keyValueStore2.all());
            Assertions.assertEquals(VALUE, keyValueStore2.approximateNumEntries());
        });
    }

    @Test
    public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        TimestampedKeyValueStore<String, Long> timestampedKeyValueStore = (TimestampedKeyValueStore) Mockito.mock(TimestampedKeyValueStore.class);
        Mockito.when(this.stateManager.store("LocalTimestampedKeyValueStore")).thenAnswer(invocationOnMock -> {
            return timestampedKeyValueStoreMock(timestampedKeyValueStore);
        });
        mockTimestampedKeyValueOperation(timestampedKeyValueStore);
        mockStateStoreFlush(timestampedKeyValueStore);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("LocalTimestampedKeyValueStore", timestampedKeyValueStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(timestampedKeyValueStore2);
            timestampedKeyValueStore2.flush();
            Assertions.assertTrue(this.flushExecuted);
            timestampedKeyValueStore2.put("1", ValueAndTimestamp.make(1L, 2L));
            Assertions.assertTrue(this.putExecuted);
            timestampedKeyValueStore2.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L));
            Assertions.assertTrue(this.putIfAbsentExecuted);
            timestampedKeyValueStore2.putAll(Collections.emptyList());
            Assertions.assertTrue(this.putAllExecuted);
            timestampedKeyValueStore2.delete("1");
            Assertions.assertTrue(this.deleteExecuted);
            Assertions.assertEquals(VALUE_AND_TIMESTAMP, timestampedKeyValueStore2.get(KEY));
            Assertions.assertEquals(this.timestampedRangeIter, timestampedKeyValueStore2.range("one", "two"));
            Assertions.assertEquals(this.timestampedAllIter, timestampedKeyValueStore2.all());
            Assertions.assertEquals(VALUE, timestampedKeyValueStore2.approximateNumEntries());
        });
    }

    @Test
    public void localWindowStoreShouldNotAllowInitOrClose() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        WindowStore windowStore = (WindowStore) Mockito.mock(WindowStore.class);
        Mockito.when(this.stateManager.store("LocalWindowStore")).thenAnswer(invocationOnMock -> {
            return windowStoreMock(windowStore);
        });
        mockStateStoreFlush(windowStore);
        ((WindowStore) Mockito.doAnswer(invocationOnMock2 -> {
            this.putExecuted = true;
            return null;
        }).when(windowStore)).put(ArgumentMatchers.anyString(), Long.valueOf(ArgumentMatchers.anyLong()), ArgumentMatchers.anyLong());
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("LocalWindowStore", windowStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(windowStore2);
            windowStore2.flush();
            Assertions.assertTrue(this.flushExecuted);
            windowStore2.put("1", 1L, 1L);
            Assertions.assertTrue(this.putExecuted);
            Assertions.assertEquals(this.iters.get(0), windowStore2.fetchAll(0L, 0L));
            Assertions.assertEquals(this.windowStoreIter, windowStore2.fetch(KEY, 0L, 1L));
            Assertions.assertEquals(this.iters.get(1), windowStore2.fetch(KEY, KEY, 0L, 1L));
            Assertions.assertEquals(Long.valueOf(VALUE), (Long) windowStore2.fetch(KEY, 1L));
            Assertions.assertEquals(this.iters.get(2), windowStore2.all());
        });
    }

    @Test
    public void localTimestampedWindowStoreShouldNotAllowInitOrClose() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        TimestampedWindowStore timestampedWindowStore = (TimestampedWindowStore) Mockito.mock(TimestampedWindowStore.class);
        Mockito.when(this.stateManager.store("LocalTimestampedWindowStore")).thenAnswer(invocationOnMock -> {
            return timestampedWindowStoreMock(timestampedWindowStore);
        });
        mockStateStoreFlush(timestampedWindowStore);
        ((TimestampedWindowStore) Mockito.doAnswer(invocationOnMock2 -> {
            this.putExecuted = true;
            return null;
        }).doAnswer(invocationOnMock3 -> {
            this.putWithTimestampExecuted = true;
            return null;
        }).when(timestampedWindowStore)).put(ArgumentMatchers.anyString(), (ValueAndTimestamp) ArgumentMatchers.any(ValueAndTimestamp.class), ArgumentMatchers.anyLong());
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("LocalTimestampedWindowStore", timestampedWindowStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(timestampedWindowStore2);
            timestampedWindowStore2.flush();
            Assertions.assertTrue(this.flushExecuted);
            timestampedWindowStore2.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
            Assertions.assertTrue(this.putExecuted);
            timestampedWindowStore2.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
            Assertions.assertTrue(this.putWithTimestampExecuted);
            Assertions.assertEquals(this.timestampedIters.get(0), timestampedWindowStore2.fetchAll(0L, 0L));
            Assertions.assertEquals(this.windowStoreIter, timestampedWindowStore2.fetch(KEY, 0L, 1L));
            Assertions.assertEquals(this.timestampedIters.get(1), timestampedWindowStore2.fetch(KEY, KEY, 0L, 1L));
            Assertions.assertEquals(VALUE_AND_TIMESTAMP, timestampedWindowStore2.fetch(KEY, 1L));
            Assertions.assertEquals(this.timestampedIters.get(2), timestampedWindowStore2.all());
        });
    }

    @Test
    public void localSessionStoreShouldNotAllowInitOrClose() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.globalStore(ArgumentMatchers.anyString())).thenReturn((Object) null);
        SessionStore sessionStore = (SessionStore) Mockito.mock(SessionStore.class);
        Mockito.when(this.stateManager.store("LocalSessionStore")).thenAnswer(invocationOnMock -> {
            return sessionStoreMock(sessionStore);
        });
        mockStateStoreFlush(sessionStore);
        ((SessionStore) Mockito.doAnswer(invocationOnMock2 -> {
            this.putExecuted = true;
            return null;
        }).when(sessionStore)).put((Windowed) ArgumentMatchers.any(), (Long) ArgumentMatchers.any());
        ((SessionStore) Mockito.doAnswer(invocationOnMock3 -> {
            this.removeExecuted = true;
            return null;
        }).when(sessionStore)).remove((Windowed) ArgumentMatchers.any());
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        doTest("LocalSessionStore", sessionStore2 -> {
            verifyStoreCannotBeInitializedOrClosed(sessionStore2);
            sessionStore2.flush();
            Assertions.assertTrue(this.flushExecuted);
            sessionStore2.remove((Windowed) null);
            Assertions.assertTrue(this.removeExecuted);
            sessionStore2.put((Windowed) null, (Object) null);
            Assertions.assertTrue(this.putExecuted);
            Assertions.assertEquals(this.iters.get(3), sessionStore2.findSessions(KEY, 1L, 2L));
            Assertions.assertEquals(this.iters.get(4), sessionStore2.findSessions(KEY, KEY, 1L, 2L));
            Assertions.assertEquals(this.iters.get(5), sessionStore2.fetch(KEY));
            Assertions.assertEquals(this.iters.get(6), sessionStore2.fetch(KEY, KEY));
        });
    }

    @Test
    public void shouldNotSendRecordHeadersToChangelogTopic() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), this.recordCollector, (ThreadCache) null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP, Position.emptyPosition());
        ((RecordCollector) Mockito.verify(this.recordCollector)).send(CHANGELOG_PARTITION.topic(), KEY_BYTES, VALUE_BYTES, (Headers) null, Integer.valueOf(CHANGELOG_PARTITION.partition()), Long.valueOf(TIMESTAMP), ProcessorContextImpl.BYTES_KEY_SERIALIZER, ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER, (String) null, (InternalProcessorContext) null);
    }

    @Test
    public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        Mockito.when(this.stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        Position emptyPosition = Position.emptyPosition();
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        recordHeaders.add(new RecordHeader("c", PositionSerde.serialize(emptyPosition).array()));
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        this.context = buildProcessorContextImpl(streamsConfigWithConsistencyMock(), this.stateManager);
        this.context.transitionToActive(streamTask, this.recordCollector, (ThreadCache) null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP, emptyPosition);
        ((RecordCollector) Mockito.verify(this.recordCollector)).send(CHANGELOG_PARTITION.topic(), KEY_BYTES, VALUE_BYTES, recordHeaders, Integer.valueOf(CHANGELOG_PARTITION.partition()), Long.valueOf(TIMESTAMP), ProcessorContextImpl.BYTES_KEY_SERIALIZER, ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER, (String) null, (InternalProcessorContext) null);
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.logChange("Store", Bytes.wrap("k".getBytes()), (byte[]) null, 0L, Position.emptyPosition());
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.getStateStore("store");
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnForward() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.forward(KEY, "value");
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.forward(KEY, "value", To.child("child-name"));
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnCommit() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.commit();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, j -> {
            });
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnTopic() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.topic();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnPartition() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.partition();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnOffset() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.offset();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.timestamp();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.currentNode();
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.setRecordContext((ProcessorRecordContext) Mockito.mock(ProcessorRecordContext.class));
        });
    }

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context = getStandbyContext();
        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
            this.context.recordContext();
        });
    }

    @Test
    public void shouldMatchStreamTime() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(Long.valueOf(streamTask.streamTime())).thenReturn(Long.valueOf(STREAM_TIME));
        this.context.transitionToActive(streamTask, (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        Assertions.assertEquals(STREAM_TIME, this.context.currentStreamTimeMs());
    }

    @Test
    public void shouldAddAndGetProcessorKeyValue() {
        foreachSetUp();
        Mockito.when(this.stateManager.taskType()).thenReturn(Task.TaskType.ACTIVE);
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        this.context.transitionToActive((StreamTask) Mockito.mock(StreamTask.class), (RecordCollector) null, (ThreadCache) null);
        mockProcessorNodeWithLocalKeyValueStore();
        this.context.addProcessorMetadataKeyValue("key1", 100L);
        Assertions.assertEquals(100L, this.context.processorMetadataForKey("key1").longValue());
        Assertions.assertNull(this.context.processorMetadataForKey("nokey"));
    }

    @Test
    public void shouldSetAndGetProcessorMetaData() {
        foreachSetUp();
        this.context = buildProcessorContextImpl(this.streamsConfig, this.stateManager);
        mockProcessorNodeWithLocalKeyValueStore();
        ProcessorMetadata processorMetadata = new ProcessorMetadata();
        this.context.setProcessorMetadata(processorMetadata);
        Assertions.assertEquals(processorMetadata, this.context.processorMetadata());
        this.context.setProcessorMetadata(new ProcessorMetadata(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("key1", 10L), Utils.mkEntry("key2", 100L)})));
        Assertions.assertEquals(10L, this.context.processorMetadataForKey("key1").longValue());
        Assertions.assertEquals(100L, this.context.processorMetadataForKey("key2").longValue());
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.context.setProcessorMetadata((ProcessorMetadata) null);
        });
    }

    private void mockProcessorNodeWithLocalKeyValueStore() {
        this.context.setCurrentNode(new ProcessorNode("fake", (Processor) null, new HashSet(Arrays.asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore"))));
    }

    private ProcessorContextImpl buildProcessorContextImpl(StreamsConfig streamsConfig, ProcessorStateManager processorStateManager) {
        return new ProcessorContextImpl((TaskId) Mockito.mock(TaskId.class), streamsConfig, processorStateManager, (StreamsMetricsImpl) Mockito.mock(StreamsMetricsImpl.class), (ThreadCache) Mockito.mock(ThreadCache.class));
    }

    private KeyValueStore<String, Long> keyValueStoreMock(KeyValueStore<String, Long> keyValueStore) {
        initStateStoreMock(keyValueStore);
        Mockito.when((Long) keyValueStore.get(KEY)).thenReturn(Long.valueOf(VALUE));
        Mockito.when(Long.valueOf(keyValueStore.approximateNumEntries())).thenReturn(Long.valueOf(VALUE));
        Mockito.when(keyValueStore.range("one", "two")).thenReturn(this.rangeIter);
        Mockito.when(keyValueStore.all()).thenReturn(this.allIter);
        return keyValueStore;
    }

    private void mockKeyValueStoreOperation(KeyValueStore<String, Long> keyValueStore) {
        ((KeyValueStore) Mockito.doAnswer(invocationOnMock -> {
            this.putExecuted = true;
            return null;
        }).when(keyValueStore)).put(ArgumentMatchers.anyString(), Long.valueOf(ArgumentMatchers.anyLong()));
        ((KeyValueStore) Mockito.doAnswer(invocationOnMock2 -> {
            this.putIfAbsentExecuted = true;
            return null;
        }).when(keyValueStore)).putIfAbsent(ArgumentMatchers.anyString(), Long.valueOf(ArgumentMatchers.anyLong()));
        ((KeyValueStore) Mockito.doAnswer(invocationOnMock3 -> {
            this.putAllExecuted = true;
            return null;
        }).when(keyValueStore)).putAll((List) ArgumentMatchers.any(List.class));
        ((KeyValueStore) Mockito.doAnswer(invocationOnMock4 -> {
            this.deleteExecuted = true;
            return null;
        }).when(keyValueStore)).delete(ArgumentMatchers.anyString());
    }

    private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock(TimestampedKeyValueStore<String, Long> timestampedKeyValueStore) {
        initStateStoreMock(timestampedKeyValueStore);
        Mockito.when((ValueAndTimestamp) timestampedKeyValueStore.get(KEY)).thenReturn(VALUE_AND_TIMESTAMP);
        Mockito.when(Long.valueOf(timestampedKeyValueStore.approximateNumEntries())).thenReturn(Long.valueOf(VALUE));
        Mockito.when(timestampedKeyValueStore.range("one", "two")).thenReturn(this.timestampedRangeIter);
        Mockito.when(timestampedKeyValueStore.all()).thenReturn(this.timestampedAllIter);
        return timestampedKeyValueStore;
    }

    private void mockTimestampedKeyValueOperation(TimestampedKeyValueStore<String, Long> timestampedKeyValueStore) {
        ((TimestampedKeyValueStore) Mockito.doAnswer(invocationOnMock -> {
            this.putExecuted = true;
            return null;
        }).when(timestampedKeyValueStore)).put(ArgumentMatchers.anyString(), (ValueAndTimestamp) ArgumentMatchers.any(ValueAndTimestamp.class));
        ((TimestampedKeyValueStore) Mockito.doAnswer(invocationOnMock2 -> {
            this.putIfAbsentExecuted = true;
            return null;
        }).when(timestampedKeyValueStore)).putIfAbsent(ArgumentMatchers.anyString(), (ValueAndTimestamp) ArgumentMatchers.any(ValueAndTimestamp.class));
        ((TimestampedKeyValueStore) Mockito.doAnswer(invocationOnMock3 -> {
            this.putAllExecuted = true;
            return null;
        }).when(timestampedKeyValueStore)).putAll((List) ArgumentMatchers.any(List.class));
        ((TimestampedKeyValueStore) Mockito.doAnswer(invocationOnMock4 -> {
            this.deleteExecuted = true;
            return null;
        }).when(timestampedKeyValueStore)).delete(ArgumentMatchers.anyString());
    }

    private WindowStore<String, Long> windowStoreMock(WindowStore<String, Long> windowStore) {
        initStateStoreMock(windowStore);
        Mockito.when(windowStore.fetchAll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.iters.get(0));
        Mockito.when(windowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.iters.get(1));
        Mockito.when(windowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.windowStoreIter);
        Mockito.when((Long) windowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong())).thenReturn(Long.valueOf(VALUE));
        Mockito.when(windowStore.all()).thenReturn(this.iters.get(2));
        return windowStore;
    }

    private TimestampedWindowStore<String, Long> timestampedWindowStoreMock(TimestampedWindowStore<String, Long> timestampedWindowStore) {
        initStateStoreMock(timestampedWindowStore);
        Mockito.when(timestampedWindowStore.fetchAll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.timestampedIters.get(0));
        Mockito.when(timestampedWindowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.timestampedIters.get(1));
        Mockito.when(timestampedWindowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.windowStoreIter);
        Mockito.when((ValueAndTimestamp) timestampedWindowStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong())).thenReturn(VALUE_AND_TIMESTAMP);
        Mockito.when(timestampedWindowStore.all()).thenReturn(this.timestampedIters.get(2));
        return timestampedWindowStore;
    }

    private SessionStore<String, Long> sessionStoreMock(SessionStore<String, Long> sessionStore) {
        initStateStoreMock(sessionStore);
        Mockito.when(sessionStore.findSessions(ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.iters.get(3));
        Mockito.when(sessionStore.findSessions(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(this.iters.get(4));
        Mockito.when(sessionStore.fetch(ArgumentMatchers.anyString())).thenReturn(this.iters.get(5));
        Mockito.when(sessionStore.fetch(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(this.iters.get(6));
        return sessionStore;
    }

    private StreamsConfig streamsConfigMock() {
        StreamsConfig streamsConfig = (StreamsConfig) Mockito.mock(StreamsConfig.class);
        Mockito.when(streamsConfig.originals()).thenReturn(Collections.emptyMap());
        Mockito.when(streamsConfig.values()).thenReturn(Collections.emptyMap());
        Mockito.when(streamsConfig.getString("application.id")).thenReturn("add-id");
        return streamsConfig;
    }

    private StreamsConfig streamsConfigWithConsistencyMock() {
        StreamsConfig streamsConfig = (StreamsConfig) Mockito.mock(StreamsConfig.class);
        HashMap hashMap = new HashMap();
        hashMap.put("__iq.consistency.offset.vector.enabled__", true);
        Mockito.when(streamsConfig.originals()).thenReturn(hashMap);
        Mockito.when(streamsConfig.values()).thenReturn(Collections.emptyMap());
        Mockito.when(streamsConfig.getString("application.id")).thenReturn("add-id");
        return streamsConfig;
    }

    private void initStateStoreMock(StateStore stateStore) {
        Mockito.when(stateStore.name()).thenReturn(STORE_NAME);
        Mockito.when(Boolean.valueOf(stateStore.persistent())).thenReturn(true);
        Mockito.when(Boolean.valueOf(stateStore.isOpen())).thenReturn(true);
    }

    private void mockStateStoreFlush(StateStore stateStore) {
        ((StateStore) Mockito.doAnswer(invocationOnMock -> {
            this.flushExecuted = true;
            return null;
        }).when(stateStore)).flush();
    }

    private <T extends StateStore> void doTest(final String str, final Consumer<T> consumer) {
        new Processor<String, Long, String, Long>() { // from class: org.apache.kafka.streams.processor.internals.ProcessorContextImplTest.1
            public void init(ProcessorContext<String, Long> processorContext) {
                consumer.accept(processorContext.getStateStore(str));
            }

            public void process(Record<String, Long> record) {
            }

            public void close() {
            }
        }.init(this.context);
    }

    private void verifyStoreCannotBeInitializedOrClosed(StateStore stateStore) {
        Assertions.assertEquals(STORE_NAME, stateStore.name());
        Assertions.assertTrue(stateStore.persistent());
        Assertions.assertTrue(stateStore.isOpen());
        checkThrowsUnsupportedOperation(() -> {
            stateStore.init((StateStoreContext) null, (StateStore) null);
        }, "init()");
        Objects.requireNonNull(stateStore);
        checkThrowsUnsupportedOperation(stateStore::close, "close()");
    }

    private void checkThrowsUnsupportedOperation(Runnable runnable, String str) {
        try {
            runnable.run();
            Assertions.fail(str + " should throw exception");
        } catch (UnsupportedOperationException e) {
        }
    }
}
