package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.5.0.jar:META-INF/bundled-dependencies/pulsar-client-original-2.5.0.jar:org/apache/pulsar/client/impl/ConnectionPool.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.5.0.jar:org/apache/pulsar/client/impl/ConnectionPool.class */
public class ConnectionPool implements Closeable {
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    private final int maxConnectionsPerHosts;
    protected final DnsNameResolver dnsResolver;
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectionPool.class);

    public ConnectionPool(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, () -> {
            return new ClientCnx(clientConfigurationData, eventLoopGroup);
        });
    }

    public ConnectionPool(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> supplier) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = clientConfigurationData.getConnectionsPerBroker();
        this.pool = new ConcurrentHashMap<>();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(clientConfigurationData.getConnectionTimeoutMs()));
        this.bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientConfigurationData.isUseTcpNoDelay()));
        this.bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        try {
            this.bootstrap.handler(new PulsarChannelInitializer(clientConfigurationData, supplier));
            this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
        } catch (Exception e) {
            log.error("Failed to create channel initializer");
            throw new PulsarClientException(e);
        }
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress) {
        return getConnection(inetSocketAddress, inetSocketAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeAllConnections() {
        this.pool.values().forEach(concurrentMap -> {
            concurrentMap.values().forEach(completableFuture -> {
                if (!completableFuture.isDone()) {
                    completableFuture.thenAccept((v0) -> {
                        v0.close();
                    });
                } else {
                    if (completableFuture.isCompletedExceptionally()) {
                        return;
                    }
                    ((ClientCnx) completableFuture.join()).close();
                }
            });
        });
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        if (this.maxConnectionsPerHosts == 0) {
            return createConnection(inetSocketAddress, inetSocketAddress2, -1);
        }
        int signSafeMod = signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
        return this.pool.computeIfAbsent(inetSocketAddress, inetSocketAddress3 -> {
            return new ConcurrentHashMap();
        }).computeIfAbsent(Integer.valueOf(signSafeMod), num -> {
            return createConnection(inetSocketAddress, inetSocketAddress2, signSafeMod);
        });
    }

    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, int i) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", inetSocketAddress);
        }
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
        createConnection(inetSocketAddress2).thenAccept(channel -> {
            log.info("[{}] Connected to server", channel);
            channel.closeFuture().addListener2(future -> {
                if (log.isDebugEnabled()) {
                    log.debug("Removing closed connection from pool: {}", future);
                }
                cleanupConnection(inetSocketAddress, i, completableFuture);
            });
            ClientCnx clientCnx = (ClientCnx) channel.pipeline().get("handler");
            if (!channel.isActive() || clientCnx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection was already closed by the time we got notified", channel);
                }
                completableFuture.completeExceptionally(new ChannelException("Connection already closed"));
            } else {
                if (!inetSocketAddress.equals(inetSocketAddress2)) {
                    clientCnx.setTargetBroker(inetSocketAddress);
                }
                clientCnx.setRemoteHostName(inetSocketAddress2.getHostName());
                clientCnx.connectionFuture().thenRun(() -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Connection handshake completed", clientCnx.channel());
                    }
                    completableFuture.complete(clientCnx);
                }).exceptionally(th -> {
                    log.warn("[{}] Connection handshake failed: {}", clientCnx.channel(), th.getMessage());
                    completableFuture.completeExceptionally(th);
                    cleanupConnection(inetSocketAddress, i, completableFuture);
                    clientCnx.ctx().close();
                    return null;
                });
            }
        }).exceptionally(th -> {
            this.eventLoopGroup.execute(() -> {
                log.warn("Failed to open connection to {} : {}", inetSocketAddress2, th.getMessage());
                cleanupConnection(inetSocketAddress, i, completableFuture);
                completableFuture.completeExceptionally(new PulsarClientException(th));
            });
            return null;
        });
        return completableFuture;
    }

    private CompletableFuture<Channel> createConnection(InetSocketAddress inetSocketAddress) {
        String hostString = inetSocketAddress.getHostString();
        int port = inetSocketAddress.getPort();
        return resolveName(hostString).thenCompose(list -> {
            return connectToResolvedAddresses(list.iterator(), port);
        });
    }

    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> it, int i) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        connectToAddress(it.next(), i).thenAccept(channel -> {
            completableFuture.complete(channel);
        }).exceptionally(th -> {
            if (it.hasNext()) {
                connectToResolvedAddresses(it, i).thenAccept(channel2 -> {
                    completableFuture.complete(channel2);
                }).exceptionally(th -> {
                    completableFuture.completeExceptionally(th);
                    return null;
                });
                return null;
            }
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    @VisibleForTesting
    CompletableFuture<List<InetAddress>> resolveName(String str) {
        CompletableFuture<List<InetAddress>> completableFuture = new CompletableFuture<>();
        this.dnsResolver.resolveAll(str).addListener2(future -> {
            if (future.isSuccess()) {
                completableFuture.complete(future.get());
            } else {
                completableFuture.completeExceptionally(future.cause());
            }
        });
        return completableFuture;
    }

    private CompletableFuture<Channel> connectToAddress(InetAddress inetAddress, int i) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        this.bootstrap.connect(inetAddress, i).addListener2(channelFuture -> {
            if (channelFuture.isSuccess()) {
                completableFuture.complete(channelFuture.channel());
            } else {
                completableFuture.completeExceptionally(channelFuture.cause());
            }
        });
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.eventLoopGroup.shutdownGracefully(0L, 1L, TimeUnit.SECONDS).await2();
        } catch (InterruptedException e) {
            log.warn("EventLoopGroup shutdown was interrupted", (Throwable) e);
        }
        this.dnsResolver.close();
    }

    private void cleanupConnection(InetSocketAddress inetSocketAddress, int i, CompletableFuture<ClientCnx> completableFuture) {
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> concurrentMap = this.pool.get(inetSocketAddress);
        if (concurrentMap != null) {
            concurrentMap.remove(Integer.valueOf(i), completableFuture);
        }
    }

    public static int signSafeMod(long j, int i) {
        int i2 = (int) (j % i);
        if (i2 < 0) {
            i2 += i;
        }
        return i2;
    }
}
