package org.apache.james.task.eventsourcing.distributed;

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.TestTask;
import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskWithId;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.class */
class RabbitMQWorkQueueTest {
    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
    private static final TaskId TASK_ID_2 = TaskId.fromString("3c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
    private static final Task TASK = new CompletedTask();
    private static final Task TASK2 = new CompletedTask();
    private static final TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
    private static final TaskWithId TASK_WITH_ID_2 = new TaskWithId(TASK_ID_2, TASK2);

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.defaultRabbitMQ().restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_CLASS).isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private RabbitMQWorkQueue testee;
    private ImmediateWorker worker;
    private JsonTaskSerializer serializer;
    private RabbitMQManagementAPI managementAPI;

    RabbitMQWorkQueueTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        this.worker = new ImmediateWorker();
        this.serializer = JsonTaskSerializer.of(new TaskDTOModule[]{TestTaskDTOModules.COMPLETED_TASK_MODULE, (TaskDTOModule) TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore())});
        this.testee = new RabbitMQWorkQueue(this.worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), this.serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
        this.testee.start();
        this.managementAPI = rabbitMQExtension.managementAPI();
    }

    @AfterEach
    void tearDown() {
        this.testee.close();
    }

    @Test
    void shouldSetConsumerTimeoutArgumentOnTaskQueue() {
        Assertions.assertThat((String) this.managementAPI.queueDetails("/", "taskManagerWorkQueue").getArguments().get("x-consumer-timeout")).isEqualTo("86400000");
    }

    @Test
    void workQueueShouldConsumeSubmittedTask() {
        this.testee.submit(TASK_WITH_ID);
        Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).until(() -> {
            return Boolean.valueOf(!this.worker.results.isEmpty());
        });
        Assertions.assertThat(this.worker.tasks).containsExactly(new TaskWithId[]{TASK_WITH_ID});
        Assertions.assertThat(this.worker.results).containsExactly(new Task.Result[]{Task.Result.COMPLETED});
    }

    @Test
    void workQueueShouldConsumeTwoSubmittedTasks() {
        this.testee.submit(TASK_WITH_ID);
        this.testee.submit(TASK_WITH_ID_2);
        Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).until(() -> {
            return Boolean.valueOf(this.worker.results.size() == 2);
        });
        Assertions.assertThat(this.worker.tasks).containsExactly(new TaskWithId[]{TASK_WITH_ID, TASK_WITH_ID_2});
        Assertions.assertThat(this.worker.results).allSatisfy(result -> {
            Assertions.assertThat(result).isEqualTo(Task.Result.COMPLETED);
        });
    }

    @Test
    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() throws Exception {
        this.testee.submit(TASK_WITH_ID);
        ImmediateWorker immediateWorker = new ImmediateWorker();
        RabbitMQWorkQueue rabbitMQWorkQueue = new RabbitMQWorkQueue(immediateWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), this.serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
        try {
            rabbitMQWorkQueue.start();
            IntStream.range(0, 9).forEach(i -> {
                this.testee.submit(TASK_WITH_ID_2);
            });
            Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).until(() -> {
                return Boolean.valueOf(this.worker.results.size() == 10);
            });
            Assertions.assertThat(immediateWorker.tasks).isEmpty();
            rabbitMQWorkQueue.close();
        } catch (Throwable th) {
            try {
                rabbitMQWorkQueue.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws Exception {
        TaskWithId taskWithId = new TaskWithId(TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd"), new TestTask(42L));
        RabbitMQWorkQueue rabbitMQWorkQueue = new RabbitMQWorkQueue(new ImmediateWorker(), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), JsonTaskSerializer.of(new TaskDTOModule[]{TestTaskDTOModules.TEST_TYPE}), RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
        try {
            Thread.sleep(200L);
            rabbitMQWorkQueue.start();
            rabbitMQWorkQueue.submit(taskWithId);
            Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).until(() -> {
                return Boolean.valueOf(this.worker.failedTasks.size() == 1);
            });
            Assertions.assertThat(this.worker.failedTasks).containsExactly(new TaskId[]{taskWithId.getId()});
            this.testee.submit(TASK_WITH_ID);
            Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).until(() -> {
                return Boolean.valueOf(this.worker.results.size() == 1);
            });
            Assertions.assertThat(this.worker.tasks).containsExactly(new TaskWithId[]{TASK_WITH_ID});
            rabbitMQWorkQueue.close();
        } catch (Throwable th) {
            try {
                rabbitMQWorkQueue.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void tasksShouldBeConsumedSequentially() {
        AtomicLong atomicLong = new AtomicLong(0L);
        TaskWithId taskWithId = new TaskWithId(TaskId.fromString("1111d081-aa30-11e9-bf6c-2d3b9e84aafd"), new MemoryReferenceTask(() -> {
            atomicLong.addAndGet(1L);
            Thread.sleep(1000L);
            return Task.Result.COMPLETED;
        }));
        TaskWithId taskWithId2 = new TaskWithId(TaskId.fromString("2222d082-aa30-22e9-bf6c-2d3b9e84aafd"), new MemoryReferenceTask(() -> {
            atomicLong.addAndGet(2L);
            return Task.Result.COMPLETED;
        }));
        this.testee.submit(taskWithId);
        this.testee.submit(taskWithId2);
        Assertions.assertThatThrownBy(() -> {
            Awaitility.await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS).untilAtomic(atomicLong, CoreMatchers.equalTo(3L));
        }).isInstanceOf(ConditionTimeoutException.class);
        Assertions.assertThatCode(() -> {
            Awaitility.await().atMost(Durations.TWO_SECONDS).untilAtomic(atomicLong, CoreMatchers.equalTo(3L));
        }).doesNotThrowAnyException();
    }
}
