package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.class */
/**
 * Unit tests for {@link DefaultTaskManager}.
 *
 * <p>The task manager under test is wired with a mocked {@link TasksRegistry}, a mocked
 * {@link TaskExecutionMetadata}, and an executor factory that always hands out the same mocked
 * {@link TaskExecutor}. A single stateless task is used throughout.
 */
public class DefaultTaskManagerTest {
    /** Upper bound, in milliseconds, for waiting on asynchronous interactions before a test fails. */
    private static final long VERIFICATION_TIMEOUT = 15000;

    private final Time time = new MockTime(1);
    private final TaskId taskId = new TaskId(0, 0, "A");
    private final StreamTask task = StreamsTestUtils.TaskBuilder.statelessTask(taskId).build();
    private final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
    private final TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
    private final StreamsException exception = Mockito.mock(StreamsException.class);
    private final TaskExecutionMetadata taskExecutionMetadata = Mockito.mock(TaskExecutionMetadata.class);
    // The executor factory ignores its arguments and always returns the shared mock executor.
    private final TaskManager taskManager = new DefaultTaskManager(
        time,
        "TaskManager",
        tasks,
        (manager, name, clock, metadata) -> taskExecutor,
        taskExecutionMetadata,
        1
    );

    /**
     * Runnable that repeatedly calls {@code awaitProcessableTasks} on the task manager until
     * {@link #shutdown()} is requested. The latch {@code awaitDone} is counted down every time
     * the call returns or is interrupted, which lets a test observe whether the call blocked.
     */
    private class AwaitingRunnable implements Runnable {
        private final CountDownLatch awaitDone = new CountDownLatch(1);
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!shutdownRequested.get()) {
                try {
                    taskManager.awaitProcessableTasks(shutdownRequested::get);
                } catch (final InterruptedException ignored) {
                    // Interruption is one of the wake-up paths under test. The interrupt flag is
                    // deliberately not restored: the loop calls awaitProcessableTasks again and a
                    // still-set flag would interrupt that next call as well.
                }
                awaitDone.countDown();
            }
        }

        /** Requests loop termination and signals the task executors so a blocked await can return. */
        public void shutdown() {
            shutdownRequested.set(true);
            taskManager.signalTaskExecutors();
        }
    }

    /**
     * Default stubbing shared by all tests: the task reports itself as processable, and the
     * registry exposes it as the only active task and resolves it by id.
     */
    @BeforeEach
    public void setUp() {
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);
        Mockito.when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        Mockito.when(tasks.task(taskId)).thenReturn(task);
    }

    @Test
    public void shouldShutdownTaskExecutors() {
        // A real value is used instead of a mock: Duration is a final JDK value class.
        final Duration duration = Duration.ofMinutes(1);

        taskManager.shutdown(duration);

        Mockito.verify(taskExecutor).requestShutdown();
        Mockito.verify(taskExecutor).awaitShutdown(duration);
    }

    @Test
    public void shouldStartTaskExecutors() {
        taskManager.startTaskExecutors();

        Mockito.verify(taskExecutor).start();
    }

    @Test
    public void shouldAddTask() {
        taskManager.add(Collections.singleton(task));

        Mockito.verify(tasks).addTask(task);
        Assertions.assertEquals(1, taskManager.getTasks().size());
    }

    @Test
    public void shouldAssignTaskThatCanBeProcessed() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();

        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
        // The only task is already assigned, so nothing is left to hand out.
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldBlockOnAwait() throws InterruptedException {
        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        new Thread(awaitingRunnable).start();
        try {
            Assertions.assertFalse(awaitingRunnable.awaitDone.await(100L, TimeUnit.MILLISECONDS));
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldReturnFromAwaitOnInterruption() throws InterruptedException {
        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        final Thread awaitingThread = new Thread(awaitingRunnable);
        awaitingThread.start();
        try {
            awaitActiveTasksQueried();

            awaitingThread.interrupt();

            assertAwaitReturned(awaitingRunnable);
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldReturnFromAwaitOnSignalProcessableTasks() throws InterruptedException {
        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        new Thread(awaitingRunnable).start();
        try {
            awaitActiveTasksQueried();

            taskManager.signalTaskExecutors();

            assertAwaitReturned(awaitingRunnable);
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldReturnFromAwaitOnUnassignment() throws InterruptedException {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        final StreamTask assignedTask = taskManager.assignNextTask(taskExecutor);
        Assertions.assertNotNull(assignedTask);

        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        new Thread(awaitingRunnable).start();
        try {
            awaitActiveTasksQueried();

            taskManager.unassignTask(assignedTask, taskExecutor);

            assertAwaitReturned(awaitingRunnable);
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldReturnFromAwaitOnAdding() throws InterruptedException {
        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        new Thread(awaitingRunnable).start();
        try {
            awaitActiveTasksQueried();

            taskManager.add(Collections.singleton(task));

            assertAwaitReturned(awaitingRunnable);
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldReturnFromAwaitOnUnlocking() throws InterruptedException {
        taskManager.add(Collections.singleton(task));
        taskManager.lockTasks(Collections.singleton(task.id()));

        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
        new Thread(awaitingRunnable).start();
        try {
            awaitActiveTasksQueried();

            taskManager.unlockAllTasks();

            assertAwaitReturned(awaitingRunnable);
        } finally {
            awaitingRunnable.shutdown();
        }
    }

    @Test
    public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
        taskManager.add(Collections.singleton(task));
        // NOTE(review): this stubs canProcessTask rather than canPunctuateTask; together with the
        // isProcessable stub from setUp the assignment may succeed via processing instead of
        // system-time punctuation. Confirm the intended stubbing.
        enableTaskProcessing();
        Mockito.when(task.canPunctuateSystemTime()).thenReturn(true);

        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
        taskManager.add(Collections.singleton(task));
        ensureTaskMakesProgress();

        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
        taskManager.add(Collections.singleton(task));
        ensureTaskMakesProgress();
        taskManager.assignNextTask(taskExecutor);

        taskManager.setUncaughtException(new StreamsException("Exception"), taskId);
        taskManager.unassignTask(task, taskExecutor);

        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
        taskManager.add(Collections.singleton(task));
        Mockito.when(taskExecutionMetadata.canPunctuateTask(ArgumentMatchers.eq(task))).thenReturn(false);
        Mockito.when(task.canPunctuateStreamTime()).thenReturn(true);
        Mockito.when(task.canPunctuateSystemTime()).thenReturn(true);

        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotAssignTasksForProcessingIfProcessingDisabled() {
        taskManager.add(Collections.singleton(task));
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(false);
        // Same stub as in setUp, restated so the precondition of this test is visible here.
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);

        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldUnassignTask() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));

        taskManager.unassignTask(task, taskExecutor);

        // After unassignment the same task can be handed out again.
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotUnassignNotOwnedTask() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));

        final TaskExecutor otherExecutor = Mockito.mock(TaskExecutor.class);

        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> taskManager.unassignTask(task, otherExecutor)
        );
    }

    @Test
    public void shouldNotRemoveUnlockedTask() {
        taskManager.add(Collections.singleton(task));

        Assertions.assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id()));
    }

    @Test
    public void shouldNotRemoveAssignedTask() {
        taskManager.add(Collections.singleton(task));
        taskManager.assignNextTask(taskExecutor);

        Assertions.assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id()));
    }

    @Test
    public void shouldRemoveTask() {
        taskManager.add(Collections.singleton(task));
        stubRegistryLookup();
        taskManager.lockTasks(Collections.singleton(task.id()));

        taskManager.remove(task.id());

        Mockito.verify(tasks).removeTask(task);
        // Drop all earlier stubbing so the registry can now report that no tasks are left.
        Mockito.reset(tasks);
        Mockito.when(tasks.activeTasks()).thenReturn(Collections.emptySet());
        Assertions.assertEquals(0, taskManager.getTasks().size());
    }

    @Test
    public void shouldNotAssignLockedTask() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        stubRegistryLookup();

        Assertions.assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldLockAnEmptySetOfTasks() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        stubRegistryLookup();

        Assertions.assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldLockATaskThatWasVoluntarilyReleased() {
        final KafkaFutureImpl<StreamTask> unassignFuture = new KafkaFutureImpl<>();
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        stubRegistryLookup();
        Mockito.when(taskExecutor.unassign()).thenReturn(unassignFuture);
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));

        final KafkaFuture<?> lockingFuture = taskManager.lockTasks(Collections.singleton(task.id()));
        Assertions.assertFalse(lockingFuture.isDone());

        taskManager.unassignTask(task, taskExecutor);
        unassignFuture.complete(null);

        Assertions.assertTrue(lockingFuture.isDone());
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotAssignAnyLockedTask() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        stubRegistryLookup();

        Assertions.assertTrue(taskManager.lockAllTasks().isDone());
        Assertions.assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotSetUncaughtExceptionsForUnassignedTasks() {
        taskManager.add(Collections.singleton(task));

        final Exception thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> taskManager.setUncaughtException(exception, task.id())
        );

        Assertions.assertEquals(
            "An uncaught exception can only be set as long as the task is still assigned",
            thrown.getMessage()
        );
    }

    @Test
    public void shouldNotSetUncaughtExceptionsTwice() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        taskManager.assignNextTask(taskExecutor);
        taskManager.setUncaughtException(exception, task.id());

        final Exception thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> taskManager.setUncaughtException(exception, task.id())
        );

        Assertions.assertEquals(
            "The uncaught exception must be cleared before restarting processing",
            thrown.getMessage()
        );
    }

    @Test
    public void shouldReturnAndClearExceptionsOnDrainExceptions() {
        taskManager.add(Collections.singleton(task));
        enableTaskProcessing();
        taskManager.assignNextTask(taskExecutor);
        taskManager.setUncaughtException(exception, task.id());

        // Expected value first, so that a failure message labels the two sides correctly.
        Assertions.assertEquals(Collections.singletonMap(task.id(), exception), taskManager.drainUncaughtExceptions());
        Assertions.assertEquals(Collections.emptyMap(), taskManager.drainUncaughtExceptions());
    }

    @Test
    public void shouldUnassignLockingTask() {
        final KafkaFutureImpl<StreamTask> unassignFuture = new KafkaFutureImpl<>();
        taskManager.add(Collections.singleton(task));
        stubRegistryLookup();
        Mockito.when(taskExecutor.unassign()).thenReturn(unassignFuture);
        enableTaskProcessing();
        Assertions.assertEquals(task, taskManager.assignNextTask(taskExecutor));
        Mockito.when(taskExecutor.currentTask()).thenReturn(new ReadOnlyTask(task));

        final KafkaFuture<?> lockingFuture = taskManager.lockAllTasks();
        Assertions.assertFalse(lockingFuture.isDone());
        Mockito.verify(taskExecutor).unassign();

        taskManager.unassignTask(task, taskExecutor);
        unassignFuture.complete(task);

        Assertions.assertTrue(lockingFuture.isDone());
    }

    /** Stubs the execution metadata to permit punctuation and the task to be stream-time punctuable. */
    private void ensureTaskMakesProgress() {
        Mockito.when(taskExecutionMetadata.canPunctuateTask(ArgumentMatchers.eq(task))).thenReturn(true);
        Mockito.when(task.canPunctuateStreamTime()).thenReturn(true);
    }

    /** Stubs the execution metadata to permit processing of the test task at any timestamp. */
    private void enableTaskProcessing() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(true);
    }

    /** Stubs the registry so that the test task is found by its id via {@code task} and {@code contains}. */
    private void stubRegistryLookup() {
        Mockito.when(tasks.task(task.id())).thenReturn(task);
        Mockito.when(tasks.contains(task.id())).thenReturn(true);
    }

    /**
     * Waits, up to {@link #VERIFICATION_TIMEOUT} milliseconds, until {@code activeTasks()} has been
     * invoked on the registry at least once. Presumably this indicates that the awaiting thread
     * has entered {@code awaitProcessableTasks}; note that earlier calls made on the test thread
     * also satisfy this verification.
     */
    private void awaitActiveTasksQueried() {
        Mockito.verify(tasks, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
    }

    /** Asserts that the awaiting thread came back from its await call within the verification timeout. */
    private void assertAwaitReturned(final AwaitingRunnable awaitingRunnable) throws InterruptedException {
        Assertions.assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
    }
}
