package net.corda.nodeapi.internal.protonwrapper.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import net.bytebuddy.implementation.auxiliary.TypeProxy;
import net.corda.core.utilities.KotlinUtilsKt;
import net.corda.core.utilities.NetworkHostAndPort;
import net.corda.nodeapi.internal.ArtemisUtils;
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage;
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage;
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl;
import net.corda.nodeapi.internal.revocation.CertDistPointCrlSource;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.qpid.proton.engine.Delivery;
import org.hibernate.event.internal.EntityCopyAllowedLoggedObserver;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/* compiled from: AMQPServer.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��¢\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0012\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010$\n\u0002\u0010��\n\u0002\b\t\u0018�� F2\u00020\u0001:\u0002FGB=\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\b\b\u0002\u0010\b\u001a\u00020\u0003\u0012\b\b\u0002\u0010\t\u001a\u00020\n\u0012\n\b\u0002\u0010\u000b\u001a\u0004\u0018\u00010\u0005¢\u0006\u0002\u0010\fJ\b\u0010/\u001a\u000200H\u0016J\u0016\u00101\u001a\u0002002\u0006\u00102\u001a\u0002032\u0006\u00104\u001a\u00020\u0017J<\u00105\u001a\u0002062\u0006\u00107\u001a\u0002082\u0006\u00109\u001a\u00020\u00032\u0006\u0010:\u001a\u00020\u00032\u0006\u0010;\u001a\u00020<2\u0014\u0010=\u001a\u0010\u0012\u0004\u0012\u00020\u0003\u0012\u0006\u0012\u0004\u0018\u00010?0>J\u000e\u0010@\u001a\u0002002\u0006\u0010A\u001a\u00020\u0017J\u0006\u0010B\u001a\u000200J\u0006\u0010C\u001a\u000200J\u000e\u0010D\u001a\u0002002\u0006\u0010E\u001a\u000206RN\u0010\r\u001aB\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u000f0\u000f\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u000f0\u000f \u0010* \u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u000f0\u000f\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u000f0\u000f\u0018\u00010\u000e0\u000eX\u0082\u0004¢\u0006\u0002\n��RN\u0010\u0011\u001aB\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u00120\u0012\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u00120\u0012 \u0010* \u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u00120\u0012\u0012\f\u0012\n \u0010*\u0004\u0018\u00010\u00120\u0012\u0018\u00010\u000e0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0013\u001a\u0004\u0018\u00010\u0014X\u0082\u000e¢\u0006\u0002\n��R\u001a\u0010\u0015\u001a\u000e\u0012\u0004\u0012\u00020\u0017\u0012\u0004\u0012\u00020\u00180\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0019\u0010\u001aR\u0011\u0010\u001b\u001a\u00020\u001c8F¢\u0006\u0006\u001a\u0004\b\u001d\u0010\u001eR\u000e\u0010\u001f\u001a\u00020 X\u0082\u0004¢\u0006\u0002\n��R\u0017\u0010!\u001a\b\u0012\u0004\u0012\u00020\u000f0\"8F¢\u0006\u0006\u001a\u0004\b#\u0010$R\u0017\u0010%\u001a\b\u0012\u0004\u0012\u00020\u00120\"8F¢\u0006\u0006\u001a\u0004\b&\u0010$R\u0011\u0010\u0004\u001a\u00020\u0005¢\u0006\b\n��\u001a\u0004\b'\u0010(R\u0012\u0010\u000b\u001a\u0004\u0018\u00010\u0005X\u0082\u0004¢\u0006\u0004\n\u0002\u0010)R\u0010\u0010*\u001a\u0004\u0018\u00010+X\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010,\u001a\u0004\u0018\u00010-X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010.\u001a\u0004\u0018\u00010\u0014X\u0082\u000e¢\u0006\u0002\n��¨\u0006H"}, d2 = {"Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer;", "Ljava/lang/AutoCloseable;", "hostName", "", "port", "", "configuration", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;", "threadPoolName", "distPointCrlSource", "Lnet/corda/nodeapi/internal/revocation/CertDistPointCrlSource;", TransportConstants.REMOTING_THREADS_PROPNAME, "(Ljava/lang/String;ILnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;Ljava/lang/String;Lnet/corda/nodeapi/internal/revocation/CertDistPointCrlSource;Ljava/lang/Integer;)V", "_onConnection", "Lrx/subjects/SerializedSubject;", "Lnet/corda/nodeapi/internal/protonwrapper/netty/ConnectionChange;", "kotlin.jvm.PlatformType", "_onReceive", "Lnet/corda/nodeapi/internal/protonwrapper/messages/ReceivedMessage;", "bossGroup", "Lio/netty/channel/EventLoopGroup;", "clientChannels", "Ljava/util/concurrent/ConcurrentHashMap;", "Ljava/net/InetSocketAddress;", "Lio/netty/channel/socket/SocketChannel;", "getHostName", "()Ljava/lang/String;", "listening", "", "getListening", "()Z", "lock", "Ljava/util/concurrent/locks/ReentrantLock;", "onConnection", "Lrx/Observable;", "getOnConnection", "()Lrx/Observable;", "onReceive", "getOnReceive", "getPort", "()I", "Ljava/lang/Integer;", "serverChannel", "Lio/netty/channel/Channel;", "sslDelegatedTaskExecutor", "Ljava/util/concurrent/ExecutorService;", "workerGroup", "close", "", "complete", "delivery", "Lorg/apache/qpid/proton/engine/Delivery;", TypeProxy.INSTANCE_FIELD, "createMessage", "Lnet/corda/nodeapi/internal/protonwrapper/messages/SendableMessage;", "payload", "", "topic", "destinationLegalName", "destinationLink", "Lnet/corda/core/utilities/NetworkHostAndPort;", "properties", "", "", "dropConnection", "connectionRemoteHost", "start", "stop", "write", "msg", "Companion", "ServerChannelInitializer", "node-api"})
/* loaded from: input_file:corda-node-api-4.8.12.jar:net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.class */
public final class AMQPServer implements AutoCloseable {
    private final ReentrantLock lock;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    private ExecutorService sslDelegatedTaskExecutor;
    private final ConcurrentHashMap<InetSocketAddress, SocketChannel> clientChannels;
    private final SerializedSubject<ReceivedMessage, ReceivedMessage> _onReceive;
    private final SerializedSubject<ConnectionChange, ConnectionChange> _onConnection;

