package org.neo4j.bolt.runtime.scheduling;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.testing.Jobs;
import org.neo4j.configuration.connectors.BoltConnector;
import org.neo4j.function.Predicates;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;

/* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltSchedulerWithQueueTest.class */
public class ExecutorBoltSchedulerWithQueueTest {
    private static final String CONNECTOR_KEY = "connector-id";
    private static final int threadPoolSize = 1;
    private static final int queueSize = 1;
    private final CountDownLatch beforeExecuteEvent = new CountDownLatch(1);
    private final CountDownLatch beforeExecuteBarrier = new CountDownLatch(1);
    private final CountDownLatch afterExecuteEvent = new CountDownLatch(1);
    private final CountDownLatch afterExecuteBarrier = new CountDownLatch(1);
    private final AssertableLogProvider logProvider = new AssertableLogProvider();
    private final LogService logService = new SimpleLogService(this.logProvider, this.logProvider);
    private final ExecutorFactory executorFactory = new NotifyingThreadPoolWithBlockingQueueFactory();
    private final JobScheduler jobScheduler = (JobScheduler) Mockito.mock(JobScheduler.class);
    private final ExecutorBoltScheduler boltSchedulerWithQueue = new ExecutorBoltScheduler(CONNECTOR_KEY, this.executorFactory, this.jobScheduler, this.logService, 1, 1, Duration.ofMinutes(1), 1, ForkJoinPool.commonPool(), Duration.ZERO, BoltConnector.KeepAliveRequestType.OFF, Duration.ZERO);

    /* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltSchedulerWithQueueTest$NotifyingThreadPoolExecutor.class */
    private class NotifyingThreadPoolExecutor extends ThreadPoolExecutor {
        private NotifyingThreadPoolExecutor(int i, int i2, Duration duration, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            super(i, i2, duration.toMillis(), TimeUnit.MILLISECONDS, blockingQueue, threadFactory, rejectedExecutionHandler);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void beforeExecute(Thread thread, Runnable runnable) {
            try {
                ExecutorBoltSchedulerWithQueueTest.this.beforeExecuteEvent.await();
                super.beforeExecute(thread, runnable);
                ExecutorBoltSchedulerWithQueueTest.this.beforeExecuteBarrier.countDown();
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void afterExecute(Runnable runnable, Throwable th) {
            try {
                ExecutorBoltSchedulerWithQueueTest.this.afterExecuteEvent.await();
                super.afterExecute(runnable, th);
                ExecutorBoltSchedulerWithQueueTest.this.afterExecuteBarrier.countDown();
            } catch (Throwable th2) {
                throw new RuntimeException(th2);
            }
        }
    }

    /* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltSchedulerWithQueueTest$NotifyingThreadPoolWithBlockingQueueFactory.class */
    private class NotifyingThreadPoolWithBlockingQueueFactory implements ExecutorFactory {
        private NotifyingThreadPoolWithBlockingQueueFactory() {
        }

        public ExecutorService create(int i, int i2, Duration duration, int i3, boolean z, ThreadFactory threadFactory) {
            return new NotifyingThreadPoolExecutor(i, i2, duration, new ArrayBlockingQueue(i3), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        }
    }

    @BeforeEach
    void setup() throws Throwable {
        Mockito.when(this.jobScheduler.threadFactory((Group) ArgumentMatchers.any())).thenReturn(Executors.defaultThreadFactory());
        this.boltSchedulerWithQueue.init();
        this.boltSchedulerWithQueue.start();
    }

    @AfterEach
    void cleanup() throws Throwable {
        this.boltSchedulerWithQueue.stop();
        this.boltSchedulerWithQueue.shutdown();
    }

    @Test
    void shouldInvokeHandleSchedulingErrorIfNoThreadsAvailableAndFullQueue() throws Throwable {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        ((BoltConnection) Mockito.doAnswer(newCountingAnswer(atomicInteger)).when(newConnection)).handleSchedulingError((Throwable) ArgumentMatchers.any());
        submitWork(2);
        this.boltSchedulerWithQueue.created(newConnection);
        CompletableFuture.runAsync(() -> {
            this.boltSchedulerWithQueue.enqueued(newConnection, Jobs.noop());
        });
        Predicates.awaitForever(() -> {
            return atomicInteger.get() > 0;
        }, 500L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(1, atomicInteger.get());
        this.afterExecuteEvent.countDown();
        this.afterExecuteBarrier.await();
    }

    @Test
    void shouldStartQueueingWorkIfThreadPoolIsOccupied() throws Throwable {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        ((BoltConnection) Mockito.doAnswer(newCountingAnswer(atomicInteger)).when(newConnection)).handleSchedulingError((Throwable) ArgumentMatchers.any());
        submitWork(1);
        this.boltSchedulerWithQueue.created(newConnection);
        Assertions.assertDoesNotThrow(() -> {
            this.boltSchedulerWithQueue.enqueued(newConnection, Jobs.noop());
        });
        Assertions.assertEquals(0, atomicInteger.get());
        this.afterExecuteEvent.countDown();
        this.afterExecuteBarrier.await();
    }

    @Test
    void shouldNotScheduleNewJobIfHandlingSchedulingError() throws Throwable {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        ((BoltConnection) Mockito.doAnswer(newBlockingAnswer(atomicInteger, atomicBoolean)).when(newConnection)).handleSchedulingError((Throwable) ArgumentMatchers.any());
        submitWork(2);
        this.boltSchedulerWithQueue.created(newConnection);
        CompletableFuture.runAsync(() -> {
            this.boltSchedulerWithQueue.enqueued(newConnection, Jobs.noop());
        });
        Predicates.awaitForever(() -> {
            return atomicInteger.get() > 0;
        }, 500L, TimeUnit.MILLISECONDS);
        this.afterExecuteEvent.countDown();
        this.afterExecuteBarrier.await();
        this.boltSchedulerWithQueue.enqueued(newConnection, Jobs.noop());
        atomicBoolean.set(true);
        Assertions.assertEquals(1, atomicInteger.get());
        ((BoltConnection) Mockito.verify(newConnection, Mockito.never())).processNextBatch();
    }

    private void submitWork(int i) throws InterruptedException {
        for (int i2 = 0; i2 < i; i2++) {
            BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
            this.boltSchedulerWithQueue.created(newConnection);
            this.boltSchedulerWithQueue.enqueued(newConnection, Jobs.noop());
        }
        this.beforeExecuteEvent.countDown();
        this.beforeExecuteBarrier.await();
    }

    private <T> Answer<T> newCountingAnswer(AtomicInteger atomicInteger) {
        return invocationOnMock -> {
            atomicInteger.incrementAndGet();
            return null;
        };
    }

    private <T> Answer<T> newBlockingAnswer(AtomicInteger atomicInteger, AtomicBoolean atomicBoolean) {
        return invocationOnMock -> {
            atomicInteger.incrementAndGet();
            Predicates.awaitForever(() -> {
                return Thread.currentThread().isInterrupted() || atomicBoolean.get();
            }, 500L, TimeUnit.MILLISECONDS);
            return null;
        };
    }

    private BoltConnection newConnection(String str) {
        BoltConnection boltConnection = (BoltConnection) Mockito.mock(BoltConnection.class);
        Mockito.when(boltConnection.id()).thenReturn(str);
        Mockito.when(boltConnection.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 32000));
        return boltConnection;
    }
}
