package io.pravega.client.netty.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.FingerprintTrustManagerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.pravega.client.ClientConfig;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.CommandDecoder;
import io.pravega.shared.protocol.netty.CommandEncoder;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.ExceptionLoggingHandler;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.io.File;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/netty/impl/ConnectionPoolImpl.class */
/**
 * Netty-backed {@link ConnectionPool} that keeps, per {@link PravegaNodeUri}, a list of
 * {@link Connection}s and hands out flows on the connection with the fewest open flows.
 *
 * <p>All access to {@code connectionMap} is serialized on {@code $lock}. The Netty
 * {@link EventLoopGroup} is created once per pool and shut down by {@link #close()}.
 */
public class ConnectionPoolImpl implements ConnectionPool {
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ConnectionPoolImpl.class);

    /**
     * Orders connections by their current flow count; a connection whose flow count is not
     * (yet) available sorts last because it is treated as {@link Integer#MAX_VALUE}.
     */
    private static final Comparator<Connection> COMPARATOR =
            Comparator.comparingInt(connection -> connection.getFlowCount().orElse(Integer.MAX_VALUE));

    private final ClientConfig clientConfig;

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @VisibleForTesting
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @GuardedBy("$lock")
    private final Map<PravegaNodeUri, List<Connection>> connectionMap = new HashMap<>();
    private final EventLoopGroup group = getEventLoopGroup();

    /**
     * Creates a pool that uses the TLS and connection-limit settings of the given configuration.
     *
     * @param clientConfig client configuration consulted for TLS, host-name validation and
     *                     the maximum number of connections per segment store.
     */
    public ConnectionPoolImpl(ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    /**
     * Returns a {@link ClientConnection} bound to {@code flow}, reusing a pooled connection to
     * {@code pravegaNodeUri} when possible and establishing a new one otherwise.
     *
     * <p>A new connection is created when no usable connection exists, or when the pool for this
     * endpoint is below {@code getMaxConnectionsPerSegmentStore()} and the least-loaded
     * connection already carries at least one flow.
     *
     * @param flow           flow to create on the chosen connection; must not be null.
     * @param pravegaNodeUri endpoint to connect to; must not be null.
     * @param replyProcessor processor for replies on this flow; must not be null.
     * @return future completing with the client connection once the flow handler is available.
     * @throws NullPointerException if any argument is null.
     */
    @Override // io.pravega.client.netty.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(Flow flow, PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        Preconditions.checkNotNull(flow, "Flow");
        Preconditions.checkNotNull(pravegaNodeUri, "Location");
        Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
        synchronized (this.$lock) {
            Exceptions.checkNotClosed(this.closed.get(), this);

            // Drop connections that failed to establish or have since been disconnected.
            // An explicit ArrayList is collected because the list is mutated below.
            List<Connection> usable = this.connectionMap.getOrDefault(pravegaNodeUri, new ArrayList<>()).stream()
                    .filter(ConnectionPoolImpl::isUsable)
                    .collect(Collectors.toCollection(ArrayList::new));
            log.debug("List of connections to {} that can be used: {}", pravegaNodeUri, usable);

            // Candidate for reuse: the connection with the fewest flows.
            Optional<Connection> leastLoaded = usable.stream().min(COMPARATOR);
            boolean belowLimit = usable.size() < this.clientConfig.getMaxConnectionsPerSegmentStore();

            Connection connection;
            if (!leastLoaded.isPresent() || (belowLimit && !isUnused(leastLoaded.get()))) {
                log.debug("Creating a new connection to {}", pravegaNodeUri);
                connection = new Connection(pravegaNodeUri, establishConnection(pravegaNodeUri));
            } else {
                Connection reused = leastLoaded.get();
                log.debug("Reusing connection: {}", reused);
                usable.remove(reused);
                connection = new Connection(reused.getUri(), reused.getFlowHandler());
            }
            usable.add(connection);
            this.connectionMap.put(pravegaNodeUri, usable);
            return connection.getFlowHandler().thenApply(flowHandler -> flowHandler.createFlow(flow, replyProcessor));
        }
    }

    /**
     * Establishes a dedicated, non-pooled connection to {@code pravegaNodeUri} with flows disabled.
     *
     * @param pravegaNodeUri endpoint to connect to; must not be null.
     * @param replyProcessor processor for replies on this connection; must not be null.
     * @return future completing with the client connection once the flow handler is available.
     * @throws NullPointerException if any argument is null.
     */
    @Override // io.pravega.client.netty.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        Preconditions.checkNotNull(pravegaNodeUri, "Location");
        Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
        Exceptions.checkNotClosed(this.closed.get(), this);
        Connection connection = new Connection(pravegaNodeUri, establishConnection(pravegaNodeUri));
        return connection.getFlowHandler()
                .thenApply(flowHandler -> flowHandler.createConnectionWithFlowDisabled(replyProcessor));
    }

    /**
     * A connection is usable while it is still being established, or once it has been
     * established successfully and its handler reports the connection as established.
     */
    private static boolean isUsable(Connection connection) {
        CompletableFuture<FlowHandler> handlerFuture = connection.getFlowHandler();
        if (!handlerFuture.isDone()) {
            return true;
        }
        return Futures.isSuccessful(handlerFuture) && handlerFuture.join().isConnectionEstablished();
    }

    /** Returns true when the connection reports a flow count and that count is zero. */
    private boolean isUnused(Connection connection) {
        Optional<Integer> flowCount = connection.getFlowCount();
        return flowCount.isPresent() && flowCount.get() == 0;
    }

    /**
     * Closes and removes every pooled connection that currently has zero flows.
     *
     * <p>Runs under {@code $lock} because it iterates and mutates {@code connectionMap}, which
     * {@link #getClientConnection(Flow, PravegaNodeUri, ReplyProcessor)} modifies under the same lock.
     */
    @VisibleForTesting
    public void pruneUnusedConnections() {
        synchronized (this.$lock) {
            for (List<Connection> connections : this.connectionMap.values()) {
                Iterator<Connection> iterator = connections.iterator();
                while (iterator.hasNext()) {
                    Connection candidate = iterator.next();
                    if (isUnused(candidate)) {
                        // NOTE(review): presumably a present flow count implies the handler future
                        // has already completed, so join() does not block here - confirm in Connection.
                        candidate.getFlowHandler().join().close();
                        iterator.remove();
                    }
                }
            }
        }
    }

    /** Returns the number of channels in the channel group that are currently active. */
    @Override // io.pravega.client.netty.impl.ConnectionPool
    public int getActiveChannelCount() {
        return getActiveChannels().size();
    }

    /** Returns the active channels of the channel group, logging each one at debug level. */
    @VisibleForTesting
    public List<Channel> getActiveChannels() {
        return this.channelGroup.stream()
                .filter(Channel::isActive)
                .peek(channel -> log.debug("Channel with id {} localAddress {} and remoteAddress {} is active.",
                        channel.id(), channel.localAddress(), channel.remoteAddress()))
                .collect(Collectors.toList());
    }

    /**
     * Starts a Netty connect to {@code pravegaNodeUri}.
     *
     * @return future that completes with the {@link FlowHandler} once the connect succeeded and
     *         the handler signalled registration, or exceptionally with a
     *         {@link ConnectionFailedException} if the connect fails.
     */
    private CompletableFuture<FlowHandler> establishConnection(PravegaNodeUri pravegaNodeUri) {
        AppendBatchSizeTrackerImpl batchSizeTracker = new AppendBatchSizeTrackerImpl();
        FlowHandler flowHandler = new FlowHandler(pravegaNodeUri.getEndpoint(), batchSizeTracker);
        Bootstrap bootstrap = getNettyBootstrap()
                .handler(getChannelInitializer(pravegaNodeUri, batchSizeTracker, flowHandler));

        CompletableFuture<FlowHandler> connected = new CompletableFuture<>();
        try {
            bootstrap.connect(pravegaNodeUri.getEndpoint(), pravegaNodeUri.getPort())
                    .addListener((ChannelFuture channelFuture) -> {
                        if (!channelFuture.isSuccess()) {
                            connected.completeExceptionally(new ConnectionFailedException(channelFuture.cause()));
                            return;
                        }
                        Channel channel = channelFuture.channel();
                        log.debug("Connect operation completed for channel:{}, local address:{}, remote address:{}",
                                channel.id(), channel.localAddress(), channel.remoteAddress());
                        // Track the channel so getActiveChannels() can report it.
                        this.channelGroup.add(channel);
                        connected.complete(flowHandler);
                    });
        } catch (Exception e) {
            connected.completeExceptionally(new ConnectionFailedException(e));
        }

        // The handler is only handed out after it also reported registration.
        CompletableFuture<Void> registered = new CompletableFuture<>();
        flowHandler.completeWhenRegistered(registered);
        return connected.thenCombine(registered, (handler, ignored) -> handler);
    }

    /** Builds a bootstrap on the shared event loop group with TCP_NODELAY enabled. */
    private Bootstrap getNettyBootstrap() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.group)
                .channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true);
        return bootstrap;
    }

    /**
     * Creates the channel initializer that installs the optional TLS handler followed by the
     * exception logger, command encoder, frame decoder, command decoder and the flow handler.
     */
    private ChannelInitializer<SocketChannel> getChannelInitializer(final PravegaNodeUri pravegaNodeUri, final AppendBatchSizeTracker appendBatchSizeTracker, final FlowHandler flowHandler) {
        final SslContext sslContext = getSslContext();
        return new ChannelInitializer<SocketChannel>() {
            @Override // io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                if (sslContext != null) {
                    SslHandler sslHandler = sslContext.newHandler(socketChannel.alloc(), pravegaNodeUri.getEndpoint(), pravegaNodeUri.getPort());
                    if (ConnectionPoolImpl.this.clientConfig.isValidateHostName()) {
                        // Enable host-name verification on the TLS engine.
                        SSLEngine engine = sslHandler.engine();
                        SSLParameters sslParameters = engine.getSSLParameters();
                        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                        engine.setSSLParameters(sslParameters);
                    }
                    pipeline.addLast(sslHandler);
                }
                pipeline.addLast(
                        new ExceptionLoggingHandler(pravegaNodeUri.getEndpoint()),
                        new CommandEncoder(appendBatchSizeTracker),
                        new LengthFieldBasedFrameDecoder(WireCommands.MAX_WIRECOMMAND_SIZE, 4, 4),
                        new CommandDecoder(),
                        flowHandler);
            }
        };
    }

    /**
     * Builds the client {@link SslContext} when TLS is enabled in the configuration.
     *
     * @return the SSL context, or {@code null} when TLS is disabled.
     * @throws RuntimeException wrapping the {@link SSLException} or
     *                          {@link NoSuchAlgorithmException} raised while building the context.
     */
    private SslContext getSslContext() {
        if (!this.clientConfig.isEnableTls()) {
            return null;
        }
        try {
            SslContextBuilder builder = SslContextBuilder.forClient();
            if (Strings.isNullOrEmpty(this.clientConfig.getTrustStore())) {
                builder = builder.trustManager(FingerprintTrustManagerFactory.getInstance(FingerprintTrustManagerFactory.getDefaultAlgorithm()));
                log.debug("SslContextBuilder was set to an instance of {}", FingerprintTrustManagerFactory.class);
            } else {
                builder = builder.trustManager(new File(this.clientConfig.getTrustStore()));
            }
            return builder.build();
        } catch (NoSuchAlgorithmException | SSLException e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates an epoll event loop group when epoll is available, otherwise an NIO one. */
    private EventLoopGroup getEventLoopGroup() {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup();
        }
        log.warn("Epoll not available. Falling back on NIO.");
        return new NioEventLoopGroup();
    }

    /** Marks the pool closed and, on the first call only, shuts down the event loop group. */
    @Override // io.pravega.client.netty.impl.ConnectionPool, java.lang.AutoCloseable
    public void close() {
        log.info("Shutting down connection pool");
        if (this.closed.compareAndSet(false, true)) {
            this.group.shutdownGracefully();
        }
    }

    /** Exposes the channel group that tracks every successfully connected channel. */
    @SuppressFBWarnings(justification = "generated code")
    ChannelGroup getChannelGroup() {
        return this.channelGroup;
    }
}
