package org.neo4j.bolt.runtime.scheduling;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.testing.Jobs;
import org.neo4j.function.Predicates;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogAssertions;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;

/* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltSchedulerTest.class */
class ExecutorBoltSchedulerTest {
    private static final String CONNECTOR_KEY = "connector-id";
    private final AssertableLogProvider logProvider = new AssertableLogProvider();
    private final LogService logService = new SimpleLogService(this.logProvider);
    private final ExecutorFactory executorFactory = new CachedThreadPoolExecutorFactory();
    private final JobScheduler jobScheduler = (JobScheduler) Mockito.mock(JobScheduler.class);
    private final ExecutorBoltScheduler boltScheduler = new ExecutorBoltScheduler(CONNECTOR_KEY, this.executorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1), 0, ForkJoinPool.commonPool(), Duration.ZERO, Duration.ZERO);

    ExecutorBoltSchedulerTest() {
    }

    @BeforeEach
    void setup() {
        Mockito.when(this.jobScheduler.threadFactory((Group) ArgumentMatchers.any())).thenReturn(Executors.defaultThreadFactory());
    }

    @AfterEach
    void cleanup() throws Throwable {
        this.boltScheduler.stop();
        this.boltScheduler.shutdown();
    }

    @Test
    void shouldScheduleKeepAliveService() throws Throwable {
        String uuid = UUID.randomUUID().toString();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BoltConnection newConnection = newConnection(uuid);
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        ((BoltConnection) Mockito.doAnswer(invocationOnMock2 -> {
            atomicBoolean2.set(true);
            return null;
        }).when(newConnection)).keepAlive();
        ExecutorBoltScheduler executorBoltScheduler = new ExecutorBoltScheduler(CONNECTOR_KEY, this.executorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, ForkJoinPool.commonPool(), Duration.ZERO, Duration.ofMillis(10L));
        executorBoltScheduler.init();
        executorBoltScheduler.created(newConnection);
        executorBoltScheduler.start();
        executorBoltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return executorBoltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        Objects.requireNonNull(atomicBoolean2);
        Predicates.await(atomicBoolean2::get, 1L, TimeUnit.MINUTES);
        ((BoltConnection) Mockito.verify(newConnection)).initKeepAliveTimer();
        ((BoltConnection) Mockito.verify(newConnection, Mockito.atLeastOnce())).keepAlive();
        atomicBoolean.set(true);
        Predicates.await(() -> {
            return !executorBoltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        executorBoltScheduler.stop();
        executorBoltScheduler.shutdown();
    }

    @Test
    void nonPositiveScheduleIntervalShouldTurnOffKeepAliveService() throws Throwable {
        String uuid = UUID.randomUUID().toString();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BoltConnection newConnection = newConnection(uuid);
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        this.boltScheduler.init();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.start();
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        ((BoltConnection) Mockito.verify(newConnection, Mockito.never())).initKeepAliveTimer();
        ((BoltConnection) Mockito.verify(newConnection, Mockito.never())).keepAlive();
        atomicBoolean.set(true);
        Predicates.await(() -> {
            return !this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
    }

    @Test
    void initShouldCreateThreadPool() throws Throwable {
        ExecutorFactory executorFactory = (ExecutorFactory) Mockito.mock(ExecutorFactory.class);
        Mockito.when(executorFactory.create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory) ArgumentMatchers.any())).thenReturn(Executors.newCachedThreadPool());
        new ExecutorBoltScheduler(CONNECTOR_KEY, executorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, ForkJoinPool.commonPool(), Duration.ZERO, Duration.ZERO).init();
        ((JobScheduler) Mockito.verify(this.jobScheduler)).threadFactory(Group.BOLT_WORKER);
        ((ExecutorFactory) Mockito.verify(executorFactory)).create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any(Duration.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory) ArgumentMatchers.any(ThreadFactory.class));
    }

    @Test
    void shutdownShouldTerminateThreadPool() throws Throwable {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ExecutorFactory executorFactory = (ExecutorFactory) Mockito.mock(ExecutorFactory.class);
        Mockito.when(executorFactory.create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory) ArgumentMatchers.any())).thenReturn(newCachedThreadPool);
        ExecutorBoltScheduler executorBoltScheduler = new ExecutorBoltScheduler(CONNECTOR_KEY, executorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, ForkJoinPool.commonPool(), Duration.ZERO, Duration.ZERO);
        executorBoltScheduler.init();
        executorBoltScheduler.shutdown();
        Assertions.assertTrue(newCachedThreadPool.isShutdown());
    }

    @Test
    void createdShouldAddConnectionToActiveConnections() throws Throwable {
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        ((BoltConnection) Mockito.verify(newConnection)).id();
        Assertions.assertTrue(this.boltScheduler.isRegistered(newConnection));
    }

    @Test
    void destroyedShouldRemoveConnectionFromActiveConnections() throws Throwable {
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.closed(newConnection);
        Assertions.assertFalse(this.boltScheduler.isRegistered(newConnection));
    }

    @Test
    void enqueuedShouldScheduleJob() throws Throwable {
        String uuid = UUID.randomUUID().toString();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BoltConnection newConnection = newConnection(uuid);
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        atomicBoolean.set(true);
        Predicates.await(() -> {
            return !this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        ((BoltConnection) Mockito.verify(newConnection)).processNextBatch();
    }

    @Test
    void enqueuedShouldNotScheduleJobWhenActiveWorkItemExists() throws Throwable {
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        atomicBoolean.set(true);
        Predicates.await(() -> {
            return !this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        ((BoltConnection) Mockito.verify(newConnection)).processNextBatch();
    }

    @Test
    void failingJobShouldLogAndStopConnection() throws Throwable {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        RuntimeException runtimeException = new RuntimeException("some unexpected error");
        ((BoltConnection) Mockito.doThrow(new Throwable[]{runtimeException}).when(newConnection)).processNextBatch();
        ((BoltConnection) Mockito.doAnswer(invocationOnMock -> {
            return Boolean.valueOf(atomicBoolean.getAndSet(true));
        }).when(newConnection)).stop();
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Objects.requireNonNull(atomicBoolean);
        Predicates.await(atomicBoolean::get, 1L, TimeUnit.MINUTES);
        Assertions.assertFalse(this.boltScheduler.isActive(newConnection));
        ((BoltConnection) Mockito.verify(newConnection)).processNextBatch();
        ((BoltConnection) Mockito.verify(newConnection)).stop();
        LogAssertions.assertThat(this.logProvider).forClass(ExecutorBoltScheduler.class).forLevel(AssertableLogProvider.Level.ERROR).assertExceptionForLogMessage("Unexpected error during job scheduling for session").hasCause(runtimeException);
    }

    @Test
    void successfulJobsShouldTriggerSchedulingOfPendingJobs() throws Throwable {
        AtomicInteger atomicInteger = new AtomicInteger();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(atomicInteger.incrementAndGet() > 0);
        });
        Mockito.when(Boolean.valueOf(newConnection.hasPendingJobs())).thenReturn(true).thenReturn(false);
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return atomicInteger.get() > 1;
        }, 1L, TimeUnit.MINUTES);
        ((BoltConnection) Mockito.verify(newConnection, Mockito.times(2))).processNextBatch();
    }

    @Test
    void destroyedShouldCancelActiveWorkItem() throws Throwable {
        AtomicInteger atomicInteger = new AtomicInteger();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            atomicInteger.incrementAndGet();
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return atomicInteger.get() > 0;
        }, 1L, TimeUnit.MINUTES);
        this.boltScheduler.closed(newConnection);
        Predicates.await(() -> {
            return !this.boltScheduler.isActive(newConnection);
        }, 1L, TimeUnit.MINUTES);
        Assertions.assertFalse(this.boltScheduler.isActive(newConnection));
        Assertions.assertEquals(1, atomicInteger.get());
        atomicBoolean.set(true);
    }

    @Test
    void createdWorkerThreadsShouldContainConnectorName() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        Mockito.when(Boolean.valueOf(newConnection.hasPendingJobs())).thenAnswer(invocationOnMock -> {
            atomicInteger.incrementAndGet();
            return false;
        });
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock2 -> {
            atomicReference.set(Thread.currentThread());
            atomicReference2.set(Thread.currentThread().getName());
            return true;
        });
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return atomicInteger.get() > 0;
        }, 1L, TimeUnit.MINUTES);
        org.assertj.core.api.Assertions.assertThat(((Thread) atomicReference.get()).getName()).isNotEqualTo(atomicReference2.get());
        org.assertj.core.api.Assertions.assertThat(((Thread) atomicReference.get()).getName()).contains(new CharSequence[]{String.format("[%s]", CONNECTOR_KEY)});
        org.assertj.core.api.Assertions.assertThat(((Thread) atomicReference.get()).getName()).doesNotContain(new CharSequence[]{String.format("[%s]", newConnection.remoteAddress())});
    }

    @Test
    void createdWorkerThreadsShouldContainConnectorNameAndRemoteAddressInTheirNamesWhenActive() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        AtomicInteger atomicInteger = new AtomicInteger();
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Mockito.when(Boolean.valueOf(newConnection.processNextBatch())).thenAnswer(invocationOnMock -> {
            atomicReference.set(Thread.currentThread().getName());
            atomicInteger.incrementAndGet();
            return Boolean.valueOf(awaitExit(atomicBoolean));
        });
        this.boltScheduler.init();
        this.boltScheduler.start();
        this.boltScheduler.created(newConnection);
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        Predicates.await(() -> {
            return atomicInteger.get() > 0;
        }, 1L, TimeUnit.MINUTES);
        org.assertj.core.api.Assertions.assertThat((String) atomicReference.get()).contains(new CharSequence[]{String.format("[%s]", CONNECTOR_KEY)});
        org.assertj.core.api.Assertions.assertThat((String) atomicReference.get()).contains(new CharSequence[]{String.format("[%s]", newConnection.remoteAddress())});
        atomicBoolean.set(true);
    }

    @Test
    void stopShouldStopIdleConnections() throws Exception {
        this.boltScheduler.init();
        this.boltScheduler.start();
        BoltConnection newConnection = newConnection(this.boltScheduler, true);
        BoltConnection newConnection2 = newConnection(this.boltScheduler, false);
        BoltConnection newConnection3 = newConnection(this.boltScheduler, true);
        this.boltScheduler.stop();
        ((BoltConnection) Mockito.verify(newConnection, Mockito.times(1))).stop();
        ((BoltConnection) Mockito.verify(newConnection2, Mockito.never())).stop();
        ((BoltConnection) Mockito.verify(newConnection3, Mockito.times(1))).stop();
    }

    @Test
    void shutdownShouldStopAllConnections() throws Exception {
        this.boltScheduler.init();
        this.boltScheduler.start();
        BoltConnection newConnection = newConnection(this.boltScheduler, true);
        BoltConnection newConnection2 = newConnection(this.boltScheduler, false);
        BoltConnection newConnection3 = newConnection(this.boltScheduler, true);
        this.boltScheduler.shutdown();
        ((BoltConnection) Mockito.verify(newConnection, Mockito.times(1))).stop();
        ((BoltConnection) Mockito.verify(newConnection2, Mockito.times(1))).stop();
        ((BoltConnection) Mockito.verify(newConnection3, Mockito.times(1))).stop();
    }

    private static BoltConnection newConnection(String str) {
        BoltConnection boltConnection = (BoltConnection) Mockito.mock(BoltConnection.class);
        Mockito.when(boltConnection.id()).thenReturn(str);
        Mockito.when(boltConnection.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 32000));
        return boltConnection;
    }

    private static BoltConnection newConnection(ExecutorBoltScheduler executorBoltScheduler, boolean z) {
        BoltConnection newConnection = newConnection(UUID.randomUUID().toString());
        executorBoltScheduler.created(newConnection);
        Mockito.when(Boolean.valueOf(newConnection.idle())).thenReturn(Boolean.valueOf(z));
        return newConnection;
    }

    private static boolean awaitExit(AtomicBoolean atomicBoolean) {
        Predicates.awaitForever(() -> {
            return Thread.currentThread().isInterrupted() || atomicBoolean.get();
        }, 500L, TimeUnit.MILLISECONDS);
        return true;
    }
}
