package org.neo4j.bolt.runtime;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.bolt.BoltKernelExtension;
import org.neo4j.bolt.testing.Jobs;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.matchers.CommonMatchers;

/* loaded from: input_file:org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.class */
/**
 * Tests for {@link ExecutorBoltScheduler}: thread pool creation and shutdown, connection
 * registration, job scheduling, failure handling and worker thread naming.
 *
 * <p>Connections are Mockito mocks of {@link BoltConnection}; the job scheduler is a mock whose
 * thread factory is stubbed in {@link #setup()}.
 */
public class ExecutorBoltSchedulerTest {
    private static final String CONNECTOR_KEY = "connector-id";
    private final AssertableLogProvider logProvider = new AssertableLogProvider();
    private final LogService logService = new SimpleLogService(this.logProvider);
    private final Config config = Config.defaults();
    private final ExecutorFactory executorFactory = new CachedThreadPoolExecutorFactory(NullLog.getInstance());
    private final JobScheduler jobScheduler = Mockito.mock(JobScheduler.class);
    // Declared last on purpose: the helper reads jobScheduler and logService, which must already be set.
    private final ExecutorBoltScheduler boltScheduler = newBoltScheduler(this.executorFactory);

    /** Makes the mocked job scheduler hand out the JDK default thread factory for any group. */
    @Before
    public void setup() {
        Mockito.when(this.jobScheduler.threadFactory((JobScheduler.Group) ArgumentMatchers.any())).thenReturn(Executors.defaultThreadFactory());
    }

    /** Stops the shared scheduler after every test, whether or not the test started it. */
    @After
    public void cleanup() throws Throwable {
        this.boltScheduler.stop();
    }