    @NotNull
    private final String hostName;
    private final int port;
    private final AMQPConfiguration configuration;
    private final String threadPoolName;
    private final CertDistPointCrlSource distPointCrlSource;
    private final Integer remotingThreads;
    private static final Logger log;
    private static final Integer DEFAULT_REMOTING_THREADS;
    public static final Companion Companion = new Companion(null);

    /* compiled from: AMQPServer.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0018\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0004\n\u0002\u0010\u0006R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��¨\u0006\t"}, d2 = {"Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer$Companion;", "", "()V", "DEFAULT_REMOTING_THREADS", "", "kotlin.jvm.PlatformType", "Ljava/lang/Integer;", EntityCopyAllowedLoggedObserver.SHORT_NAME, "Lorg/slf4j/Logger;", "node-api"})
    /* loaded from: input_file:corda-node-api-4.8.12.jar:net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: AMQPServer.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��R\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010$\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0002\u0018��2\b\u0012\u0004\u0012\u00020\u00020\u0001B\r\u0012\u0006\u0010\u0003\u001a\u00020\u0004¢\u0006\u0002\u0010\u0005J0\u0010\u000e\u001a\u001a\u0012\u0004\u0012\u00020\u0010\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00130\u00110\u000f2\u0006\u0010\u0014\u001a\u00020\u00072\u0006\u0010\u0015\u001a\u00020\u0002H\u0002J\u0010\u0010\u0016\u001a\u00020\u00172\u0006\u0010\u0015\u001a\u00020\u0002H\u0014J\u0018\u0010\u0018\u001a\u00020\u00172\u0006\u0010\u0019\u001a\u00020\u00022\u0006\u0010\u001a\u001a\u00020\u001bH\u0002J\u0018\u0010\u001c\u001a\u00020\u00172\u0006\u0010\u0019\u001a\u00020\u00022\u0006\u0010\u001a\u001a\u00020\u001bH\u0002R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0003\u001a\u00020\u0004¢\u0006\b\n��\u001a\u0004\b\n\u0010\u000bR\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001d"}, d2 = {"Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer$ServerChannelInitializer;", "Lio/netty/channel/ChannelInitializer;", "Lio/netty/channel/socket/SocketChannel;", "parent", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer;", "(Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer;)V", "conf", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;", "keyManagerFactory", "Ljavax/net/ssl/KeyManagerFactory;", "getParent", "()Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPServer;", "trustManagerFactory", "Ljavax/net/ssl/TrustManagerFactory;", "createSSLHandler", "Lkotlin/Pair;", "Lio/netty/channel/ChannelHandler;", "", "", "Lnet/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper;", "amqpConfig", "ch", "initChannel", "", "onChannelClose", "channel", "change", "Lnet/corda/nodeapi/internal/protonwrapper/netty/ConnectionChange;", "onChannelOpen", "node-api"})
    /* loaded from: input_file:corda-node-api-4.8.12.jar:net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer$ServerChannelInitializer.class */
    public static final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final KeyManagerFactory keyManagerFactory;
        private final TrustManagerFactory trustManagerFactory;
        private final AMQPConfiguration conf;

