package reactor.tcp.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.io.Buffer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.config.ClientSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tcp.ssl.SSLEngineSupplier;
import reactor.tuple.Tuple2;

/* loaded from: input_file:WEB-INF/lib/reactor-tcp-1.0.0.RELEASE.jar:reactor/tcp/netty/NettyTcpClient.class */
public class NettyTcpClient<IN, OUT> extends TcpClient<IN, OUT> {
    private final Logger log;
    private final Bootstrap bootstrap;
    private final Reactor eventsReactor;
    private final ClientSocketOptions options;
    private final EventLoopGroup ioGroup;
    private final Supplier<ChannelFuture> connectionSupplier;
    private volatile InetSocketAddress connectAddress;
    private volatile boolean closing;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: reactor.tcp.netty.NettyTcpClient$5, reason: invalid class name */
    /* loaded from: input_file:WEB-INF/lib/reactor-tcp-1.0.0.RELEASE.jar:reactor/tcp/netty/NettyTcpClient$5.class */
    public class AnonymousClass5 implements ChannelFutureListener {
        final AtomicInteger attempts = new AtomicInteger(0);
        final ChannelFutureListener self = this;
        final /* synthetic */ Reconnect val$reconnect;
        final /* synthetic */ Deferred val$connections;

        AnonymousClass5(Reconnect reconnect, Deferred deferred) {
            this.val$reconnect = reconnect;
            this.val$connections = deferred;
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(final ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                if (NettyTcpClient.this.log.isInfoEnabled()) {
                    NettyTcpClient.this.log.info("CONNECT: " + channelFuture.channel());
                }
                final NettyTcpConnection nettyTcpConnection = (NettyTcpConnection) NettyTcpClient.this.select(channelFuture.channel());
                channelFuture.channel().closeFuture().addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: reactor.tcp.netty.NettyTcpClient.5.3
                    @Override // io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                        if (NettyTcpClient.this.log.isInfoEnabled()) {
                            NettyTcpClient.this.log.info("CLOSED: " + channelFuture2.channel());
                        }
                        NettyTcpClient.this.connections.unregister(channelFuture2.channel());
                        NettyTcpClient.this.notifyClose(nettyTcpConnection);
                        if (nettyTcpConnection.isClosing()) {
                            return;
                        }
                        if (null == AnonymousClass5.this.val$reconnect.reconnect(NettyTcpClient.this.connectAddress, AnonymousClass5.this.attempts.incrementAndGet())) {
                            return;
                        }
                        NettyTcpClient.this.createConnection(AnonymousClass5.this.self);
                    }
                });
                channelFuture.channel().eventLoop().execute(new Runnable() { // from class: reactor.tcp.netty.NettyTcpClient.5.4
                    @Override // java.lang.Runnable
                    public void run() {
                        AnonymousClass5.this.val$connections.accept((Deferred) nettyTcpConnection);
                    }
                });
                return;
            }
            int incrementAndGet = this.attempts.incrementAndGet();
            Tuple2<InetSocketAddress, Long> reconnect = this.val$reconnect.reconnect(NettyTcpClient.this.connectAddress, incrementAndGet);
            if (null == reconnect) {
                if (NettyTcpClient.this.log.isErrorEnabled()) {
                    NettyTcpClient.this.log.error("Reconnection to {} failed after {} attempts.", NettyTcpClient.this.connectAddress, Integer.valueOf(incrementAndGet - 1));
                }
                channelFuture.channel().eventLoop().execute(new Runnable() { // from class: reactor.tcp.netty.NettyTcpClient.5.1
                    @Override // java.lang.Runnable
                    public void run() {
                        AnonymousClass5.this.val$connections.accept(channelFuture.cause());
                    }
                });
                return;
            }
            NettyTcpClient.this.connectAddress = reconnect.getT1();
            NettyTcpClient.this.bootstrap.remoteAddress(NettyTcpClient.this.connectAddress);
            long longValue = reconnect.getT2().longValue();
            if (NettyTcpClient.this.log.isInfoEnabled()) {
                NettyTcpClient.this.log.info("Attempting to reconnect to {} after {}ms", NettyTcpClient.this.connectAddress, Long.valueOf(longValue));
            }
            NettyTcpClient.this.env.getRootTimer().submit(new Consumer<Long>() { // from class: reactor.tcp.netty.NettyTcpClient.5.2
                @Override // reactor.function.Consumer
                public void accept(Long l) {
                    NettyTcpClient.this.createConnection(AnonymousClass5.this.self);
                }
            }, longValue, TimeUnit.MILLISECONDS);
        }
    }

    public NettyTcpClient(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nonnull InetSocketAddress inetSocketAddress, @Nonnull ClientSocketOptions clientSocketOptions, @Nullable final SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(environment, reactor2, inetSocketAddress, clientSocketOptions, sslOptions, codec);
        this.log = LoggerFactory.getLogger(NettyTcpClient.class);
        this.eventsReactor = reactor2;
        this.connectAddress = inetSocketAddress;
        this.options = clientSocketOptions;
        this.ioGroup = new NioEventLoopGroup(((Integer) environment.getProperty("reactor.tcp.ioThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS))).intValue(), new NamedDaemonThreadFactory("reactor-tcp-io"));
        this.bootstrap = new Bootstrap().group(this.ioGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_RCVBUF, Integer.valueOf(this.options.rcvbuf())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(this.options.sndbuf())).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.options.keepAlive())).option(ChannelOption.SO_LINGER, Integer.valueOf(this.options.linger())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.options.tcpNoDelay())).remoteAddress(this.connectAddress).handler(new ChannelInitializer<SocketChannel>() { // from class: reactor.tcp.netty.NettyTcpClient.1
            @Override // io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.config().setConnectTimeoutMillis(NettyTcpClient.this.options.timeout());
                if (null != sslOptions) {
                    SSLEngine sSLEngine = new SSLEngineSupplier(sslOptions, true).get();
                    NettyTcpClient.this.log.debug("SSL enabled using keystore {}", null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>");
                    socketChannel.pipeline().addLast(new SslHandler(sSLEngine));
                }
                if ((NettyTcpClient.this.options instanceof NettyClientSocketOptions) && null != ((NettyClientSocketOptions) NettyTcpClient.this.options).pipelineConfigurer()) {
                    ((NettyClientSocketOptions) NettyTcpClient.this.options).pipelineConfigurer().accept(socketChannel.pipeline());
                }
                socketChannel.pipeline().addLast(NettyTcpClient.this.createChannelHandlers(socketChannel));
            }
        });
        this.connectionSupplier = new Supplier<ChannelFuture>() { // from class: reactor.tcp.netty.NettyTcpClient.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // reactor.function.Supplier
            public ChannelFuture get() {
                if (NettyTcpClient.this.closing) {
                    return null;
                }
                return NettyTcpClient.this.bootstrap.connect(NettyTcpClient.this.connectAddress);
            }
        };
    }

    @Override // reactor.tcp.TcpClient
    public Promise<TcpConnection<IN, OUT>> open() {
        Deferred<TcpConnection<IN, OUT>, Promise<TcpConnection<IN, OUT>>> defer = Promises.defer(this.env, this.eventsReactor.getDispatcher());
        createConnection(createConnectListener(defer));
        return defer.compose();
    }

    @Override // reactor.tcp.TcpClient
    public Stream<TcpConnection<IN, OUT>> open(Reconnect reconnect) {
        Deferred<TcpConnection<IN, OUT>, Stream<TcpConnection<IN, OUT>>> defer = Streams.defer(this.env, this.eventsReactor.getDispatcher());
        createConnection(createReconnectListener(defer, reconnect));
        return defer.compose();
    }

    @Override // reactor.tcp.TcpClient
    protected <C> TcpConnection<IN, OUT> createConnection(C c) {
        SocketChannel socketChannel = (SocketChannel) c;
        return new NettyTcpConnection(this.env, getCodec(), new NettyEventLoopDispatcher(socketChannel.eventLoop(), ((Integer) this.env.getProperty("reactor.tcp.connectionReactorBacklog", Integer.class, 128)).intValue()), this.eventsReactor, socketChannel, this.connectAddress);
    }

    protected ChannelHandler[] createChannelHandlers(SocketChannel socketChannel) {
        return new ChannelHandler[]{new NettyTcpConnectionChannelInboundHandler((NettyTcpConnection) select(socketChannel))};
    }

    @Override // reactor.tcp.TcpClient
    protected void doClose(Deferred<Void, Promise<Void>> deferred) {
        this.closing = true;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            this.ioGroup.shutdownGracefully().addListener2(new GenericFutureListener() { // from class: reactor.tcp.netty.NettyTcpClient.3
                @Override // io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(Future future) throws Exception {
                    countDownLatch.countDown();
                }
            });
            if (countDownLatch.await(30L, TimeUnit.SECONDS)) {
                deferred.accept((Deferred<Void, Promise<Void>>) null);
            } else {
                deferred.accept(new TimeoutException("NettyTcpClient could not close connection after 30 seconds"));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            deferred.accept(e);
        }
    }

    private ChannelFutureListener createConnectListener(final Deferred<TcpConnection<IN, OUT>, Promise<TcpConnection<IN, OUT>>> deferred) {
        return new ChannelFutureListener() { // from class: reactor.tcp.netty.NettyTcpClient.4
            @Override // io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    if (NettyTcpClient.this.log.isErrorEnabled()) {
                        NettyTcpClient.this.log.error(channelFuture.cause().getMessage(), channelFuture.cause());
                    }
                    deferred.accept(channelFuture.cause());
                } else {
                    if (NettyTcpClient.this.log.isInfoEnabled()) {
                        NettyTcpClient.this.log.info("CONNECT: " + channelFuture.channel());
                    }
                    final NettyTcpConnection nettyTcpConnection = (NettyTcpConnection) NettyTcpClient.this.select(channelFuture.channel());
                    channelFuture.channel().closeFuture().addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: reactor.tcp.netty.NettyTcpClient.4.1
                        @Override // io.netty.util.concurrent.GenericFutureListener
                        public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                            if (NettyTcpClient.this.log.isInfoEnabled()) {
                                NettyTcpClient.this.log.info("CLOSED: " + channelFuture2.channel());
                            }
                            NettyTcpClient.this.connections.unregister(channelFuture2.channel());
                            NettyTcpClient.this.notifyClose(nettyTcpConnection);
                        }
                    });
                    deferred.accept((Deferred) nettyTcpConnection);
                }
            }
        };
    }

    private ChannelFutureListener createReconnectListener(Deferred<TcpConnection<IN, OUT>, Stream<TcpConnection<IN, OUT>>> deferred, Reconnect reconnect) {
        return new AnonymousClass5(reconnect, deferred);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createConnection(ChannelFutureListener channelFutureListener) {
        ChannelFuture channelFuture = this.connectionSupplier.get();
        if (channelFuture != null) {
            channelFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFutureListener);
        }
    }
}
