package org.neo4j.bolt.runtime.scheduling;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.Job;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;

/* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltScheduler.class */
public class ExecutorBoltScheduler extends LifecycleAdapter implements BoltScheduler {
    private final String connector;
    private final ExecutorFactory executorFactory;
    private final JobScheduler scheduler;
    private final Log log;
    private final ConcurrentHashMap<String, BoltConnection> activeConnections = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> activeWorkItems = new ConcurrentHashMap<>();
    private final int corePoolSize;
    private final int maxPoolSize;
    private final Duration keepAlive;
    private final int queueSize;
    private final ExecutorService forkJoinPool;
    private final Duration shutdownWaitTime;
    private ExecutorService threadPool;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/neo4j/bolt/runtime/scheduling/ExecutorBoltScheduler$NameAppendingThreadFactory.class */
    private static class NameAppendingThreadFactory implements ThreadFactory {
        private final String nameToAppend;
        private final ThreadFactory factory;

        private NameAppendingThreadFactory(String str, ThreadFactory threadFactory) {
            this.nameToAppend = str;
            this.factory = threadFactory;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread newThread = this.factory.newThread(runnable);
            newThread.setName(String.format("%s [%s]", newThread.getName(), this.nameToAppend));
            return newThread;
        }
    }

    public ExecutorBoltScheduler(String str, ExecutorFactory executorFactory, JobScheduler jobScheduler, LogService logService, int i, int i2, Duration duration, int i3, ExecutorService executorService, Duration duration2) {
        this.connector = str;
        this.executorFactory = executorFactory;
        this.scheduler = jobScheduler;
        this.log = logService.getInternalLog(getClass());
        this.corePoolSize = i;
        this.maxPoolSize = i2;
        this.keepAlive = duration;
        this.queueSize = i3;
        this.forkJoinPool = executorService;
        this.shutdownWaitTime = duration2;
    }

    boolean isRegistered(BoltConnection boltConnection) {
        return this.activeConnections.containsKey(boltConnection.id());
    }

    boolean isActive(BoltConnection boltConnection) {
        return this.activeWorkItems.containsKey(boltConnection.id());
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltScheduler
    public String connector() {
        return this.connector;
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltScheduler
    public void init() {
        this.threadPool = this.executorFactory.create(this.corePoolSize, this.maxPoolSize, this.keepAlive, this.queueSize, true, new NameAppendingThreadFactory(this.connector, this.scheduler.threadFactory(Group.BOLT_WORKER)));
        this.log.debug("Initialized bolt thread pool");
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltScheduler
    public void start() {
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltScheduler
    public void stop() {
        this.log.debug("Stopping idle connections");
        this.activeConnections.values().stream().filter((v0) -> {
            return v0.idle();
        }).forEach(this::stopConnection);
        this.log.debug("Idle connections stopped");
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltScheduler
    public void shutdown() {
        this.log.debug("Stopping connections");
        this.activeConnections.values().forEach(this::stopConnection);
        this.log.debug("Connections stopped");
        if (this.threadPool != null) {
            this.log.debug("Shutting down thread pool");
            this.threadPool.shutdown();
            try {
                if (this.threadPool.awaitTermination(this.shutdownWaitTime.toMillis(), TimeUnit.MILLISECONDS)) {
                    this.log.debug("Thread pool shut down");
                } else {
                    this.log.warn("Waited %s for the thread pool to shutdown cleanly, but timed out waiting for existing work to finish cleanly", new Object[]{this.shutdownWaitTime});
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltConnectionLifetimeListener
    public void created(BoltConnection boltConnection) {
        BoltConnection put = this.activeConnections.put(boltConnection.id(), boltConnection);
        if (!$assertionsDisabled && put != null) {
            throw new AssertionError();
        }
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltConnectionLifetimeListener
    public void closed(BoltConnection boltConnection) {
        String id = boltConnection.id();
        try {
            CompletableFuture<Boolean> remove = this.activeWorkItems.remove(id);
            if (remove != null) {
                remove.cancel(false);
            }
        } finally {
            this.activeConnections.remove(id);
        }
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltConnectionQueueMonitor
    public void enqueued(BoltConnection boltConnection, Job job) {
        handleSubmission(boltConnection);
    }

    @Override // org.neo4j.bolt.runtime.scheduling.BoltConnectionQueueMonitor
    public void drained(BoltConnection boltConnection, Collection<Job> collection) {
    }

    private void handleSubmission(BoltConnection boltConnection) {
        this.activeWorkItems.computeIfAbsent(boltConnection.id(), str -> {
            return scheduleBatchOrHandleError(boltConnection).whenCompleteAsync((bool, th) -> {
                handleCompletion(boltConnection, bool, th);
            }, (Executor) this.forkJoinPool);
        });
    }

    private CompletableFuture<Boolean> scheduleBatchOrHandleError(BoltConnection boltConnection) {
        try {
            return CompletableFuture.supplyAsync(() -> {
                return Boolean.valueOf(executeBatch(boltConnection));
            }, this.threadPool);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private boolean executeBatch(BoltConnection boltConnection) {
        Thread currentThread = Thread.currentThread();
        String name = currentThread.getName();
        currentThread.setName(String.format("%s [%s] ", name, boltConnection.remoteAddress()));
        try {
            boolean processNextBatch = boltConnection.processNextBatch();
            currentThread.setName(name);
            return processNextBatch;
        } catch (Throwable th) {
            currentThread.setName(name);
            throw th;
        }
    }

    private void handleCompletion(BoltConnection boltConnection, Boolean bool, Throwable th) {
        if (th != null) {
            try {
                if (ExceptionUtils.hasCause(th, RejectedExecutionException.class)) {
                    boltConnection.handleSchedulingError(th);
                    this.activeWorkItems.remove(boltConnection.id());
                    return;
                }
            } catch (Throwable th2) {
                this.activeWorkItems.remove(boltConnection.id());
                throw th2;
            }
        }
        this.activeWorkItems.remove(boltConnection.id());
        if (th != null) {
            this.log.error(String.format("Unexpected error during job scheduling for session '%s'.", boltConnection.id()), th);
            stopConnection(boltConnection);
        } else if (bool.booleanValue() && boltConnection.hasPendingJobs()) {
            handleSubmission(boltConnection);
        }
    }

    private void stopConnection(BoltConnection boltConnection) {
        try {
            boltConnection.stop();
        } catch (Throwable th) {
            this.log.warn(String.format("An unexpected error occurred while stopping BoltConnection [%s]", boltConnection.id()), th);
        }
    }

    static {
        $assertionsDisabled = !ExecutorBoltScheduler.class.desiredAssertionStatus();
    }
}