        @NotNull
        private final AMQPServer parent;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.netty.channel.ChannelInitializer
        public void initChannel(@NotNull SocketChannel ch) {
            Intrinsics.checkParameterIsNotNull(ch, "ch");
            AMQPConfiguration aMQPConfiguration = this.parent.configuration;
            ChannelPipeline pipeline = ch.pipeline();
            String healthCheckPhrase = aMQPConfiguration.getHealthCheckPhrase();
            if (healthCheckPhrase != null) {
                pipeline.addLast(ModeSelectingChannel.NAME, new ModeSelectingChannel(healthCheckPhrase));
            }
            Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> createSSLHandler = createSSLHandler(aMQPConfiguration, ch);
            ChannelHandler component1 = createSSLHandler.component1();
            Map<String, CertHoldingKeyManagerFactoryWrapper> component2 = createSSLHandler.component2();
            pipeline.addLast("sslHandler", component1);
            if (this.conf.getTrace()) {
                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
            }
            Set<String> silencedIPs = aMQPConfiguration.getSilencedIPs();
            InetSocketAddress remoteAddress = ch.remoteAddress();
            pipeline.addLast(new AMQPChannelHandler(true, null, component2, this.conf.getUserName(), this.conf.getPassword(), this.conf.getTrace(), CollectionsKt.contains(silencedIPs, remoteAddress != null ? remoteAddress.getHostString() : null), new AMQPServer$ServerChannelInitializer$initChannel$2(this), new AMQPServer$ServerChannelInitializer$initChannel$3(this), new AMQPServer$ServerChannelInitializer$initChannel$4(this.parent._onReceive)));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final void onChannelOpen(SocketChannel socketChannel, ConnectionChange connectionChange) {
            AMQPServer aMQPServer = this.parent;
            ConcurrentHashMap concurrentHashMap = aMQPServer.clientChannels;
            InetSocketAddress remoteAddress = socketChannel.remoteAddress();
            Intrinsics.checkExpressionValueIsNotNull(remoteAddress, "channel.remoteAddress()");
            concurrentHashMap.put(remoteAddress, socketChannel);
            aMQPServer._onConnection.onNext(connectionChange);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final void onChannelClose(SocketChannel socketChannel, ConnectionChange connectionChange) {
            AMQPServer aMQPServer = this.parent;
            aMQPServer.clientChannels.remove(socketChannel.remoteAddress());
            aMQPServer._onConnection.onNext(connectionChange);
        }

        private final Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> createSSLHandler(AMQPConfiguration aMQPConfiguration, SocketChannel socketChannel) {
            SslHandler createServerSslHandler;
            if (aMQPConfiguration.getUseOpenSsl() && aMQPConfiguration.getEnableSNI() && aMQPConfiguration.getKeyStore().aliases().size() > 1) {
                Map<String, CertHoldingKeyManagerFactoryWrapper> splitKeystore = SSLHelperKt.splitKeystore(aMQPConfiguration);
                return new Pair<>(SSLHelperKt.createServerSNIOpenSniHandler(splitKeystore, this.trustManagerFactory), splitKeystore);
            }
            CertHoldingKeyManagerFactoryWrapper certHoldingKeyManagerFactoryWrapper = new CertHoldingKeyManagerFactoryWrapper(this.keyManagerFactory, aMQPConfiguration);
            ExecutorService executorService = this.parent.sslDelegatedTaskExecutor;
            if (executorService == null) {
                throw new IllegalStateException("Required value was null.".toString());
            }
            if (aMQPConfiguration.getUseOpenSsl()) {
                TrustManagerFactory trustManagerFactory = this.trustManagerFactory;
                ByteBufAllocator alloc = socketChannel.alloc();
                Intrinsics.checkExpressionValueIsNotNull(alloc, "ch.alloc()");
                createServerSslHandler = SSLHelperKt.createServerOpenSslHandler(certHoldingKeyManagerFactoryWrapper, trustManagerFactory, alloc, executorService);
            } else {
                createServerSslHandler = SSLHelperKt.createServerSslHandler(aMQPConfiguration.getKeyStore(), certHoldingKeyManagerFactoryWrapper, this.trustManagerFactory, executorService);
            }
            SslHandler sslHandler = createServerSslHandler;
            sslHandler.setHandshakeTimeoutMillis(aMQPConfiguration.getSslHandshakeTimeout().toMillis());
            return new Pair<>(sslHandler, MapsKt.mapOf(TuplesKt.to("default", certHoldingKeyManagerFactoryWrapper)));
        }

        @NotNull
        public final AMQPServer getParent() {
            return this.parent;
        }

        public ServerChannelInitializer(@NotNull AMQPServer parent) {
            Intrinsics.checkParameterIsNotNull(parent, "parent");
            this.parent = parent;
            this.keyManagerFactory = SSLHelperKt.keyManagerFactory(this.parent.configuration.getKeyStore());
            this.trustManagerFactory = SSLHelperKt.trustManagerFactoryWithRevocation(this.parent.configuration.getTrustStore(), this.parent.configuration.getRevocationConfig(), this.parent.distPointCrlSource);
            this.conf = this.parent.configuration;
        }
    }

    /* JADX WARN: Type inference failed for: r0v23, types: [java.lang.Object, io.netty.channel.ChannelFuture] */
    public final void start() {
        int intValue;
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            stop();
            this.sslDelegatedTaskExecutor = SSLHelperKt.sslDelegatedTaskExecutor(this.threadPoolName);
            this.bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(this.threadPoolName + "-boss", 10));
            Integer num = this.remotingThreads;
            if (num != null) {
                intValue = num.intValue();
            } else {
                Integer DEFAULT_REMOTING_THREADS2 = DEFAULT_REMOTING_THREADS;
                Intrinsics.checkExpressionValueIsNotNull(DEFAULT_REMOTING_THREADS2, "DEFAULT_REMOTING_THREADS");
                intValue = DEFAULT_REMOTING_THREADS2.intValue();
            }
            this.workerGroup = new NioEventLoopGroup(intValue, new DefaultThreadFactory(this.threadPoolName + "-worker", 10));
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new NettyServerEventLogger(LogLevel.INFO, this.configuration.getSilencedIPs())).childHandler(new ServerChannelInitializer(this));
            log.info("Try to bind " + this.port);
            ?? channelFuture = serverBootstrap.bind(this.hostName, this.port).sync2();
            Intrinsics.checkExpressionValueIsNotNull(channelFuture, "channelFuture");
            if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
                throw new BindException("Failed to bind port " + this.port);
            }
            log.info("Listening on port " + this.port);
            this.serverChannel = channelFuture.channel();
            Unit unit = Unit.INSTANCE;
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    public final void stop() {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            Channel channel = this.serverChannel;
            if (channel != null) {
                channel.close();
            }
            this.serverChannel = (Channel) null;
            EventLoopGroup eventLoopGroup = this.workerGroup;
            if (eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
            EventLoopGroup eventLoopGroup2 = this.workerGroup;
            if (eventLoopGroup2 != null) {
                Future<?> terminationFuture = eventLoopGroup2.terminationFuture();
                if (terminationFuture != null) {
                    terminationFuture.sync2();
                }
            }
            this.workerGroup = (EventLoopGroup) null;
            EventLoopGroup eventLoopGroup3 = this.bossGroup;
            if (eventLoopGroup3 != null) {
                eventLoopGroup3.shutdownGracefully();
            }
            EventLoopGroup eventLoopGroup4 = this.bossGroup;
            if (eventLoopGroup4 != null) {
                Future<?> terminationFuture2 = eventLoopGroup4.terminationFuture();
                if (terminationFuture2 != null) {
                    terminationFuture2.sync2();
                }
            }
            this.bossGroup = (EventLoopGroup) null;
            ExecutorService executorService = this.sslDelegatedTaskExecutor;
            if (executorService != null) {
                executorService.shutdown();
            }
            this.sslDelegatedTaskExecutor = (ExecutorService) null;
            Unit unit = Unit.INSTANCE;
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stop();
    }

    public final boolean getListening() {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            Channel channel = this.serverChannel;
            reentrantLock.unlock();
            if (channel != null) {
                return channel.isActive();
            }
            return false;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @NotNull
    public final SendableMessage createMessage(@NotNull byte[] payload, @NotNull String topic, @NotNull String destinationLegalName, @NotNull NetworkHostAndPort destinationLink, @NotNull Map<String, ? extends Object> properties) {
        Intrinsics.checkParameterIsNotNull(payload, "payload");
        Intrinsics.checkParameterIsNotNull(topic, "topic");
        Intrinsics.checkParameterIsNotNull(destinationLegalName, "destinationLegalName");
        Intrinsics.checkParameterIsNotNull(destinationLink, "destinationLink");
        Intrinsics.checkParameterIsNotNull(properties, "properties");
        ArtemisUtils.requireMessageSize(payload.length, this.configuration.getMaxMessageSize());
        InetSocketAddress inetSocketAddress = new InetSocketAddress(destinationLink.getHost(), destinationLink.getPort());
        if (((ConcurrentHashMap.KeySetView) this.clientChannels.keySet()).contains(inetSocketAddress)) {
            return new SendableMessageImpl(payload, topic, destinationLegalName, destinationLink, properties);
        }
        throw new IllegalArgumentException(("Destination " + inetSocketAddress + " is not available").toString());
    }

    public final void write(@NotNull SendableMessage msg) {
        Intrinsics.checkParameterIsNotNull(msg, "msg");
        SocketChannel socketChannel = this.clientChannels.get(new InetSocketAddress(msg.getDestinationLink().getHost(), msg.getDestinationLink().getPort()));
        if (socketChannel == null) {
            throw new IllegalStateException("Connection to " + msg.getDestinationLink() + " not active");
        }
        Logger logger = log;
        if (logger.isDebugEnabled()) {
            logger.debug("Writing message with payload of size " + msg.getPayload().length + " into channel " + socketChannel);
        }
        socketChannel.writeAndFlush(msg);
        Logger logger2 = log;
        if (logger2.isDebugEnabled()) {
            logger2.debug("Done writing message with payload of size " + msg.getPayload().length + " into channel " + socketChannel);
        }
    }

    public final void dropConnection(@NotNull InetSocketAddress connectionRemoteHost) {
        Intrinsics.checkParameterIsNotNull(connectionRemoteHost, "connectionRemoteHost");
        SocketChannel socketChannel = this.clientChannels.get(connectionRemoteHost);
        if (socketChannel != null) {
            socketChannel.close();
        }
    }

    public final void complete(@NotNull Delivery delivery, @NotNull InetSocketAddress target) {
        Intrinsics.checkParameterIsNotNull(delivery, "delivery");
        Intrinsics.checkParameterIsNotNull(target, "target");
        SocketChannel socketChannel = this.clientChannels.get(target);
        if (socketChannel != null) {
            Logger logger = log;
            if (logger.isDebugEnabled()) {
                logger.debug("Writing delivery " + delivery + " into channel " + socketChannel);
            }
            socketChannel.writeAndFlush(delivery);
            Logger logger2 = log;
            if (logger2.isDebugEnabled()) {
                logger2.debug("Done writing delivery " + delivery + " into channel " + socketChannel);
            }
        }
    }

    @NotNull
    public final Observable<ReceivedMessage> getOnReceive() {
        SerializedSubject<ReceivedMessage, ReceivedMessage> _onReceive = this._onReceive;
        Intrinsics.checkExpressionValueIsNotNull(_onReceive, "_onReceive");
        return _onReceive;
    }

    @NotNull
    public final Observable<ConnectionChange> getOnConnection() {
        SerializedSubject<ConnectionChange, ConnectionChange> _onConnection = this._onConnection;
        Intrinsics.checkExpressionValueIsNotNull(_onConnection, "_onConnection");
        return _onConnection;
    }

    @NotNull
    public final String getHostName() {
        return this.hostName;
    }

    public final int getPort() {
        return this.port;
    }

    public AMQPServer(@NotNull String hostName, int i, @NotNull AMQPConfiguration configuration, @NotNull String threadPoolName, @NotNull CertDistPointCrlSource distPointCrlSource, @Nullable Integer num) {
        Intrinsics.checkParameterIsNotNull(hostName, "hostName");
        Intrinsics.checkParameterIsNotNull(configuration, "configuration");
        Intrinsics.checkParameterIsNotNull(threadPoolName, "threadPoolName");
        Intrinsics.checkParameterIsNotNull(distPointCrlSource, "distPointCrlSource");
        this.hostName = hostName;
        this.port = i;
        this.configuration = configuration;
        this.threadPoolName = threadPoolName;
        this.distPointCrlSource = distPointCrlSource;
        this.remotingThreads = num;
        this.lock = new ReentrantLock();
        this.clientChannels = new ConcurrentHashMap<>();
        this._onReceive = PublishSubject.create().toSerialized();
        this._onConnection = PublishSubject.create().toSerialized();
    }

    public /* synthetic */ AMQPServer(String str, int i, AMQPConfiguration aMQPConfiguration, String str2, CertDistPointCrlSource certDistPointCrlSource, Integer num, int i2, DefaultConstructorMarker defaultConstructorMarker) {
        this(str, i, aMQPConfiguration, (i2 & 8) != 0 ? "AMQPServer" : str2, (i2 & 16) != 0 ? CertDistPointCrlSource.Companion.getSINGLETON() : certDistPointCrlSource, (i2 & 32) != 0 ? (Integer) null : num);
    }

    static {
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
        log = KotlinUtilsKt.contextLogger(Companion);
        DEFAULT_REMOTING_THREADS = Integer.getInteger("net.corda.nodeapi.amqpserver.NumServerThreads", 4);
    }
}
