package org.neo4j.causalclustering.core;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandlerTest.class */
public class BatchingMessageHandlerTest {
    private static final int MAX_BATCH = 16;
    private static final int QUEUE_SIZE = 64;
    private final Instant now = Instant.now();
    private LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> downstreamHandler = (LifecycleMessageHandler) Mockito.mock(LifecycleMessageHandler.class);
    private ClusterId localClusterId = new ClusterId(UUID.randomUUID());
    private ContinuousJob mockJob = (ContinuousJob) Mockito.mock(ContinuousJob.class);
    private Function<Runnable, ContinuousJob> jobSchedulerFactory = runnable -> {
        return this.mockJob;
    };

    @Test
    public void shouldInvokeInnerHandlerWhenRun() {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage of = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, (ReplicatedContent) null));
        batchingMessageHandler.handle(of);
        Mockito.verifyZeroInteractions(new Object[]{this.downstreamHandler});
        batchingMessageHandler.run();
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(of);
    }

    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Throwable {
        Runnable batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage of = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, (ReplicatedContent) null));
        Future<?> submit = Executors.newCachedThreadPool().submit(batchingMessageHandler);
        Thread.sleep(50L);
        batchingMessageHandler.handle(of);
        submit.get();
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(of);
    }

    @Test
    public void shouldBatchRequests() {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        ReplicatedString replicatedString = new ReplicatedString("A");
        ReplicatedString replicatedString2 = new ReplicatedString("B");
        RaftMessages.NewEntry.Request request = new RaftMessages.NewEntry.Request((MemberId) null, replicatedString);
        RaftMessages.NewEntry.Request request2 = new RaftMessages.NewEntry.Request((MemberId) null, replicatedString2);
        batchingMessageHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, request));
        batchingMessageHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, request2));
        Mockito.verifyZeroInteractions(new Object[]{this.downstreamHandler});
        batchingMessageHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add(replicatedString);
        batchRequest.add(replicatedString2);
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, batchRequest));
    }

    @Test
    public void shouldBatchUsingReceivedInstantOfFirstReceivedMessage() {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        ReplicatedString replicatedString = new ReplicatedString("A");
        RaftMessages.NewEntry.Request request = new RaftMessages.NewEntry.Request((MemberId) null, replicatedString);
        Instant ofEpochMilli = Instant.ofEpochMilli(1L);
        Instant plusMillis = ofEpochMilli.plusMillis(1L);
        batchingMessageHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(ofEpochMilli, this.localClusterId, request));
        batchingMessageHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(plusMillis, this.localClusterId, request));
        batchingMessageHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add(replicatedString);
        batchRequest.add(replicatedString);
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(ofEpochMilli, this.localClusterId, batchRequest));
    }

    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        ReplicatedString replicatedString = new ReplicatedString("A");
        ReplicatedString replicatedString2 = new ReplicatedString("C");
        RaftMessages.ReceivedInstantClusterIdAwareMessage of = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, replicatedString));
        RaftMessages.ReceivedInstantClusterIdAwareMessage of2 = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.Heartbeat((MemberId) null, 0L, 0L, 0L));
        RaftMessages.ReceivedInstantClusterIdAwareMessage of3 = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, replicatedString2));
        RaftMessages.ReceivedInstantClusterIdAwareMessage of4 = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.Heartbeat((MemberId) null, 1L, 1L, 1L));
        batchingMessageHandler.handle(of);
        batchingMessageHandler.handle(of2);
        batchingMessageHandler.handle(of3);
        batchingMessageHandler.handle(of4);
        Mockito.verifyZeroInteractions(new Object[]{this.downstreamHandler});
        batchingMessageHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add(replicatedString);
        batchRequest.add(replicatedString2);
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, batchRequest));
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(of2);
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).handle(of4);
    }

    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Throwable {
        AssertableLogProvider assertableLogProvider = new AssertableLogProvider();
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, assertableLogProvider);
        RaftMessages.ReceivedInstantClusterIdAwareMessage of = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, (ReplicatedContent) null));
        batchingMessageHandler.stop();
        batchingMessageHandler.handle(of);
        batchingMessageHandler.run();
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler, Mockito.never())).handle((Message) ArgumentMatchers.any(RaftMessages.ReceivedInstantClusterIdAwareMessage.class));
        assertableLogProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BatchingMessageHandler.class).debug("This handler has been stopped, dropping the message: %s", new Object[]{of})});
    }

    @Test(timeout = 5000)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Throwable {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, 1, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage of = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(this.now, this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, (ReplicatedContent) null));
        batchingMessageHandler.handle(of);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            countDownLatch.countDown();
            batchingMessageHandler.handle(of);
        });
        thread.start();
        countDownLatch.await();
        batchingMessageHandler.stop();
        thread.join();
    }

    @Test
    public void shouldDelegateStart() throws Throwable {
        BatchingMessageHandler batchingMessageHandler = new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        batchingMessageHandler.start(clusterId);
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).start(clusterId);
    }

    @Test
    public void shouldDelegateStop() throws Throwable {
        new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance()).stop();
        ((LifecycleMessageHandler) Mockito.verify(this.downstreamHandler)).stop();
    }

    @Test
    public void shouldStartJob() throws Throwable {
        new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance()).start(new ClusterId(UUID.randomUUID()));
        ((ContinuousJob) Mockito.verify(this.mockJob)).start();
    }

    @Test
    public void shouldStopJob() throws Throwable {
        new BatchingMessageHandler(this.downstreamHandler, QUEUE_SIZE, MAX_BATCH, this.jobSchedulerFactory, NullLogProvider.getInstance()).stop();
        ((ContinuousJob) Mockito.verify(this.mockJob)).stop();
    }
}