    /** Starting the scheduler asks for the bolt worker thread factory and creates exactly one executor. */
    @Test
    public void initShouldCreateThreadPool() throws Throwable {
        ExecutorFactory factory = mockFactoryReturning(Executors.newCachedThreadPool());
        ExecutorBoltScheduler scheduler = newBoltScheduler(factory);
        scheduler.start();
        try {
            Mockito.verify(this.jobScheduler).threadFactory(JobScheduler.Groups.boltWorker);
            Mockito.verify(factory, Mockito.times(1)).create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.any(Duration.class), ArgumentMatchers.anyInt(), ArgumentMatchers.any(ThreadFactory.class));
        } finally {
            // This scheduler is local to the test, so cleanup() would not stop it.
            scheduler.stop();
        }
    }

    /** Stopping the scheduler shuts down the executor that the factory supplied. */
    @Test
    public void shutdownShouldTerminateThreadPool() throws Throwable {
        ExecutorService pool = Executors.newCachedThreadPool();
        ExecutorBoltScheduler scheduler = newBoltScheduler(mockFactoryReturning(pool));
        scheduler.start();
        scheduler.stop();
        Assert.assertTrue(pool.isShutdown());
    }

    /** A connection reported as created is registered with the scheduler. */
    @Test
    public void createdShouldAddConnectionToActiveConnections() throws Throwable {
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        Mockito.verify(connection).id();
        Assert.assertTrue(this.boltScheduler.isRegistered(connection));
    }

    /** A connection reported as closed is no longer registered with the scheduler. */
    @Test
    public void destroyedShouldRemoveConnectionFromActiveConnections() throws Throwable {
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.closed(connection);
        Assert.assertFalse(this.boltScheduler.isRegistered(connection));
    }

    /** Enqueuing a job makes the connection active until its batch finishes; the batch runs once. */
    @Test
    public void enqueuedShouldScheduleJob() throws Throwable {
        AtomicBoolean releaseWorker = new AtomicBoolean();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        // Keep the batch running until the test has observed the connection as active.
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> awaitExit(releaseWorker));
        startAndEnqueueNoop(connection);
        Predicates.await(() -> this.boltScheduler.isActive(connection), 1L, TimeUnit.MINUTES);
        releaseWorker.set(true);
        Predicates.await(() -> !this.boltScheduler.isActive(connection), 1L, TimeUnit.MINUTES);
        Mockito.verify(connection).processNextBatch();
    }

    /** Enqueuing a second job while a batch is still running does not trigger a second batch. */
    @Test
    public void enqueuedShouldNotScheduleJobWhenActiveWorkItemExists() throws Throwable {
        AtomicBoolean releaseWorker = new AtomicBoolean();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> awaitExit(releaseWorker));
        startAndEnqueueNoop(connection);
        Predicates.await(() -> this.boltScheduler.isActive(connection), 1L, TimeUnit.MINUTES);
        // Second enqueue arrives while the first batch is still blocked.
        this.boltScheduler.enqueued(connection, Jobs.noop());
        releaseWorker.set(true);
        Predicates.await(() -> !this.boltScheduler.isActive(connection), 1L, TimeUnit.MINUTES);
        Mockito.verify(connection).processNextBatch();
    }

    /** A batch that throws is logged as an error and the connection is stopped. */
    @Test
    public void failingJobShouldLogAndStopConnection() throws Throwable {
        AtomicBoolean stopped = new AtomicBoolean();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.doThrow(new RuntimeException("some unexpected error")).when(connection).processNextBatch();
        // Record the stop() call so the test can wait for the failure to be handled.
        Mockito.doAnswer(invocation -> stopped.getAndSet(true)).when(connection).stop();
        startAndEnqueueNoop(connection);
        Predicates.await(() -> stopped.get(), 1L, TimeUnit.MINUTES);
        Assert.assertFalse(this.boltScheduler.isActive(connection));
        Mockito.verify(connection).processNextBatch();
        Mockito.verify(connection).stop();
        this.logProvider.assertExactly(AssertableLogProvider.inLog(Matchers.containsString(BoltKernelExtension.class.getPackage().getName())).error(Matchers.containsString("Unexpected error during job scheduling for session"), CommonMatchers.matchesExceptionMessage(Matchers.containsString("some unexpected error"))));
    }

    /** When a finished batch leaves pending jobs behind, another batch is processed. */
    @Test
    public void successfulJobsShouldTriggerSchedulingOfPendingJobs() throws Throwable {
        AtomicInteger batchCount = new AtomicInteger();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> batchCount.incrementAndGet() > 0);
        // Pending jobs are reported once, then none.
        Mockito.when(connection.hasPendingJobs()).thenReturn(true).thenReturn(false);
        startAndEnqueueNoop(connection);
        Predicates.await(() -> batchCount.get() > 1, 1L, TimeUnit.MINUTES);
        Mockito.verify(connection, Mockito.times(2)).processNextBatch();
    }

    /** Closing a connection whose batch is still running makes it inactive without starting another batch. */
    @Test
    public void destroyedShouldCancelActiveWorkItem() throws Throwable {
        AtomicInteger batchCount = new AtomicInteger();
        AtomicBoolean releaseWorker = new AtomicBoolean();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> {
            batchCount.incrementAndGet();
            return awaitExit(releaseWorker);
        });
        try {
            startAndEnqueueNoop(connection);
            Predicates.await(() -> batchCount.get() > 0, 1L, TimeUnit.MINUTES);
            this.boltScheduler.closed(connection);
            Predicates.await(() -> !this.boltScheduler.isActive(connection), 1L, TimeUnit.MINUTES);
            Assert.assertFalse(this.boltScheduler.isActive(connection));
            Assert.assertEquals(1L, batchCount.get());
        } finally {
            // Release the worker even when an assertion fails, so it is not left blocked.
            releaseWorker.set(true);
        }
    }

    /**
     * Once a batch has finished, the worker thread's name contains the connector key but not the
     * connection's remote address, and differs from the name it carried during the batch.
     */
    @Test
    public void createdWorkerThreadsShouldContainConnectorName() throws Exception {
        AtomicInteger batchCount = new AtomicInteger();
        AtomicReference<Thread> worker = new AtomicReference<>();
        AtomicReference<String> nameDuringBatch = new AtomicReference<>();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> {
            worker.set(Thread.currentThread());
            nameDuringBatch.set(Thread.currentThread().getName());
            batchCount.incrementAndGet();
            return true;
        });
        startAndEnqueueNoop(connection);
        Predicates.await(() -> batchCount.get() > 0, 1L, TimeUnit.MINUTES);
        // The counter is incremented while the batch is still running, so the worker may not have
        // left the batch yet. Wait for its name to change before asserting on it.
        // NOTE(review): assumes the scheduler renames the worker only for the duration of a batch - confirm.
        Predicates.await(() -> !worker.get().getName().equals(nameDuringBatch.get()), 1L, TimeUnit.MINUTES);
        String nameAfterBatch = worker.get().getName();
        Assert.assertThat(nameAfterBatch, Matchers.not(Matchers.equalTo(nameDuringBatch.get())));
        Assert.assertThat(nameAfterBatch, Matchers.containsString(String.format("[%s]", CONNECTOR_KEY)));
        Assert.assertThat(nameAfterBatch, Matchers.not(Matchers.containsString(String.format("[%s]", connection.remoteAddress()))));
    }

    /** While a batch is running, the worker thread's name contains the connector key and the remote address. */
    @Test
    public void createdWorkerThreadsShouldContainConnectorNameAndRemoteAddressInTheirNamesWhenActive() throws Exception {
        AtomicReference<String> nameDuringBatch = new AtomicReference<>();
        AtomicInteger batchCount = new AtomicInteger();
        AtomicBoolean releaseWorker = new AtomicBoolean();
        BoltConnection connection = newConnection(UUID.randomUUID().toString());
        Mockito.when(connection.processNextBatch()).thenAnswer(invocation -> {
            nameDuringBatch.set(Thread.currentThread().getName());
            batchCount.incrementAndGet();
            return awaitExit(releaseWorker);
        });
        try {
            startAndEnqueueNoop(connection);
            Predicates.await(() -> batchCount.get() > 0, 1L, TimeUnit.MINUTES);
            Assert.assertThat(nameDuringBatch.get(), Matchers.containsString(String.format("[%s]", CONNECTOR_KEY)));
            Assert.assertThat(nameDuringBatch.get(), Matchers.containsString(String.format("[%s]", connection.remoteAddress())));
        } finally {
            // Release the worker even when an assertion fails, so it is not left blocked.
            releaseWorker.set(true);
        }
    }

    /**
     * Builds a scheduler for {@link #CONNECTOR_KEY} with the fixed settings shared by every test.
     *
     * @param factory executor factory the scheduler should use
     * @return a new scheduler that has not been started
     */
    private ExecutorBoltScheduler newBoltScheduler(ExecutorFactory factory) {
        return new ExecutorBoltScheduler(CONNECTOR_KEY, factory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, ForkJoinPool.commonPool());
    }

    /**
     * Creates a mocked executor factory whose {@code create} call returns the given pool for any arguments.
     *
     * @param pool executor to hand out
     * @return the stubbed factory mock
     */
    private static ExecutorFactory mockFactoryReturning(ExecutorService pool) {
        ExecutorFactory factory = Mockito.mock(ExecutorFactory.class);
        Mockito.when(factory.create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (ThreadFactory) ArgumentMatchers.any())).thenReturn(pool);
        return factory;
    }

    /**
     * Starts the shared scheduler, registers the connection and enqueues a no-op job for it.
     *
     * @param connection connection to register and enqueue for
     */
    private void startAndEnqueueNoop(BoltConnection connection) throws Exception {
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
    }

    /**
     * Creates a mocked connection with the given id and a fixed remote address of localhost:32000.
     *
     * @param str id the mock reports from {@code id()}
     * @return the stubbed connection mock
     */
    private BoltConnection newConnection(String str) {
        BoltConnection connection = Mockito.mock(BoltConnection.class);
        Mockito.when(connection.id()).thenReturn(str);
        Mockito.when(connection.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 32000));
        return connection;
    }

    /**
     * Blocks the calling thread until it is interrupted or the flag becomes {@code true}.
     *
     * @param atomicBoolean flag that releases the caller once set
     * @return always {@code true}, so it can serve directly as a stubbed batch result
     */
    private static boolean awaitExit(AtomicBoolean atomicBoolean) {
        Predicates.awaitForever(() -> Thread.currentThread().isInterrupted() || atomicBoolean.get(), 500L, TimeUnit.MILLISECONDS);
        return true;
    }
}
