package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.class */
class DefaultStateUpdaterTest {
    // Value passed to configProps(...) below; used there for "commit.interval.ms" and the
    // producer-prefixed "transaction.timeout.ms".
    private static final int COMMIT_INTERVAL = 100;
    // NOTE(review): not referenced in the visible part of this file; presumably milliseconds - confirm.
    private static final long CALL_TIMEOUT = 1000;
    // Upper bound in milliseconds for waits; passed to Duration.ofMillis(...) and Mockito.timeout(...).
    private static final long VERIFICATION_TIMEOUT = 30000;

    // Changelog topic partitions handed to the task builders and to the changelog reader stubs.
    private static final TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
    private static final TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
    private static final TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
    private static final TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
    private static final TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0);
    private static final TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0);

    // Task ids built from two ints.
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);
    private static final TaskId TASK_1_0 = new TaskId(1, 0);
    private static final TaskId TASK_1_1 = new TaskId(1, 1);
    // Task ids built with an additional string argument ("A" / "B"); presumably the
    // named-topology name - confirm against TaskId.
    private static final TaskId TASK_A_0_0 = new TaskId(0, 0, "A");
    private static final TaskId TASK_A_0_1 = new TaskId(0, 1, "A");
    private static final TaskId TASK_B_0_0 = new TaskId(0, 0, "B");
    private static final TaskId TASK_B_0_1 = new TaskId(0, 1, "B");

    // Collaborators of the state updater under test. The changelog reader is a Mockito mock so
    // that tests can stub its results and verify the calls made on it.
    private final Time time = new MockTime(1);
    private final Metrics metrics = new Metrics(this.time);
    private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
    private final ChangelogReader changelogReader = (ChangelogReader) Mockito.mock(ChangelogReader.class);
    private final TopologyMetadata topologyMetadata = StreamsTestUtils.TopologyMetadataBuilder.unnamedTopology().build();
    // Object under test; constructed with a null consumer.
    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", this.metrics, this.config, (Consumer) null, this.changelogReader, this.topologyMetadata, this.time);

    // Explicit package-private no-arg constructor; all fixtures are set up by the field initializers.
    DefaultStateUpdaterTest() {
    }

    /**
     * Shuts the state updater down after every test, waiting up to one minute.
     */
    @AfterEach
    public void tearDown() {
        final Duration shutdownTimeout = Duration.ofMinutes(1L);
        stateUpdater.shutdown(shutdownTimeout);
    }

    /**
     * Builds the Streams configuration properties used to create the {@code StreamsConfig}
     * of this test class.
     *
     * @param i value used for both {@code commit.interval.ms} and the producer-prefixed
     *          {@code transaction.timeout.ms}
     * @return the assembled properties
     */
    private Properties configProps(int i) {
        final Integer intervalMs = Integer.valueOf(i);
        return Utils.mkObjectProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "appId"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("processing.guarantee", "exactly_once_v2"),
            Utils.mkEntry("commit.interval.ms", intervalMs),
            Utils.mkEntry(StreamsConfig.producerPrefix("transaction.timeout.ms"), intervalMs)
        ));
    }

    /**
     * Adds a stateless task, two stateful active tasks and a standby task, with the changelog
     * reader stubbed so that one stateful task completes and the other fails with a
     * {@link TaskCorruptedException}. After shutdown the restored and failed tasks are still
     * reported, no task is updating or paused, and the changelog reader was cleared.
     */
    @Test
    public void shouldShutdownStateUpdater() throws Exception {
        final StreamTask statelessTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0)
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask restoredStatefulTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask failedStatefulTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_1, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Set.of(TOPIC_PARTITION_B_0));
        final TaskCorruptedException corruption = new TaskCorruptedException(Set.of(TASK_1_1));
        // restore(...) throws once only the failing stateful task and the standby task are passed in.
        Mockito.doThrow(corruption).when(changelogReader).restore(Utils.mkMap(
            Utils.mkEntry(TASK_1_1, failedStatefulTask),
            Utils.mkEntry(TASK_0_2, standbyTask)
        ));

        stateUpdater.add(statelessTask);
        stateUpdater.add(restoredStatefulTask);
        stateUpdater.add(failedStatefulTask);
        stateUpdater.add(standbyTask);
        stateUpdater.start();

        verifyRestoredActiveTasks(statelessTask, restoredStatefulTask);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, failedStatefulTask));
        verifyUpdatingTasks(standbyTask);
        verifyPausedTasks();

        stateUpdater.shutdown(Duration.ofMinutes(1L));

        verifyRestoredActiveTasks(statelessTask, restoredStatefulTask);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, failedStatefulTask));
        verifyUpdatingTasks();
        verifyPausedTasks();
        Mockito.verify(changelogReader).clear();
    }

    /**
     * With the topology metadata stubbed as paused, an active and a standby task are both
     * reported as paused after start; after shutdown no task is reported in any of the
     * checked collections and the changelog reader was cleared.
     */
    @Test
    public void shouldShutdownStateUpdaterWithPausedTasks() throws Exception {
        final StreamTask statefulTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        stateUpdater.add(statefulTask);
        stateUpdater.add(standbyTask);
        stateUpdater.start();

        verifyRestoredActiveTasks();
        verifyExceptionsAndFailedTasks();
        verifyUpdatingTasks();
        verifyPausedTasks(statefulTask, standbyTask);

        stateUpdater.shutdown(Duration.ofMinutes(1L));

        verifyRestoredActiveTasks();
        verifyExceptionsAndFailedTasks();
        verifyUpdatingTasks();
        verifyPausedTasks();
        Mockito.verify(changelogReader).clear();
    }

    /**
     * Starts and shuts down the state updater twice and checks that the changelog reader
     * was cleared exactly two times.
     */
    @Test
    public void shouldShutdownStateUpdaterAndRestart() {
        final Duration shutdownTimeout = Duration.ofMinutes(1L);

        stateUpdater.start();
        stateUpdater.shutdown(shutdownTimeout);

        stateUpdater.start();
        stateUpdater.shutdown(shutdownTimeout);

        Mockito.verify(changelogReader, Mockito.times(2)).clear();
    }

    /**
     * Restarting the state updater while a restored task has not been drained must fail with
     * an {@link IllegalStateException} carrying the expected message.
     */
    @Test
    public void shouldThrowIfRestartedWithNonEmptyRestoredTasks() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_A_1));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(task);
        verifyRestoredActiveTasks(task);
        stateUpdater.shutdown(Duration.ofMinutes(1L));

        final IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, stateUpdater::start);

        Assertions.assertEquals(
            "State updater started with non-empty output queues. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).",
            thrown.getMessage()
        );
    }

    /**
     * Restarting the state updater while a failed task has not been drained must fail with
     * an {@link IllegalStateException} carrying the expected message.
     */
    @Test
    public void shouldThrowIfRestartedWithNonEmptyFailedTasks() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final TaskCorruptedException corruption = new TaskCorruptedException(Set.of(TASK_0_0));
        Mockito.doThrow(corruption).when(changelogReader).restore(Utils.mkMap(Utils.mkEntry(TASK_0_0, task)));
        stateUpdater.start();
        stateUpdater.add(task);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, task));
        stateUpdater.shutdown(Duration.ofMinutes(1L));

        final IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, stateUpdater::start);

        Assertions.assertEquals(
            "State updater started with non-empty output queues. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).",
            thrown.getMessage()
        );
    }

    /**
     * Runs the "active task not in RESTORING" check against a stateless task.
     */
    @Test
    public void shouldThrowIfStatelessTaskNotInStateRestoring() {
        final StreamTask statelessTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).build();
        shouldThrowIfActiveTaskNotInStateRestoring(statelessTask);
    }

    /**
     * Runs the "active task not in RESTORING" check against a stateful task.
     */
    @Test
    public void shouldThrowIfStatefulTaskNotInStateRestoring() {
        final StreamTask statefulTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .build();
        shouldThrowIfActiveTaskNotInStateRestoring(statefulTask);
    }

    /**
     * Checks that adding the given active task is rejected in every state other than
     * {@code RESTORING}.
     *
     * @param streamTask the active task to probe
     */
    private void shouldThrowIfActiveTaskNotInStateRestoring(StreamTask streamTask) {
        final Task.State onlyAcceptedState = Task.State.RESTORING;
        shouldThrowIfTaskNotInGivenState(streamTask, onlyAcceptedState);
    }

    /**
     * Checks that adding a standby task is rejected in every state other than {@code RUNNING}.
     */
    @Test
    public void shouldThrowIfStandbyTaskNotInStateRunning() {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .build();
        shouldThrowIfTaskNotInGivenState(standbyTask, Task.State.RUNNING);
    }

    /**
     * For every {@link Task.State} except {@code state}, stubs the task to report that state
     * and asserts that adding it to the state updater throws {@link IllegalStateException}.
     *
     * @param task  task whose {@code state()} is stubbed
     * @param state the one state that is skipped
     */
    private void shouldThrowIfTaskNotInGivenState(Task task, Task.State state) {
        for (final Task.State candidate : Task.State.values()) {
            if (candidate == state) {
                continue;
            }
            Mockito.when(task.state()).thenReturn(candidate);
            Assertions.assertThrows(IllegalStateException.class, () -> stateUpdater.add(task));
        }
    }

    /**
     * Adds two active stateful tasks that share the same task id.
     */
    @Test
    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
        final StreamTask first = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask second = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldThrowIfAddingTasksWithSameId(first, second);
    }

    /**
     * Adds two standby tasks that share the same task id.
     */
    @Test
    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
        final StandbyTask first = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask second = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldThrowIfAddingTasksWithSameId(first, second);
    }

    /**
     * Adds an active stateful task followed by a standby task with the same task id.
     */
    @Test
    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldThrowIfAddingTasksWithSameId(activeTask, standbyTask);
    }

    /**
     * Adds a standby task followed by an active stateful task with the same task id.
     */
    @Test
    public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldThrowIfAddingTasksWithSameId(standbyTask, activeTask);
    }

    /**
     * Starts the state updater, adds both tasks, then expects the first task to be reported
     * as failed with an {@link IllegalStateException} and the state updater to have stopped
     * running.
     *
     * @param task  the task added first
     * @param task2 the task added second
     */
    private void shouldThrowIfAddingTasksWithSameId(Task task, Task task2) throws Exception {
        stateUpdater.start();

        stateUpdater.add(task);
        stateUpdater.add(task2);

        verifyFailedTasks(IllegalStateException.class, task);
        final boolean stillRunning = stateUpdater.isRunning();
        Assertions.assertFalse(stillRunning);
    }

    /**
     * Runs the stateless-task scenario with a single task.
     */
    @Test
    public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
        final StreamTask statelessTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0)
            .inState(Task.State.RESTORING)
            .build();
        shouldImmediatelyAddStatelessTasksToRestoredTasks(statelessTask);
    }

    /**
     * Runs the stateless-task scenario with three tasks.
     */
    @Test
    public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
        final StreamTask first = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0)
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask second = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_2)
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask third = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_0)
            .inState(Task.State.RESTORING)
            .build();
        shouldImmediatelyAddStatelessTasksToRestoredTasks(first, second, third);
    }

    /**
     * Starts the state updater, adds the given tasks, and expects all of them to be reported
     * as restored without being checkpointed, with no updating, failed or paused tasks.
     *
     * @param streamTaskArr the stateless tasks to add
     */
    private void shouldImmediatelyAddStatelessTasksToRestoredTasks(StreamTask... streamTaskArr) throws Exception {
        stateUpdater.start();

        for (final StreamTask statelessTask : streamTaskArr) {
            stateUpdater.add(statelessTask);
        }

        verifyRestoredActiveTasks(streamTaskArr);
        verifyNeverCheckpointTasks(streamTaskArr);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
    }

    /**
     * Restores one stateful active task with two changelog partitions that the stubbed
     * changelog reader reports as completed one after the other, then checks the reported
     * task collections and the calls made on the changelog reader.
     */
    @Test
    public void shouldRestoreSingleActiveStatefulTask() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        // Flipped once the stub has reported both changelogs as completed.
        final AtomicBoolean allCompleted = new AtomicBoolean(false);
        Mockito.when(changelogReader.completedChangelogs())
            .thenReturn(Collections.emptySet())
            .thenReturn(Set.of(TOPIC_PARTITION_A_0))
            .thenAnswer(invocation -> {
                allCompleted.set(true);
                return Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0);
            });
        Mockito.when(changelogReader.allChangelogsCompleted()).thenAnswer(invocation -> allCompleted.get());

        stateUpdater.start();
        stateUpdater.add(task);

        verifyRestoredActiveTasks(task);
        verifyCheckpointTasks(true, task);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Mockito.verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
        Mockito.verify(changelogReader).unregister(task.changelogPartitions());
        Mockito.verify(changelogReader).enforceRestoreActive();
        Mockito.verify(changelogReader, Mockito.atLeast(3)).restore(ArgumentMatchers.anyMap());
        Mockito.verify(changelogReader, Mockito.never()).transitToUpdateStandby();
    }

    /**
     * Restores three stateful active tasks whose changelogs the stubbed changelog reader
     * reports as completed in the order C, A, B, then checks the reported task collections
     * and the calls made on the changelog reader.
     */
    @Test
    public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
        final StreamTask taskA = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask taskB = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask taskC = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RESTORING)
            .build();
        // Flipped once the stub has reported all three changelogs as completed.
        final AtomicBoolean allCompleted = new AtomicBoolean(false);
        Mockito.when(changelogReader.completedChangelogs())
            .thenReturn(Collections.emptySet())
            .thenReturn(Set.of(TOPIC_PARTITION_C_0))
            .thenReturn(Set.of(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0))
            .thenAnswer(invocation -> {
                allCompleted.set(true);
                return Set.of(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0);
            });
        Mockito.when(changelogReader.allChangelogsCompleted()).thenAnswer(invocation -> allCompleted.get());

        stateUpdater.start();
        stateUpdater.add(taskA);
        stateUpdater.add(taskB);
        stateUpdater.add(taskC);

        verifyRestoredActiveTasks(taskC, taskA, taskB);
        verifyCheckpointTasks(true, taskC, taskA, taskB);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Mockito.verify(changelogReader).register(taskA.changelogPartitions(), taskA.stateManager());
        Mockito.verify(changelogReader).register(taskB.changelogPartitions(), taskB.stateManager());
        Mockito.verify(changelogReader).register(taskC.changelogPartitions(), taskC.stateManager());
        Mockito.verify(changelogReader).unregister(taskA.changelogPartitions());
        Mockito.verify(changelogReader).unregister(taskB.changelogPartitions());
        Mockito.verify(changelogReader).unregister(taskC.changelogPartitions());
        Mockito.verify(changelogReader, Mockito.times(3)).enforceRestoreActive();
        Mockito.verify(changelogReader, Mockito.atLeast(4)).restore(ArgumentMatchers.anyMap());
        Mockito.verify(changelogReader, Mockito.never()).transitToUpdateStandby();
    }

    /**
     * {@code restoresActiveTasks()} is true right after an active task was added, without the
     * state updater having been started in this test.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskAdded() {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();

        stateUpdater.add(task);

        Assertions.assertTrue(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is true while an active task is reported as updating.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskUpdating() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(task);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks(task);
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Assertions.assertTrue(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is still true when the active task is reported as
     * restored but has not been drained.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskRestored() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(true);

        stateUpdater.start();
        stateUpdater.add(task);

        verifyRestoredActiveTasks(task);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Assertions.assertTrue(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is false once the only active task has been removed from
     * the state updater.
     */
    @Test
    public void shouldReturnFalseForRestoreActiveTasksIfTaskRemoved() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(task);

        // Bound the wait for the removal to complete: an unbounded get() would hang the whole
        // test run if the removal future were never completed. A timeout surfaces as a
        // TimeoutException, which fails the test through the `throws Exception` clause.
        stateUpdater.remove(task.id()).get(VERIFICATION_TIMEOUT, java.util.concurrent.TimeUnit.MILLISECONDS);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Assertions.assertFalse(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is still true when the active task is reported as failed
     * but has not been drained.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskFailed() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        final TaskCorruptedException corruption = new TaskCorruptedException(Set.of(task.id()));
        Mockito.doThrow(corruption).when(changelogReader).restore(Utils.mkMap(Utils.mkEntry(TASK_0_0, task)));

        stateUpdater.start();
        stateUpdater.add(task);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, task));
        verifyPausedTasks();
        Assertions.assertTrue(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is still true when the active task is reported as paused.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskPaused() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(task);
        verifyUpdatingTasks(task);

        // Only report the topology as paused after the task has been seen as updating.
        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks(task);
        Assertions.assertTrue(stateUpdater.restoresActiveTasks());
    }

    /**
     * {@code restoresActiveTasks()} is false after the restored active tasks have been drained
     * from the state updater.
     */
    @Test
    public void shouldReturnFalseForRestoreActiveTasksIfTaskRemovedFromStateUpdater() throws Exception {
        final StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(true);
        stateUpdater.start();
        stateUpdater.add(task);

        final Duration drainTimeout = Duration.ofMillis(VERIFICATION_TIMEOUT);
        stateUpdater.drainRestoredActiveTasks(drainTimeout);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Assertions.assertFalse(stateUpdater.restoresActiveTasks());
    }

    /**
     * With only a standby task updating, {@code restoresActiveTasks()} is asserted to be false.
     *
     * <p>NOTE(review): the method name says "ReturnTrue" while the assertion below expects
     * {@code false}; the name looks like a leftover and is kept unchanged here.
     */
    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfStandbyTask() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(standbyTask);

        verifyRestoredActiveTasks();
        verifyUpdatingTasks(standbyTask);
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Assertions.assertFalse(stateUpdater.restoresActiveTasks());
    }

    /**
     * Draining before anything was added yields an empty result; afterwards first one and
     * then three stateless tasks are added and checked via
     * {@code verifyDrainingRestoredActiveTasks}.
     */
    @Test
    public void shouldDrainRestoredActiveTasks() throws Exception {
        Assertions.assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());

        final StreamTask firstTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0)
            .inState(Task.State.RESTORING)
            .build();
        stateUpdater.start();
        stateUpdater.add(firstTask);

        verifyDrainingRestoredActiveTasks(firstTask);

        final StreamTask secondTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_1)
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask thirdTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_0)
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask fourthTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_2)
            .inState(Task.State.RESTORING)
            .build();
        stateUpdater.add(secondTask);
        stateUpdater.add(thirdTask);
        stateUpdater.add(fourthTask);

        verifyDrainingRestoredActiveTasks(secondTask, thirdTask, fourthTask);
    }

    /**
     * Runs the standby-update scenario with a single standby task.
     */
    @Test
    public void shouldUpdateSingleStandbyTask() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldUpdateStandbyTasks(standbyTask);
    }

    /**
     * Runs the standby-update scenario with three standby tasks.
     */
    @Test
    public void shouldUpdateMultipleStandbyTasks() throws Exception {
        final StandbyTask first = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask second = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask third = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldUpdateStandbyTasks(first, second, third);
    }

    /**
     * Starts the state updater, adds the given standby tasks, and expects them to be reported
     * as updating standby tasks with nothing restored, failed or paused. Also checks that each
     * task was registered with the changelog reader, that the reader was switched to standby
     * updating, that {@code restore} was called at least once within the verification timeout,
     * and that active restoration was never enforced.
     *
     * @param standbyTaskArr the standby tasks to add
     */
    private void shouldUpdateStandbyTasks(StandbyTask... standbyTaskArr) throws Exception {
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();

        for (final StandbyTask added : standbyTaskArr) {
            stateUpdater.add(added);
        }

        verifyUpdatingStandbyTasks(standbyTaskArr);
        verifyRestoredActiveTasks();
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        for (final StandbyTask registered : standbyTaskArr) {
            Mockito.verify(changelogReader).register(registered.changelogPartitions(), registered.stateManager());
        }
        Mockito.verify(changelogReader).transitToUpdateStandby();
        Mockito.verify(changelogReader, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(ArgumentMatchers.anyMap());
        Mockito.verify(changelogReader, Mockito.never()).enforceRestoreActive();
    }

    /**
     * Adds two stateful active tasks and two standby tasks. Expects both active tasks to be
     * restored and checkpointed while the standby tasks are updating, and verifies that
     * {@code enforceRestoreActive()} was called twice before {@code transitToUpdateStandby()}.
     */
    @Test
    public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
        final StreamTask activeTaskA = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask activeTaskB = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTaskC = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask standbyTaskD = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(changelogReader.completedChangelogs())
            .thenReturn(Collections.emptySet())
            .thenReturn(Set.of(TOPIC_PARTITION_A_0))
            .thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(activeTaskA);
        stateUpdater.add(activeTaskB);
        stateUpdater.add(standbyTaskC);
        stateUpdater.add(standbyTaskD);

        verifyRestoredActiveTasks(activeTaskB, activeTaskA);
        verifyCheckpointTasks(true, activeTaskB, activeTaskA);
        verifyUpdatingStandbyTasks(standbyTaskD, standbyTaskC);
        verifyExceptionsAndFailedTasks();
        verifyPausedTasks();
        Mockito.verify(changelogReader).register(activeTaskA.changelogPartitions(), activeTaskA.stateManager());
        Mockito.verify(changelogReader).register(activeTaskB.changelogPartitions(), activeTaskB.stateManager());
        Mockito.verify(changelogReader).register(standbyTaskC.changelogPartitions(), standbyTaskC.stateManager());
        Mockito.verify(changelogReader).register(standbyTaskD.changelogPartitions(), standbyTaskD.stateManager());
        Mockito.verify(changelogReader, Mockito.atLeast(3)).restore(ArgumentMatchers.anyMap());
        final InOrder orderVerifier = Mockito.inOrder(changelogReader, activeTaskA, activeTaskB);
        orderVerifier.verify(changelogReader, Mockito.times(2)).enforceRestoreActive();
        orderVerifier.verify(changelogReader).transitToUpdateStandby();
    }

    /**
     * Restores one active task alongside a standby task, then adds a second active task.
     * For each phase, verifies in order one {@code enforceRestoreActive()} call followed by
     * one {@code transitToUpdateStandby()} call on the changelog reader.
     */
    @Test
    public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
        final StreamTask firstActiveTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        final StreamTask secondActiveTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs())
            .thenReturn(Collections.emptySet())
            .thenReturn(Set.of(TOPIC_PARTITION_A_0))
            .thenReturn(Set.of(TOPIC_PARTITION_B_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(firstActiveTask);
        stateUpdater.add(standbyTask);

        verifyRestoredActiveTasks(firstActiveTask);
        verifyCheckpointTasks(true, firstActiveTask);
        verifyUpdatingStandbyTasks(standbyTask);
        final InOrder orderVerifier = Mockito.inOrder(changelogReader);
        orderVerifier.verify(changelogReader, Mockito.times(1)).enforceRestoreActive();
        orderVerifier.verify(changelogReader, Mockito.times(1)).transitToUpdateStandby();

        stateUpdater.add(secondActiveTask);

        verifyRestoredActiveTasks(firstActiveTask, secondActiveTask);
        verifyCheckpointTasks(true, secondActiveTask);
        orderVerifier.verify(changelogReader, Mockito.times(1)).enforceRestoreActive();
        orderVerifier.verify(changelogReader, Mockito.times(1)).transitToUpdateStandby();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Makes the first {@code restore()} call over two active tasks and one standby task throw a
     * {@link TaskCorruptedException} naming both active tasks. Expects both active tasks to be
     * reported as failed and the changelog reader to be switched to standby updating after at
     * least one {@code enforceRestoreActive()}.
     */
    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask activeTask2 = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_1, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        final TaskCorruptedException corruption =
            new TaskCorruptedException(Set.of(activeTask1.id(), activeTask2.id()));
        final Map<TaskId, Task> allTasks = Utils.mkMap(
            Utils.mkEntry(activeTask1.id(), activeTask1),
            Utils.mkEntry(activeTask2.id(), activeTask2),
            Utils.mkEntry(standbyTask.id(), standbyTask)
        );
        // First restore over all three tasks fails, later ones return 0.
        Mockito.doThrow(corruption).doReturn(0L).when(changelogReader).restore(allTasks);
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(activeTask1);
        stateUpdater.add(activeTask2);
        stateUpdater.add(standbyTask);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(corruption, activeTask1),
            new StateUpdater.ExceptionAndTask(corruption, activeTask2)
        );
        final InOrder readerCalls = Mockito.inOrder(changelogReader);
        readerCalls.verify(changelogReader, Mockito.atLeast(1)).enforceRestoreActive();
        readerCalls.verify(changelogReader).transitToUpdateStandby();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * With two standby tasks, lets the first {@code restore()} over both throw a
     * {@link TaskCorruptedException} for one of them. After the failed task is reported,
     * {@code transitToUpdateStandby()} must have been invoked exactly once.
     */
    @Test
    public void shouldNotTransitToStandbyAgainAfterStandbyTaskFailed() throws Exception {
        final StandbyTask failingStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask healthyStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        final Map<TaskId, Task> bothStandbys = Utils.mkMap(
            Utils.mkEntry(failingStandby.id(), failingStandby),
            Utils.mkEntry(healthyStandby.id(), healthyStandby)
        );
        final TaskCorruptedException corruption = new TaskCorruptedException(Set.of(failingStandby.id()));
        final StateUpdater.ExceptionAndTask expectedFailure =
            new StateUpdater.ExceptionAndTask(corruption, failingStandby);
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        Mockito.doThrow(corruption).doReturn(0L).when(changelogReader).restore(bothStandbys);

        stateUpdater.start();
        stateUpdater.add(failingStandby);
        stateUpdater.add(healthyStandby);

        verifyExceptionsAndFailedTasks(expectedFailure);
        Mockito.verify(changelogReader, Mockito.times(1)).transitToUpdateStandby();
    }

    /**
     * Adds two restoring active tasks and one standby task, removes both active tasks, and
     * expects the changelog reader to see at least one {@code enforceRestoreActive()} followed
     * by a {@code transitToUpdateStandby()}.
     */
    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask activeTask2 = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_1, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(activeTask1);
        stateUpdater.add(activeTask2);
        stateUpdater.add(standbyTask);
        verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);

        // Block until both removals have completed.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal1 = stateUpdater.remove(activeTask1.id());
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal2 = stateUpdater.remove(activeTask2.id());
        CompletableFuture.allOf(removal1, removal2).get();

        final InOrder readerCalls = Mockito.inOrder(changelogReader);
        readerCalls.verify(changelogReader, Mockito.atLeast(1)).enforceRestoreActive();
        readerCalls.verify(changelogReader).transitToUpdateStandby();
    }

    /**
     * Adds two standby tasks, removes one of them, and expects
     * {@code transitToUpdateStandby()} to have been called exactly once overall.
     */
    @Test
    public void shouldNotSwitchTwiceToUpdatingStandbyTaskIfStandbyTaskIsRemoved() throws Exception {
        final StandbyTask keptStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask removedStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(keptStandby);
        stateUpdater.add(removedStandby);
        verifyUpdatingTasks(keptStandby, removedStandby);

        stateUpdater.remove(removedStandby.id()).get();

        Mockito.verify(changelogReader).transitToUpdateStandby();
    }

    /** Runs the shared remove-while-updating scenario with a restoring active stateful task. */
    @Test
    public void shouldRemoveUpdatingActiveStatefulTask() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldRemoveUpdatingStatefulTask(activeTask);
    }

    /** Runs the shared remove-while-updating scenario with a running standby task. */
    @Test
    public void shouldRemoveUpdatingStandbyTask() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldRemoveUpdatingStatefulTask(standbyTask);
    }

    /**
     * Shared scenario: adds {@code task}, waits until it is updating, removes it, and checks that
     * the removal future yields a {@code RemovedTaskResult} for the task, that the task was
     * checkpointed, that no restored/updating/paused/failed tasks remain, and that the task's
     * changelog partitions were unregistered from the changelog reader.
     *
     * @param task the task to add and then remove
     */
    private void shouldRemoveUpdatingStatefulTask(Task task) throws Exception {
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(task);
        verifyUpdatingTasks(task);

        final StateUpdater.RemovedTaskResult expectedResult = new StateUpdater.RemovedTaskResult(task);
        final StateUpdater.RemovedTaskResult actualResult = stateUpdater.remove(task.id()).get();

        Assertions.assertEquals(expectedResult, actualResult);
        verifyCheckpointTasks(true, task);
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyPausedTasks();
        verifyExceptionsAndFailedTasks();
        Mockito.verify(changelogReader).unregister(task.changelogPartitions());
    }

    /**
     * Removing an updating active task whose unregistration throws a {@link StreamsException}
     * must fail the removal; the state updater is expected to keep running.
     */
    @Test
    public void shouldThrowIfRemovingUpdatingActiveTaskFailsWithStreamsException() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamsException failure = new StreamsException("Something happened", activeTask.id());
        setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(activeTask, failure);

        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(activeTask.id());

        verifyRemovingUpdatingStatefulTaskFails(removal, activeTask, failure, true);
    }

    /**
     * Removing an updating active task whose unregistration throws a plain
     * {@link RuntimeException} must fail the removal; the state updater is expected to no longer
     * be running afterwards.
     */
    @Test
    public void shouldThrowIfRemovingUpdatingActiveTaskFailsWithRuntimeException() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final RuntimeException failure = new RuntimeException("Something happened");
        setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(activeTask, failure);

        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(activeTask.id());

        verifyRemovingUpdatingStatefulTaskFails(removal, activeTask, failure, false);
    }

    /**
     * Removing an updating standby task whose unregistration throws a {@link StreamsException}
     * must fail the removal; the state updater is expected to keep running.
     */
    @Test
    public void shouldThrowIfRemovingUpdatingStandbyTaskFailsWithStreamsException() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        final StreamsException failure = new StreamsException("Something happened", standbyTask.id());
        setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(standbyTask, failure);

        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(standbyTask.id());

        verifyRemovingUpdatingStatefulTaskFails(removal, standbyTask, failure, true);
    }

    /**
     * Removing an updating standby task whose unregistration throws a plain
     * {@link RuntimeException} must fail the removal; the state updater is expected to no longer
     * be running afterwards.
     */
    @Test
    public void shouldThrowIfRemovingUpdatingStandbyTaskFailsWithRuntimeException() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        final RuntimeException failure = new RuntimeException("Something happened");
        setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(standbyTask, failure);

        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(standbyTask.id());

        verifyRemovingUpdatingStatefulTaskFails(removal, standbyTask, failure, false);
    }

    /**
     * Arranges the "removal fails" scenario: stubs the changelog reader so that unregistering the
     * task's changelog partitions throws {@code runtimeException}, then starts the state updater,
     * adds the task and waits until it is updating.
     *
     * @param task             the task that will later be removed
     * @param runtimeException the exception thrown by {@code unregister}
     */
    private void setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(Task task, RuntimeException runtimeException) throws Exception {
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        Mockito.doThrow(runtimeException)
            .when(changelogReader)
            .unregister(task.changelogPartitions());

        stateUpdater.start();
        stateUpdater.add(task);
        verifyUpdatingTasks(task);
    }

    /**
     * Asserts the outcome of a failed removal: the future completes exceptionally with an
     * {@link ExecutionException} whose cause is a {@link RuntimeException}, no task is left
     * restored, updating or paused, the task is reported as failed with {@code runtimeException},
     * and the updater's running flag equals {@code z}.
     *
     * @param completableFuture the future returned by the removal
     * @param task              the task whose removal failed
     * @param runtimeException  the exception expected to be recorded with the task
     * @param z                 whether the state updater is expected to still be running
     */
    private void verifyRemovingUpdatingStatefulTaskFails(CompletableFuture<StateUpdater.RemovedTaskResult> completableFuture, Task task, RuntimeException runtimeException, boolean z) throws Exception {
        // Binding the method reference fails fast with a NullPointerException on a null future.
        final ExecutionException thrown =
            Assertions.assertThrows(ExecutionException.class, completableFuture::get);
        Assertions.assertInstanceOf(RuntimeException.class, thrown.getCause());

        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyPausedTasks();
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(runtimeException, task));
        Assertions.assertEquals(z, stateUpdater.isRunning());
    }

    /**
     * Adds an active and a standby task, makes {@code topologyMetadata.isPaused(null)} return
     * true so both become paused, then removes both. Each removal must yield a
     * {@code RemovedTaskResult} for its task, both tasks must be checkpointed, nothing may remain
     * paused, updating or failed, and both tasks' changelog partitions must be unregistered.
     */
    @Test
    public void shouldRemovePausedTask() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        stateUpdater.start();
        stateUpdater.add(activeTask);
        stateUpdater.add(standbyTask);
        verifyUpdatingTasks(activeTask, standbyTask);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);
        verifyPausedTasks(activeTask, standbyTask);
        verifyUpdatingTasks();

        final CompletableFuture<StateUpdater.RemovedTaskResult> activeRemoval = stateUpdater.remove(activeTask.id());
        final CompletableFuture<StateUpdater.RemovedTaskResult> standbyRemoval = stateUpdater.remove(standbyTask.id());

        Assertions.assertEquals(new StateUpdater.RemovedTaskResult(activeTask), activeRemoval.get());
        Assertions.assertEquals(new StateUpdater.RemovedTaskResult(standbyTask), standbyRemoval.get());
        verifyPausedTasks();
        verifyCheckpointTasks(true, activeTask, standbyTask);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
        Mockito.verify(changelogReader).unregister(activeTask.changelogPartitions());
        Mockito.verify(changelogReader).unregister(standbyTask.changelogPartitions());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Pauses an active task and then removes it while {@code unregister} is stubbed to throw a
     * {@link StreamsException}. The removal future must fail with that exception type as cause,
     * the task must be reported as failed, and the state updater must still be running.
     */
    @Test
    public void shouldThrowIfRemovingPausedTaskFails() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamsException failure = new StreamsException("Something happened", activeTask.id());
        Mockito.doThrow(failure)
            .when(changelogReader)
            .unregister(activeTask.changelogPartitions());
        stateUpdater.start();
        stateUpdater.add(activeTask);
        verifyUpdatingTasks(activeTask);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);
        verifyPausedTasks(activeTask);
        verifyUpdatingTasks();

        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(activeTask.id());

        final ExecutionException thrown = Assertions.assertThrows(ExecutionException.class, removal::get);
        Assertions.assertInstanceOf(StreamsException.class, thrown.getCause());
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyPausedTasks();
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(failure, activeTask));
        Assertions.assertTrue(stateUpdater.isRunning());
    }

    /** Runs the shared remove-from-restored scenario with an active stateful task. */
    @Test
    public void shouldRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
        final StreamTask statefulTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldRemoveTaskFromRestoredActiveTasks(statefulTask);
    }

    /** Runs the shared remove-from-restored scenario with a stateless task. */
    @Test
    public void shouldRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
        final StreamTask statelessTask = StreamsTestUtils.TaskBuilder
            .statelessTask(TASK_0_0)
            .inState(Task.State.RESTORING)
            .build();
        shouldRemoveTaskFromRestoredActiveTasks(statelessTask);
    }

    /**
     * Shared scenario: adds {@code streamTask}, waits until it is reported as a restored active
     * task, removes it, and checks that the removal yields a {@code RemovedTaskResult} for the
     * task and that no restored, updating, paused or failed tasks remain.
     *
     * @param streamTask the active task to add and then remove
     */
    private void shouldRemoveTaskFromRestoredActiveTasks(StreamTask streamTask) throws Exception {
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(streamTask);
        verifyRestoredActiveTasks(streamTask);

        // Typed future; a single get() both waits for completion and supplies the result to assert on.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = stateUpdater.remove(streamTask.id());

        Assertions.assertEquals(new StateUpdater.RemovedTaskResult(streamTask), removal.get());
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyPausedTasks();
        verifyExceptionsAndFailedTasks();
    }

    /** Runs the shared remove-from-failed scenario with a restoring active stateful task. */
    @Test
    public void shouldRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldRemoveTaskFromFailedTasks(activeTask);
    }

    /** Runs the shared remove-from-failed scenario with a running standby task. */
    @Test
    public void shouldRemoveStandbyTaskFromFailedTasks() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldRemoveTaskFromFailedTasks(standbyTask);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Shared scenario: the first {@code restore()} over {@code task} throws a
     * {@link StreamsException}, so the task is reported as failed. Removing it must then yield a
     * {@code RemovedTaskResult} carrying both the task and the exception, and leave no paused,
     * failed, updating or restored tasks behind.
     *
     * @param task the task that fails and is then removed
     */
    private void shouldRemoveTaskFromFailedTasks(Task task) throws Exception {
        final StreamsException failure = new StreamsException("Something happened", task.id());
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        final Map<TaskId, Task> onlyThisTask = Utils.mkMap(Utils.mkEntry(task.id(), task));
        Mockito.doThrow(failure).doReturn(0L).when(changelogReader).restore(onlyThisTask);
        stateUpdater.start();
        stateUpdater.add(task);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(failure, task));

        final StateUpdater.RemovedTaskResult expectedResult = new StateUpdater.RemovedTaskResult(task, failure);
        final StateUpdater.RemovedTaskResult actualResult = stateUpdater.remove(task.id()).get();

        Assertions.assertEquals(expectedResult, actualResult);
        verifyPausedTasks();
        verifyExceptionsAndFailedTasks();
        verifyUpdatingTasks();
        verifyRestoredActiveTasks();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Brings the updater into a state with one updating, one restored and one failed task, then
     * removes an id ({@code TASK_1_0}) that was never added. The removal future must complete
     * with {@code null} and the three existing tasks must stay where they were.
     */
    @Test
    public void shouldCompleteWithNullIfTaskNotFound() throws Exception {
        final StreamTask updatingTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask restoredTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_1, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask failedTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RESTORING)
            .build();
        final TaskCorruptedException corruption = new TaskCorruptedException(Set.of(TASK_0_2));
        final Map<TaskId, Task> updatingAndFailing = Utils.mkMap(
            Utils.mkEntry(TASK_0_0, updatingTask),
            Utils.mkEntry(TASK_0_2, failedTask)
        );
        Mockito.doThrow(corruption).when(changelogReader).restore(updatingAndFailing);
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Set.of(TOPIC_PARTITION_B_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(updatingTask);
        stateUpdater.add(restoredTask);
        stateUpdater.add(failedTask);
        verifyRestoredActiveTasks(restoredTask);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, failedTask));
        verifyUpdatingTasks(updatingTask);
        verifyPausedTasks();

        Assertions.assertNull(stateUpdater.remove(TASK_1_0).get());

        // Nothing may have moved as a result of the no-op removal.
        verifyRestoredActiveTasks(restoredTask);
        verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(corruption, failedTask));
        verifyUpdatingTasks(updatingTask);
        verifyPausedTasks();
    }

    /**
     * Removing a task id from a freshly started, empty state updater must complete with
     * {@code null} and leave the updater running.
     */
    @Test
    public void shouldCompleteWithNullIfNoTasks() throws Exception {
        stateUpdater.start();

        final StateUpdater.RemovedTaskResult removalResult = stateUpdater.remove(TASK_0_1).get();

        Assertions.assertNull(removalResult);
        Assertions.assertTrue(stateUpdater.isRunning());
    }

    /**
     * Runs the shared pause scenario with an active stateful task and additionally checks that
     * the changelog reader was never switched to standby updating.
     */
    @Test
    public void shouldPauseActiveStatefulTask() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();

        shouldPauseStatefulTask(activeTask);

        Mockito.verify(changelogReader, Mockito.never()).transitToUpdateStandby();
    }

    /**
     * Runs the shared pause scenario with a standby task and additionally checks that the
     * changelog reader was switched to standby updating exactly once.
     */
    @Test
    public void shouldPauseStandbyTask() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();

        shouldPauseStatefulTask(standbyTask);

        Mockito.verify(changelogReader, Mockito.times(1)).transitToUpdateStandby();
    }

    /**
     * Adds an active task and a standby task, with {@code isPaused} for the active task's
     * topology name stubbed to return false first and true afterwards. The active task must end
     * up paused and checkpointed, the standby task updating, and the changelog reader must have
     * seen exactly one {@code enforceRestoreActive()} and one {@code transitToUpdateStandby()}.
     */
    @Test
    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_A_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_B_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(topologyMetadata.isPaused(activeTask.id().topologyName()))
            .thenReturn(false)
            .thenReturn(true);

        stateUpdater.start();
        stateUpdater.add(activeTask);
        stateUpdater.add(standbyTask);

        verifyPausedTasks(activeTask);
        verifyCheckpointTasks(true, activeTask);
        verifyRestoredActiveTasks();
        verifyUpdatingTasks(standbyTask);
        verifyExceptionsAndFailedTasks();
        Mockito.verify(changelogReader, Mockito.times(1)).enforceRestoreActive();
        Mockito.verify(changelogReader, Mockito.times(1)).transitToUpdateStandby();
    }

    /**
     * Adds two standby tasks, with {@code isPaused} for the first task's topology name stubbed to
     * return false first and true afterwards. The first standby must end up paused and
     * checkpointed, the second updating, and {@code enforceRestoreActive()} must never be called.
     */
    @Test
    public void shouldPauseStandbyTaskAndNotTransitToRestoreActive() throws Exception {
        final StandbyTask pausedStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_A_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        final StandbyTask updatingStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_B_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(topologyMetadata.isPaused(pausedStandby.id().topologyName()))
            .thenReturn(false)
            .thenReturn(true);

        stateUpdater.start();
        stateUpdater.add(pausedStandby);
        stateUpdater.add(updatingStandby);

        verifyPausedTasks(pausedStandby);
        verifyUpdatingTasks(updatingStandby);
        verifyCheckpointTasks(true, pausedStandby);
        Mockito.verify(changelogReader, Mockito.never()).enforceRestoreActive();
    }

    /**
     * Shared scenario: adds {@code task}, waits until it is updating, then makes
     * {@code topologyMetadata.isPaused(null)} return true and checks that the task becomes paused
     * and checkpointed while no restored, updating or failed tasks are reported.
     *
     * @param task the task to add and pause
     */
    private void shouldPauseStatefulTask(Task task) throws Exception {
        stateUpdater.start();
        stateUpdater.add(task);
        verifyUpdatingTasks(task);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyPausedTasks(task);
        verifyCheckpointTasks(true, task);
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
    }

    /**
     * With no tasks added, stubbing {@code topologyMetadata.isPaused(null)} to true must not
     * make any paused, restored, updating or failed task appear.
     */
    @Test
    public void shouldNotPausingNonExistTasks() throws Exception {
        stateUpdater.start();

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyPausedTasks();
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
    }

    /**
     * Adds two active tasks of which only the first has its changelog reported as completed.
     * After {@code topologyMetadata.isPaused(null)} starts returning true, the still-updating
     * task must become paused while the already restored task stays in the restored set.
     */
    @Test
    public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
        final StreamTask restoredTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask updatingTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        stateUpdater.start();
        stateUpdater.add(restoredTask);
        stateUpdater.add(updatingTask);
        verifyRestoredActiveTasks(restoredTask);
        verifyUpdatingTasks(updatingTask);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyPausedTasks(updatingTask);
        verifyRestoredActiveTasks(restoredTask);
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
    }

    /** Runs the shared "failed task is not paused" scenario with an active stateful task. */
    @Test
    public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        shouldNotPauseTaskInFailedTasks(activeTask);
    }

    /** Runs the shared "failed task is not paused" scenario with a standby task. */
    @Test
    public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldNotPauseTaskInFailedTasks(standbyTask);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Shared scenario: {@code task} fails on the first {@code restore()} call that covers it and
     * a second active task, while the second task keeps updating. After
     * {@code topologyMetadata.isPaused(null)} starts returning true, the second task must become
     * paused while {@code task} is still reported as failed.
     *
     * @param task the task that fails before pausing is requested
     */
    private void shouldNotPauseTaskInFailedTasks(Task task) throws Exception {
        final StreamTask otherTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamsException failure = new StreamsException("Something happened", task.id());
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        final Map<TaskId, Task> bothTasks = Utils.mkMap(
            Utils.mkEntry(task.id(), task),
            Utils.mkEntry(otherTask.id(), otherTask)
        );
        Mockito.doThrow(failure).doReturn(0L).when(changelogReader).restore(bothTasks);
        stateUpdater.start();
        stateUpdater.add(task);
        stateUpdater.add(otherTask);
        final StateUpdater.ExceptionAndTask expectedFailure = new StateUpdater.ExceptionAndTask(failure, task);
        verifyExceptionsAndFailedTasks(expectedFailure);
        verifyUpdatingTasks(otherTask);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyPausedTasks(otherTask);
        verifyExceptionsAndFailedTasks(expectedFailure);
        verifyUpdatingTasks();
        verifyRestoredActiveTasks();
    }

    /**
     * Runs the shared pause-then-resume scenario with an active stateful task and additionally
     * checks that {@code enforceRestoreActive()} was called exactly twice.
     */
    @Test
    public void shouldResumeActiveStatefulTask() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();

        shouldResumeStatefulTask(activeTask);

        Mockito.verify(changelogReader, Mockito.times(2)).enforceRestoreActive();
    }

    /**
     * Pauses the only task, checks via {@code verifyIdle()} that the updater idles, then
     * un-pauses, signals resume, and expects the task to be updating again.
     */
    @Test
    public void shouldIdleWhenAllTasksPaused() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        stateUpdater.start();
        stateUpdater.add(activeTask);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);
        verifyPausedTasks(activeTask);
        verifyIdle();

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(false);
        stateUpdater.signalResume();

        verifyPausedTasks();
        verifyUpdatingTasks(activeTask);
    }

    /**
     * Runs the shared pause-then-resume scenario with a standby task and additionally checks
     * that {@code transitToUpdateStandby()} was called exactly twice.
     */
    @Test
    public void shouldResumeStandbyTask() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();

        shouldResumeStatefulTask(standbyTask);

        Mockito.verify(changelogReader, Mockito.times(2)).transitToUpdateStandby();
    }

    /**
     * Shared scenario: adds {@code task}, pauses it by making
     * {@code topologyMetadata.isPaused(null)} return true, then makes it return false again,
     * signals resume, and expects the task to leave the paused set and be updating again.
     *
     * @param task the task to pause and resume
     */
    private void shouldResumeStatefulTask(Task task) throws Exception {
        stateUpdater.start();
        stateUpdater.add(task);
        verifyUpdatingTasks(task);

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(true);
        verifyPausedTasks(task);
        verifyUpdatingTasks();

        Mockito.when(topologyMetadata.isPaused((String) null)).thenReturn(false);
        stateUpdater.signalResume();

        verifyPausedTasks();
        verifyUpdatingTasks(task);
    }

    /**
     * Starts the state updater without adding any task and checks that no paused, restored,
     * updating or failed tasks are reported.
     */
    @Test
    public void shouldNotResumeNonExistingTasks() throws Exception {
        stateUpdater.start();

        verifyPausedTasks();
        verifyRestoredActiveTasks();
        verifyUpdatingTasks();
        verifyExceptionsAndFailedTasks();
    }

    /**
     * Adds two active tasks of which only the first has its changelog reported as completed, and
     * checks that the first is reported as restored, the second as updating, and nothing is
     * paused or failed.
     */
    @Test
    public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
        final StreamTask restoredTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask updatingTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);

        stateUpdater.start();
        stateUpdater.add(restoredTask);
        stateUpdater.add(updatingTask);

        verifyRestoredActiveTasks(restoredTask);
        verifyPausedTasks();
        verifyRestoredActiveTasks(restoredTask);
        verifyUpdatingTasks(updatingTask);
        verifyExceptionsAndFailedTasks();
    }

    /**
     * Runs the shared "failed task is not resumed" scenario with an active stateful task, using
     * the same resume helper as the standby variant of this test.
     */
    @Test
    public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        // Delegate to the resume helper; the pause helper is already covered by
        // shouldNotPauseActiveStatefulTaskInFailedTasks.
        shouldNotResumeTaskInFailedTasks(activeTask);
    }

    /** Runs the shared "failed task is not resumed" scenario with a standby task. */
    @Test
    public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();
        shouldNotResumeTaskInFailedTasks(standbyTask);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Shared scenario: {@code task} fails on the first {@code restore()} call that covers it and
     * a second active task. The failed task must be reported as failed and the second task as
     * updating, and both observations must hold when checked a second time.
     *
     * @param task the task that fails during restoration
     */
    private void shouldNotResumeTaskInFailedTasks(Task task) throws Exception {
        final StreamTask otherTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamsException failure = new StreamsException("Something happened", task.id());
        Mockito.when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(changelogReader.allChangelogsCompleted()).thenReturn(false);
        final Map<TaskId, Task> bothTasks = Utils.mkMap(
            Utils.mkEntry(task.id(), task),
            Utils.mkEntry(otherTask.id(), otherTask)
        );
        Mockito.doThrow(failure).doReturn(0L).when(changelogReader).restore(bothTasks);

        stateUpdater.start();
        stateUpdater.add(task);
        stateUpdater.add(otherTask);

        final StateUpdater.ExceptionAndTask expectedFailure = new StateUpdater.ExceptionAndTask(failure, task);
        verifyExceptionsAndFailedTasks(expectedFailure);
        verifyUpdatingTasks(otherTask);
        verifyExceptionsAndFailedTasks(expectedFailure);
        verifyUpdatingTasks(otherTask);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Stubs {@code restore()} over an active and a standby task to return 0 once and then throw
     * a {@link StreamsException} that carries no task id. Both tasks must be reported as failed
     * with that exception, no task may remain paused, updating or restored, and the state
     * updater must still be running.
     */
    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
        final StreamTask activeTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        final StreamsException failure = new StreamsException("The Streams were crossed!");
        final Map<TaskId, Task> bothTasks = Utils.mkMap(
            Utils.mkEntry(activeTask.id(), activeTask),
            Utils.mkEntry(standbyTask.id(), standbyTask)
        );
        Mockito.doReturn(0L).doThrow(failure).when(changelogReader).restore(bothTasks);

        stateUpdater.start();
        stateUpdater.add(activeTask);
        stateUpdater.add(standbyTask);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(failure, activeTask),
            new StateUpdater.ExceptionAndTask(failure, standbyTask)
        );
        verifyPausedTasks();
        verifyUpdatingTasks();
        verifyRestoredActiveTasks();
        Assertions.assertTrue(stateUpdater.isRunning());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Stubs {@code restore()} to return 0 once and then throw a task-specific
     * {@link StreamsException}: one naming the first active task when all three tasks are
     * passed, and one naming the standby task when only the remaining two are passed. Exactly
     * those two tasks must be reported as failed, the second active task must keep updating,
     * and the state updater must still be running.
     */
    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
        final StreamTask failingActiveTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        final StreamTask survivingActiveTask = StreamsTestUtils.TaskBuilder
            .statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        final StandbyTask failingStandbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        final StreamsException activeFailure =
            new StreamsException("The Streams were crossed!", failingActiveTask.id());
        final StreamsException standbyFailure =
            new StreamsException("The Streams were crossed!", failingStandbyTask.id());
        final Map<TaskId, Task> allThreeTasks = Utils.mkMap(
            Utils.mkEntry(failingActiveTask.id(), failingActiveTask),
            Utils.mkEntry(survivingActiveTask.id(), survivingActiveTask),
            Utils.mkEntry(failingStandbyTask.id(), failingStandbyTask)
        );
        final Map<TaskId, Task> remainingTwoTasks = Utils.mkMap(
            Utils.mkEntry(survivingActiveTask.id(), survivingActiveTask),
            Utils.mkEntry(failingStandbyTask.id(), failingStandbyTask)
        );
        Mockito.doReturn(0L).doThrow(activeFailure).when(changelogReader).restore(allThreeTasks);
        Mockito.doReturn(0L).doThrow(standbyFailure).when(changelogReader).restore(remainingTwoTasks);

        stateUpdater.start();
        stateUpdater.add(failingActiveTask);
        stateUpdater.add(survivingActiveTask);
        stateUpdater.add(failingStandbyTask);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(activeFailure, failingActiveTask),
            new StateUpdater.ExceptionAndTask(standbyFailure, failingStandbyTask)
        );
        verifyUpdatingTasks(survivingActiveTask);
        verifyRestoredActiveTasks();
        verifyPausedTasks();
        Assertions.assertTrue(stateUpdater.isRunning());
    }

    /**
     * A {@link TaskCorruptedException} naming one active and one standby task is thrown by the
     * second {@code restore} call. Both named tasks must be reported as failed, have their
     * changelogs unregistered and marked as corrupted, while the third task keeps updating.
     */
    @Test
    public void shouldHandleTaskCorruptedExceptionAndAddFailedTasksToQueue() throws Exception {
        StreamTask corruptedActive = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask corruptedStandby = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        StreamTask healthyActive = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RESTORING)
            .build();
        TaskCorruptedException taskCorruptedException =
            new TaskCorruptedException(Set.of(corruptedActive.id(), corruptedStandby.id()));

        // The restore(...) argument (including the id() calls on the tasks) is evaluated before
        // Mockito.when(...) opens the stubbing; building it inside a doReturn(..).when(..) chain
        // would touch the task objects while a stubbing is unfinished.
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(corruptedActive.id(), corruptedActive),
                Utils.mkEntry(corruptedStandby.id(), corruptedStandby),
                Utils.mkEntry(healthyActive.id(), healthyActive))))
            .thenReturn(0L)
            .thenThrow(taskCorruptedException)
            .thenReturn(0L);

        this.stateUpdater.start();
        this.stateUpdater.add(corruptedActive);
        this.stateUpdater.add(corruptedStandby);
        this.stateUpdater.add(healthyActive);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(taskCorruptedException, corruptedActive),
            new StateUpdater.ExceptionAndTask(taskCorruptedException, corruptedStandby));
        verifyUpdatingTasks(healthyActive);
        verifyRestoredActiveTasks();
        Mockito.verify(this.changelogReader).unregister(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.verify(corruptedActive).markChangelogAsCorrupted(Set.of(TOPIC_PARTITION_A_0));
        Mockito.verify(corruptedStandby).markChangelogAsCorrupted(Set.of(TOPIC_PARTITION_B_0));
        Assertions.assertTrue(this.stateUpdater.isRunning());
    }

    /**
     * An {@link IllegalStateException} thrown by every {@code restore} call must be reported for
     * all tasks that were being updated and must stop the state updater
     * ({@code isRunning()} turns false).
     */
    @Test
    public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RUNNING)
            .build();
        IllegalStateException illegalStateException =
            new IllegalStateException("Nobody expects the Spanish inquisition!");

        // when(reader.restore(map)) evaluates the map - and the id() calls on the tasks - before
        // the stubbing starts, unlike doThrow(..).when(reader).restore(<map built inline>).
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(activeTask.id(), activeTask),
                Utils.mkEntry(standbyTask.id(), standbyTask))))
            .thenThrow(illegalStateException);

        this.stateUpdater.start();
        this.stateUpdater.add(activeTask);
        this.stateUpdater.add(standbyTask);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(illegalStateException, activeTask),
            new StateUpdater.ExceptionAndTask(illegalStateException, standbyTask));
        verifyUpdatingTasks();
        verifyRestoredActiveTasks();
        verifyPausedTasks();
        Assertions.assertFalse(this.stateUpdater.isRunning());
    }

    /**
     * Checks {@code drainExceptionsAndFailedTasks()}: it is empty before anything failed, returns
     * the single failure of the first task, and later returns the three failures that are raised
     * one after another as the set of updating tasks shrinks.
     */
    @Test
    public void shouldDrainFailedTasksAndExceptions() throws Exception {
        Assertions.assertFalse(this.stateUpdater.hasExceptionsAndFailedTasks());
        Assertions.assertTrue(this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());

        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_1, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask task4 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamsException failure1 = new StreamsException("The Streams were crossed!", task1.id());
        StreamsException failure2 = new StreamsException("The Streams were crossed!", task2.id());
        StreamsException failure3 = new StreamsException("The Streams were crossed!", task3.id());
        StreamsException failure4 = new StreamsException("The Streams were crossed!", task4.id());

        // Every stubbing uses when(reader.restore(map)): the map argument, including the id()
        // calls on the tasks, is completely evaluated before the stubbing is opened. The maps
        // differ per stubbing, so invoking restore(..) here never hits an earlier throwing stub.
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(task1.id(), task1))))
            .thenThrow(failure1);
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(task2.id(), task2),
                Utils.mkEntry(task3.id(), task3),
                Utils.mkEntry(task4.id(), task4))))
            .thenThrow(failure2);
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(task3.id(), task3),
                Utils.mkEntry(task4.id(), task4))))
            .thenThrow(failure3);
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(task4.id(), task4))))
            .thenThrow(failure4);

        this.stateUpdater.start();
        this.stateUpdater.add(task1);
        verifyDrainingExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTask(failure1, task1));

        this.stateUpdater.add(task2);
        this.stateUpdater.add(task3);
        this.stateUpdater.add(task4);
        verifyDrainingExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(failure2, task2),
            new StateUpdater.ExceptionAndTask(failure3, task3),
            new StateUpdater.ExceptionAndTask(failure4, task4));
    }

    /**
     * Adds two restoring active tasks and two standby tasks, waits until all four are updating,
     * calls {@code time.sleep(101)} and then expects {@code maybeCheckpoint(false)} on each task
     * without any failure being reported.
     */
    @Test
    public void shouldAutoCheckpointTasksOnInterval() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RUNNING)
            .build();
        // No changelog ever completes, so the active tasks never leave the updating set.
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(this.changelogReader.allChangelogsCompleted()).thenReturn(false);

        this.stateUpdater.start();
        this.stateUpdater.add(activeTask1);
        this.stateUpdater.add(activeTask2);
        this.stateUpdater.add(standbyTask1);
        this.stateUpdater.add(standbyTask2);
        verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, standbyTask2);

        this.time.sleep(101L);

        verifyExceptionsAndFailedTasks();
        verifyCheckpointTasks(false, activeTask1, activeTask2, standbyTask1, standbyTask2);
    }

    /**
     * Runs a dedicated state updater on its own {@link MockTime}, advances that clock by 100 ms
     * and expects that {@code maybeCheckpoint} was never called on any task.
     *
     * <p>The updater created here is always shut down, whether or not the verification passes.
     */
    @Test
    public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
        MockTime mockTime = new MockTime();
        DefaultStateUpdater defaultStateUpdater = new DefaultStateUpdater(
            "test-state-updater",
            this.metrics,
            this.config,
            (Consumer) null,
            this.changelogReader,
            this.topologyMetadata,
            mockTime);
        try {
            StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
                .inState(Task.State.RESTORING)
                .build();
            StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Set.of(TOPIC_PARTITION_B_0))
                .inState(Task.State.RESTORING)
                .build();
            StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Set.of(TOPIC_PARTITION_C_0))
                .inState(Task.State.RUNNING)
                .build();
            StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
                .inState(Task.State.RUNNING)
                .build();
            Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
            Mockito.when(this.changelogReader.allChangelogsCompleted()).thenReturn(false);

            defaultStateUpdater.start();
            defaultStateUpdater.add(activeTask1);
            defaultStateUpdater.add(activeTask2);
            defaultStateUpdater.add(standbyTask1);
            defaultStateUpdater.add(standbyTask2);

            // 100 ms equals the COMMIT_INTERVAL constant of this class; presumably that is the
            // configured commit interval, i.e. the interval is reached but not exceeded - confirm.
            mockTime.sleep(100L);

            verifyNeverCheckpointTasks(activeTask1, activeTask2, standbyTask1, standbyTask2);
        } finally {
            // finally replaces the former catch(Throwable)/rethrow pair, so shutdown runs exactly
            // once on both the success and the failure path.
            defaultStateUpdater.shutdown(Duration.ofMinutes(1L));
        }
    }

    /**
     * Waits, for up to {@code VERIFICATION_TIMEOUT} ms per task, until {@code maybeCheckpoint(z)}
     * has been called at least once on each given task.
     *
     * @param z       the argument {@code maybeCheckpoint} is expected to have been called with
     * @param taskArr the tasks to verify
     */
    private void verifyCheckpointTasks(boolean z, Task... taskArr) {
        for (int index = 0; index < taskArr.length; index++) {
            // A fresh timeout mode is created for every task.
            Task verifiedTask = Mockito.verify(taskArr[index], Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(1));
            verifiedTask.maybeCheckpoint(z);
        }
    }

    /**
     * Verifies that {@code maybeCheckpoint} has not been called on any of the given tasks,
     * regardless of its boolean argument.
     *
     * @param taskArr the tasks to verify
     */
    private void verifyNeverCheckpointTasks(Task... taskArr) {
        for (int index = 0; index < taskArr.length; index++) {
            Task verifiedTask = Mockito.verify(taskArr[index], Mockito.never());
            verifiedTask.maybeCheckpoint(ArgumentMatchers.anyBoolean());
        }
    }

    /**
     * Shuts the state updater down first and then issues add/remove calls, so that the requests
     * are not processed by a running updater; the task getters are still expected to report both
     * active and all three standby tasks.
     */
    @Test
    public void shouldTasksFromInputQueue() {
        this.stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));

        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_A_1))
            .inState(Task.State.RUNNING)
            .build();

        this.stateUpdater.add(activeTask1);
        this.stateUpdater.add(standbyTask1);
        this.stateUpdater.add(standbyTask2);
        // The remove request for TASK_0_0 is issued, yet activeTask1 is still expected below.
        this.stateUpdater.remove(TASK_0_0);
        this.stateUpdater.add(activeTask2);
        this.stateUpdater.add(standbyTask3);

        verifyGetTasks(Set.of(activeTask1, activeTask2), Set.of(standbyTask1, standbyTask2, standbyTask3));
    }

    /**
     * With a running state updater whose changelogs never complete, all five added tasks must
     * show up as updating and be reported by the task getters, split into active and standby.
     */
    @Test
    public void shouldTasks() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Set.of(TOPIC_PARTITION_C_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_A_1))
            .inState(Task.State.RUNNING)
            .build();
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(this.changelogReader.allChangelogsCompleted()).thenReturn(false);

        this.stateUpdater.start();
        this.stateUpdater.add(activeTask1);
        this.stateUpdater.add(standbyTask1);
        this.stateUpdater.add(standbyTask2);
        this.stateUpdater.add(activeTask2);
        this.stateUpdater.add(standbyTask3);

        verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3);
        verifyGetTasks(Set.of(activeTask1, activeTask2), Set.of(standbyTask1, standbyTask2, standbyTask3));
    }

    /**
     * Active tasks whose changelogs are reported as completed must be visible through the task
     * getters while they wait as restored tasks, and must disappear once they are drained.
     */
    @Test
    public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        Mockito.when(this.changelogReader.completedChangelogs())
            .thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
        Mockito.when(this.changelogReader.allChangelogsCompleted()).thenReturn(false);

        this.stateUpdater.start();
        this.stateUpdater.add(activeTask1);
        this.stateUpdater.add(activeTask2);

        verifyRestoredActiveTasks(activeTask1, activeTask2);
        verifyGetTasks(Set.of(activeTask1, activeTask2), Set.of());

        this.stateUpdater.drainRestoredActiveTasks(Duration.ofMinutes(1L));

        verifyGetTasks(Set.of(), Set.of());
    }

    /**
     * Failed tasks must stay visible through the task getters until the failures are drained.
     * Both standby tasks fail with a {@link TaskCorruptedException}; afterwards the remaining
     * active task fails with a {@link StreamsException}.
     */
    @Test
    public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Set.of(TOPIC_PARTITION_D_0))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_A_1))
            .inState(Task.State.RUNNING)
            .build();
        TaskCorruptedException taskCorruptedException =
            new TaskCorruptedException(Set.of(standbyTask2.id(), standbyTask1.id()));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask.id());

        // Both restore(...) arguments are fully evaluated - id() calls included - before
        // Mockito.when(...) opens the stubbing, so no task object is touched mid-stubbing as it
        // was with doReturn(..).when(reader).restore(<map built inline>).
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(activeTask.id(), activeTask),
                Utils.mkEntry(standbyTask2.id(), standbyTask2),
                Utils.mkEntry(standbyTask1.id(), standbyTask1))))
            .thenReturn(0L)
            .thenThrow(taskCorruptedException)
            .thenReturn(0L);
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(activeTask.id(), activeTask))))
            .thenReturn(0L)
            .thenThrow(streamsException)
            .thenReturn(0L);

        this.stateUpdater.start();
        this.stateUpdater.add(standbyTask2);
        this.stateUpdater.add(activeTask);
        this.stateUpdater.add(standbyTask1);

        verifyExceptionsAndFailedTasks(
            new StateUpdater.ExceptionAndTask(taskCorruptedException, standbyTask2),
            new StateUpdater.ExceptionAndTask(taskCorruptedException, standbyTask1),
            new StateUpdater.ExceptionAndTask(streamsException, activeTask));
        verifyGetTasks(Set.of(activeTask), Set.of(standbyTask2, standbyTask1));

        this.stateUpdater.drainExceptionsAndFailedTasks();

        verifyGetTasks(Set.of(), Set.of());
    }

    /**
     * Once the topology metadata reports {@code isPaused(null)} as true, both tasks must move to
     * the paused set and still be returned by the task getters.
     */
    @Test
    public void shouldGetTasksFromPausedTasks() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RUNNING)
            .build();

        this.stateUpdater.start();
        this.stateUpdater.add(activeTask);
        this.stateUpdater.add(standbyTask);
        verifyUpdatingTasks(activeTask, standbyTask);

        // Pausing is switched on only after both tasks have been seen as updating.
        Mockito.when(this.topologyMetadata.isPaused((String) null)).thenReturn(true);

        verifyPausedTasks(activeTask, standbyTask);
        verifyGetTasks(Set.of(activeTask), Set.of(standbyTask));
    }

    /**
     * Verifies the state updater metrics. Four tasks are added, the two whose topology is
     * reported as paused end up paused, and afterwards 11 metrics must be registered with the
     * expected descriptions, tags and values. After shutdown only one metric may remain.
     */
    @Test
    public void shouldRecordMetrics() throws Exception {
        StreamTask activeTaskA = StreamsTestUtils.TaskBuilder.statefulTask(TASK_A_0_0, Set.of(TOPIC_PARTITION_A_0))
            .inState(Task.State.RESTORING)
            .build();
        StreamTask activeTaskB = StreamsTestUtils.TaskBuilder.statefulTask(TASK_B_0_0, Set.of(TOPIC_PARTITION_B_0))
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyTaskA = StreamsTestUtils.TaskBuilder.standbyTask(TASK_A_0_1, Set.of(TOPIC_PARTITION_A_1))
            .inState(Task.State.RUNNING)
            .build();
        StandbyTask standbyTaskB = StreamsTestUtils.TaskBuilder.standbyTask(TASK_B_0_1, Set.of(TOPIC_PARTITION_B_1))
            .inState(Task.State.RUNNING)
            .build();

        Mockito.when(this.topologyMetadata.isPaused(activeTaskB.id().topologyName())).thenReturn(true);
        Mockito.when(this.topologyMetadata.isPaused(standbyTaskB.id().topologyName())).thenReturn(true);
        Mockito.when(this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when(this.changelogReader.allChangelogsCompleted()).thenReturn(false);
        // restore(..) reports one restored record, both while all four tasks are updating and
        // once only the two non-paused tasks are left. The maps are typed by inference from the
        // restore(..) parameter rather than held in raw Map locals.
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(activeTaskA.id(), activeTaskA),
                Utils.mkEntry(activeTaskB.id(), activeTaskB),
                Utils.mkEntry(standbyTaskA.id(), standbyTaskA),
                Utils.mkEntry(standbyTaskB.id(), standbyTaskB))))
            .thenReturn(1L);
        Mockito.when(this.changelogReader.restore(Utils.mkMap(
                Utils.mkEntry(activeTaskA.id(), activeTaskA),
                Utils.mkEntry(standbyTaskA.id(), standbyTaskA))))
            .thenReturn(1L);
        Mockito.when(this.changelogReader.isRestoringActive()).thenReturn(true);

        this.stateUpdater.start();
        this.stateUpdater.add(activeTaskA);
        this.stateUpdater.add(activeTaskB);
        this.stateUpdater.add(standbyTaskA);
        this.stateUpdater.add(standbyTaskB);
        verifyPausedTasks(activeTaskB, standbyTaskB);

        MatcherAssert.assertThat(this.metrics.metrics().size(), Matchers.is(11));

        String group = "stream-state-updater-metrics";
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", "test-state-updater");

        verifyMetric(this.metrics,
            new MetricName("active-restoring-tasks", group, "The number of active tasks currently undergoing restoration", tags),
            Matchers.is(1.0d));
        verifyMetric(this.metrics,
            new MetricName("standby-updating-tasks", group, "The number of standby tasks currently undergoing state update", tags),
            Matchers.is(1.0d));
        verifyMetric(this.metrics,
            new MetricName("active-paused-tasks", group, "The number of active tasks paused restoring", tags),
            Matchers.is(1.0d));
        verifyMetric(this.metrics,
            new MetricName("standby-paused-tasks", group, "The number of standby tasks paused state update", tags),
            Matchers.is(1.0d));
        verifyMetric(this.metrics,
            new MetricName("idle-ratio", group, "The fraction of time the thread spent on being idle", tags),
            Matchers.greaterThanOrEqualTo(0.0d));
        verifyMetric(this.metrics,
            new MetricName("active-restore-ratio", group, "The fraction of time the thread spent on restoring active tasks", tags),
            Matchers.greaterThanOrEqualTo(0.0d));
        verifyMetric(this.metrics,
            new MetricName("standby-update-ratio", group, "The fraction of time the thread spent on updating standby tasks", tags),
            Matchers.is(0.0d));
        verifyMetric(this.metrics,
            new MetricName("checkpoint-ratio", group, "The fraction of time the thread spent on checkpointing tasks restored progress", tags),
            Matchers.greaterThanOrEqualTo(0.0d));
        verifyMetric(this.metrics,
            new MetricName("restore-records-rate", group, "The average per-second number of records restored", tags),
            Matchers.not(0.0d));
        verifyMetric(this.metrics,
            new MetricName("restore-call-rate", group, "The average per-second number of restore calls triggered", tags),
            Matchers.not(0.0d));

        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        MatcherAssert.assertThat(this.metrics.metrics().size(), Matchers.is(1));
    }

    /**
     * Asserts that a metric is registered under {@code metricName} and that its description,
     * tags and current value are as expected.
     *
     * @param metrics    the registry to look the metric up in
     * @param metricName the key of the metric; its description and tags are the expected ones
     * @param matcher    the matcher the metric value has to satisfy
     * @param <T>        the value type the matcher operates on
     */
    @SuppressWarnings("unchecked")
    private static <T> void verifyMetric(Metrics metrics, MetricName metricName, Matcher<T> matcher) {
        // One lookup instead of three, with an explicit failure when the metric is missing
        // (a missing metric previously surfaced as a NullPointerException).
        KafkaMetric metric = metrics.metrics().get(metricName);
        MatcherAssert.assertThat("No metric registered under " + metricName, metric, Matchers.notNullValue());

        MatcherAssert.assertThat(metric.metricName().description(), Matchers.is(metricName.description()));
        MatcherAssert.assertThat(metric.metricName().tags(), Matchers.is(metricName.tags()));
        // The metric value is not statically typed as T; without the unchecked cast
        // assertThat(Object, Matcher<T>) does not type-check.
        MatcherAssert.assertThat((T) metric.metricValue(), matcher);
    }

    /**
     * Checks the three task getters of the state updater against the expected tasks.
     *
     * <p>{@code tasks()} must return exactly as many tasks as expected, every one of them a
     * {@link ReadOnlyTask}, and their ids must cover the ids of all expected tasks.
     * {@code activeTasks()} and {@code standbyTasks()} must have the expected size and contain
     * the expected tasks themselves.
     *
     * @param set  the expected active tasks
     * @param set2 the expected standby tasks
     */
    private void verifyGetTasks(Set<StreamTask> set, Set<StandbyTask> set2) {
        Set<?> allTasks = this.stateUpdater.tasks();
        Assertions.assertEquals(set.size() + set2.size(), allTasks.size());

        // assertInstanceOf returns the checked instance, which gives typed access to id();
        // on the former raw Set the lambda parameter was a plain Object.
        Set<Object> actualIds = new HashSet<>();
        for (Object task : allTasks) {
            actualIds.add(Assertions.assertInstanceOf(ReadOnlyTask.class, task).id());
        }
        Set<Object> expectedIds = new HashSet<>();
        set.forEach(task -> expectedIds.add(task.id()));
        set2.forEach(task -> expectedIds.add(task.id()));
        Assertions.assertTrue(actualIds.containsAll(expectedIds));

        Set<StreamTask> activeTasks = this.stateUpdater.activeTasks();
        Assertions.assertEquals(set.size(), activeTasks.size());
        Assertions.assertTrue(activeTasks.containsAll(set));

        Set<StandbyTask> standbyTasks = this.stateUpdater.standbyTasks();
        Assertions.assertEquals(set2.size(), standbyTasks.size());
        Assertions.assertTrue(standbyTasks.containsAll(set2));
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the restored active tasks seen so far
     * are exactly the given tasks; with no arguments it waits until none are reported.
     *
     * @param streamTaskArr the active tasks expected to be restored
     * @throws Exception if the condition is not met in time
     */
    private void verifyRestoredActiveTasks(StreamTask... streamTaskArr) throws Exception {
        if (streamTaskArr.length == 0) {
            TestUtils.waitForCondition(
                () -> this.stateUpdater.restoredActiveTasks().isEmpty(),
                VERIFICATION_TIMEOUT,
                "Did not get empty restored active task within the given timeout!");
        } else {
            Set<StreamTask> expected = Set.of(streamTaskArr);
            // Accumulates across polls, so a task counts once it has been observed at any time.
            Set<StreamTask> observed = new HashSet<>();
            TestUtils.waitForCondition(
                () -> {
                    observed.addAll(this.stateUpdater.restoredActiveTasks());
                    return observed.size() == expected.size() && observed.containsAll(expected);
                },
                VERIFICATION_TIMEOUT,
                "Did not get all restored active task within the given timeout!");
        }
    }

    /**
     * Repeatedly drains the restored active tasks (each call with a {@code CALL_TIMEOUT} ms
     * timeout) until exactly the given tasks have been drained, then asserts that a further
     * drain with a zero timeout returns nothing.
     *
     * @param streamTaskArr the active tasks expected to be drained
     * @throws Exception if the tasks are not drained within {@code VERIFICATION_TIMEOUT} ms
     */
    private void verifyDrainingRestoredActiveTasks(StreamTask... streamTaskArr) throws Exception {
        Set<StreamTask> expected = Set.of(streamTaskArr);
        Set<StreamTask> drained = new HashSet<>();
        TestUtils.waitForCondition(
            () -> {
                drained.addAll(this.stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
                return drained.size() == expected.size() && drained.containsAll(expected);
            },
            VERIFICATION_TIMEOUT,
            "Did not get all restored active task within the given timeout!");
        Assertions.assertTrue(this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the updating tasks seen so far are
     * exactly the given tasks; with no arguments it waits until no task is updating.
     *
     * @param taskArr the tasks expected to be updating
     * @throws Exception if the condition is not met in time
     */
    private void verifyUpdatingTasks(Task... taskArr) throws Exception {
        if (taskArr.length == 0) {
            TestUtils.waitForCondition(
                () -> this.stateUpdater.updatingTasks().isEmpty(),
                VERIFICATION_TIMEOUT,
                "Did not get empty updating task within the given timeout!");
        } else {
            Set<Task> expected = Set.of(taskArr);
            // Accumulates across polls, so a task counts once it has been observed at any time.
            Set<Task> observed = new HashSet<>();
            TestUtils.waitForCondition(
                () -> {
                    observed.addAll(this.stateUpdater.updatingTasks());
                    return observed.size() == expected.size() && observed.containsAll(expected);
                },
                VERIFICATION_TIMEOUT,
                "Did not get all updating task within the given timeout!");
        }
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the updating standby tasks seen so far
     * are exactly the given tasks.
     *
     * @param standbyTaskArr the standby tasks expected to be updating
     * @throws Exception if the condition is not met in time
     */
    private void verifyUpdatingStandbyTasks(StandbyTask... standbyTaskArr) throws Exception {
        Set<StandbyTask> expected = Set.of(standbyTaskArr);
        // Accumulates across polls, so a task counts once it has been observed at any time.
        Set<StandbyTask> observed = new HashSet<>();
        TestUtils.waitForCondition(
            () -> {
                observed.addAll(this.stateUpdater.updatingStandbyTasks());
                return observed.size() == expected.size() && observed.containsAll(expected);
            },
            VERIFICATION_TIMEOUT,
            "Did not see all standby task within the given timeout!");
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the state updater reports itself idle.
     *
     * @throws Exception if the updater does not become idle in time
     */
    private void verifyIdle() throws Exception {
        TestUtils.waitForCondition(
            () -> this.stateUpdater.isIdle(),
            VERIFICATION_TIMEOUT,
            "State updater did not enter an idling state!");
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the paused tasks seen so far are exactly
     * the given tasks; with no arguments it waits until no task is paused.
     *
     * @param taskArr the tasks expected to be paused
     * @throws Exception if the condition is not met in time
     */
    private void verifyPausedTasks(Task... taskArr) throws Exception {
        if (taskArr.length == 0) {
            TestUtils.waitForCondition(
                () -> this.stateUpdater.pausedTasks().isEmpty(),
                VERIFICATION_TIMEOUT,
                "Did not get empty paused task within the given timeout!");
        } else {
            Set<Task> expected = Set.of(taskArr);
            // Accumulates across polls, so a task counts once it has been observed at any time.
            Set<Task> observed = new HashSet<>();
            TestUtils.waitForCondition(
                () -> {
                    observed.addAll(this.stateUpdater.pausedTasks());
                    return observed.size() == expected.size() && observed.containsAll(expected);
                },
                VERIFICATION_TIMEOUT,
                "Did not get all paused task within the given timeout!");
        }
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the failures reported by the state
     * updater so far contain all given entries and their number of distinct entries equals the
     * number of expected entries. With no arguments it passes as soon as no failure was seen.
     *
     * @param exceptionAndTaskArr the expected exception/task pairs
     * @throws Exception if the condition is not met in time
     */
    private void verifyExceptionsAndFailedTasks(StateUpdater.ExceptionAndTask... exceptionAndTaskArr) throws Exception {
        List<StateUpdater.ExceptionAndTask> expected = Arrays.asList(exceptionAndTaskArr);
        // Accumulates across polls; the queue is only read here, not drained.
        Set<StateUpdater.ExceptionAndTask> observed = new HashSet<>();
        TestUtils.waitForCondition(
            () -> {
                observed.addAll(this.stateUpdater.exceptionsAndFailedTasks());
                return observed.containsAll(expected) && observed.size() == expected.size();
            },
            VERIFICATION_TIMEOUT,
            "Did not get all exceptions and failed tasks within the given timeout!");
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the tasks that failed with an exception
     * of the given class are exactly the given tasks. Failures with other exception types are
     * ignored.
     *
     * @param cls     the exception class a failure must be an instance of to be counted
     * @param taskArr the tasks expected to have failed with such an exception
     * @throws Exception if the condition is not met in time
     */
    private void verifyFailedTasks(Class<? extends RuntimeException> cls, Task... taskArr) throws Exception {
        List<Task> expected = Arrays.asList(taskArr);
        Set<Task> failedTasks = new HashSet<>();
        TestUtils.waitForCondition(
            () -> {
                for (StateUpdater.ExceptionAndTask failure : this.stateUpdater.exceptionsAndFailedTasks()) {
                    if (cls.isInstance(failure.exception())) {
                        failedTasks.add(failure.task());
                    }
                }
                return failedTasks.containsAll(expected) && failedTasks.size() == expected.size();
            },
            VERIFICATION_TIMEOUT,
            "Did not get all exceptions and failed tasks within the given timeout!");
    }

    /**
     * Drains the failures from the state updater until all given entries have been collected
     * and the number of collected entries equals the number of expected ones, then asserts
     * that nothing is left to drain.
     *
     * <p>Whenever {@code hasExceptionsAndFailedTasks()} is true, the following drain must
     * return at least one entry.
     *
     * @param exceptionAndTaskArr the expected exception/task pairs
     * @throws Exception if the entries are not drained within {@code VERIFICATION_TIMEOUT} ms
     */
    private void verifyDrainingExceptionsAndFailedTasks(StateUpdater.ExceptionAndTask... exceptionAndTaskArr) throws Exception {
        List<StateUpdater.ExceptionAndTask> expected = Arrays.asList(exceptionAndTaskArr);
        // A list, not a set: an entry that is drained twice changes the size check.
        List<StateUpdater.ExceptionAndTask> drained = new ArrayList<>();
        TestUtils.waitForCondition(
            () -> {
                if (this.stateUpdater.hasExceptionsAndFailedTasks()) {
                    List<StateUpdater.ExceptionAndTask> batch = this.stateUpdater.drainExceptionsAndFailedTasks();
                    Assertions.assertFalse(batch.isEmpty());
                    drained.addAll(batch);
                }
                return drained.containsAll(expected) && drained.size() == expected.size();
            },
            VERIFICATION_TIMEOUT,
            "Did not get all exceptions and failed tasks within the given timeout!");
        Assertions.assertFalse(this.stateUpdater.hasExceptionsAndFailedTasks());
        Assertions.assertTrue(this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
    }

    /**
     * Waits up to {@code VERIFICATION_TIMEOUT} ms until the removed tasks seen so far are
     * exactly the given tasks; with no arguments it waits until no removed task is reported.
     *
     * @param taskArr the tasks expected to be removed
     * @throws Exception if the condition is not met in time
     */
    private void verifyRemovedTasks(Task... taskArr) throws Exception {
        if (taskArr.length == 0) {
            TestUtils.waitForCondition(
                () -> this.stateUpdater.removedTasks().isEmpty(),
                VERIFICATION_TIMEOUT,
                "Did not get empty removed task within the given timeout!");
        } else {
            Set<Task> expected = Set.of(taskArr);
            // Accumulates across polls, so a task counts once it has been observed at any time.
            Set<Task> observed = new HashSet<>();
            TestUtils.waitForCondition(
                () -> {
                    observed.addAll(this.stateUpdater.removedTasks());
                    return observed.size() == expected.size() && observed.containsAll(expected);
                },
                VERIFICATION_TIMEOUT,
                "Did not get all removed task within the given timeout!");
        }
    }
}
