package org.neo4j.coreedge.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.neo4j.coreedge.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/messaging/SenderService.class */
public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress, Message> {
    private final ChannelInitializer<SocketChannel> channelInitializer;
    private final Log log;
    private final Monitors monitors;
    private JobScheduler.JobHandle jobHandle;
    private boolean senderServiceRunning;
    private Bootstrap bootstrap;
    private NioEventLoopGroup eventLoopGroup;
    private int maxQueueSize;
    private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private NonBlockingChannels nonBlockingChannels = new NonBlockingChannels();

    public SenderService(ChannelInitializer<SocketChannel> channelInitializer, LogProvider logProvider, Monitors monitors, int i) {
        this.channelInitializer = channelInitializer;
        this.log = logProvider.getLog(getClass());
        this.monitors = monitors;
        this.maxQueueSize = i;
    }

    @Override // org.neo4j.coreedge.messaging.Outbound
    public void send(AdvertisedSocketAddress advertisedSocketAddress, Message message) {
        this.serviceLock.readLock().lock();
        try {
            if (this.senderServiceRunning) {
                channel(advertisedSocketAddress).send(message);
            }
        } finally {
            this.serviceLock.readLock().unlock();
        }
    }

    @Override // org.neo4j.coreedge.messaging.Outbound
    public void send(AdvertisedSocketAddress advertisedSocketAddress, Collection<Message> collection) {
        this.serviceLock.readLock().lock();
        try {
            if (this.senderServiceRunning) {
                NonBlockingChannel channel = channel(advertisedSocketAddress);
                channel.getClass();
                collection.forEach((v1) -> {
                    r1.send(v1);
                });
                this.serviceLock.readLock().unlock();
            }
        } finally {
            this.serviceLock.readLock().unlock();
        }
    }

    private NonBlockingChannel channel(AdvertisedSocketAddress advertisedSocketAddress) {
        MessageQueueMonitor messageQueueMonitor = (MessageQueueMonitor) this.monitors.newMonitor(MessageQueueMonitor.class, NonBlockingChannel.class, new String[0]);
        NonBlockingChannel nonBlockingChannel = this.nonBlockingChannels.get(advertisedSocketAddress);
        if (nonBlockingChannel == null) {
            nonBlockingChannel = new NonBlockingChannel(this.bootstrap, advertisedSocketAddress.socketAddress(), this.log, messageQueueMonitor, this.maxQueueSize);
            NonBlockingChannel putIfAbsent = this.nonBlockingChannels.putIfAbsent(advertisedSocketAddress, nonBlockingChannel);
            if (putIfAbsent != null) {
                nonBlockingChannel.dispose();
                nonBlockingChannel = putIfAbsent;
            }
        }
        messageQueueMonitor.register(advertisedSocketAddress.socketAddress());
        return nonBlockingChannel;
    }

    public synchronized void start() {
        this.serviceLock.writeLock().lock();
        try {
            this.eventLoopGroup = new NioEventLoopGroup(0, new NamedThreadFactory("sender-service"));
            this.bootstrap = new Bootstrap().group(this.eventLoopGroup).channel(NioSocketChannel.class).handler(this.channelInitializer);
            this.senderServiceRunning = true;
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public synchronized void stop() {
        this.serviceLock.writeLock().lock();
        try {
            this.senderServiceRunning = false;
            if (this.jobHandle != null) {
                this.jobHandle.cancel(true);
                this.jobHandle = null;
            }
            Iterator<NonBlockingChannel> it = this.nonBlockingChannels.values().iterator();
            while (it.hasNext()) {
                it.next().dispose();
                it.remove();
            }
            try {
                this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            } catch (InterruptedException e) {
                this.log.warn("Interrupted while stopping sender service.");
            }
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }
}
