package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.openapi.models.V1RuntimeClass;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
import io.netty.util.concurrent.ScheduledFuture;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.Counter;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.2.jar:org/apache/pulsar/client/impl/ConnectionPool.class */
public class ConnectionPool implements AutoCloseable {
    // Minimum idle-detection interval in seconds; the constructor raises the detection interval
    // to 15 seconds when the configured max idle time is lower, which matches this value.
    public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 15;
    // Pooled connection futures, keyed by (logical address, physical address, random key).
    protected final ConcurrentMap<Key, CompletableFuture<ClientCnx>> pool;
    // Netty bootstrap configured in the constructor; used to register every new channel.
    private final Bootstrap bootstrap;
    // Channel initializer installed on the bootstrap; also drives the TLS / SOCKS5 / ClientCnx setup steps.
    private final PulsarChannelInitializer channelInitializerHandler;
    // Client configuration this pool was created with.
    private final ClientConfigurationData clientConfig;
    // Event loop group used for channel I/O, the idle-detection task and failure callbacks.
    private final EventLoopGroup eventLoopGroup;
    // Taken from getConnectionsPerBroker(); 0 means connections are created on demand and never cached.
    private final int maxConnectionsPerHosts;
    // True when TLS is enabled, a proxy protocol is set and the proxy service URL is not blank.
    private final boolean isSniProxy;
    // Resolver used to turn unresolved socket addresses into candidate addresses.
    protected final AddressResolver<InetSocketAddress> addressResolver;
    // True when the resolver was created by this pool (none was supplied) and must be closed by close().
    private final boolean shouldCloseDnsResolver;

    // Max idle time in seconds from the client configuration; values <= 0 disable automatic release.
    @VisibleForTesting
    int connectionMaxIdleSeconds;
    // Delay and period, in seconds, of the idle-detection task.
    private int idleDetectionIntervalSeconds;
    // True when connectionMaxIdleSeconds > 0.
    private boolean autoReleaseIdleConnectionsEnabled;
    // Handle of the scheduled idle-detection task; stays null when automatic release is disabled.
    private ScheduledFuture asyncReleaseUselessConnectionsTask;
    // Counts connection attempts that failed before the handshake stage (failure type "tcp-failed").
    private final Counter connectionsTcpFailureCounter;
    // Counts connection attempts whose handshake failed (failure type "handshake").
    private final Counter connectionsHandshakeFailureCounter;
    // Shared source of random keys used to spread callers over the per-host connections.
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectionPool.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-4.0.4.2.jar:org/apache/pulsar/client/impl/ConnectionPool$Key.class */
    /**
     * Immutable key identifying one pooled connection: the logical address, the physical
     * address, and a random key that distinguishes multiple connections to the same host.
     */
    public static final class Key {
        private final InetSocketAddress logicalAddress;
        private final InetSocketAddress physicalAddress;
        private final int randomKey;

        /**
         * @param inetSocketAddress  logical address of the connection
         * @param inetSocketAddress2 physical address of the connection
         * @param i                  random key selecting one of the per-host connections
         */
        @Generated
        public Key(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, int i) {
            this.logicalAddress = inetSocketAddress;
            this.physicalAddress = inetSocketAddress2;
            this.randomKey = i;
        }

        /** @return the logical address this key was created with (may be {@code null}) */
        @Generated
        public InetSocketAddress getLogicalAddress() {
            return this.logicalAddress;
        }

        /** @return the physical address this key was created with (may be {@code null}) */
        @Generated
        public InetSocketAddress getPhysicalAddress() {
            return this.physicalAddress;
        }

        /** @return the random key this key was created with */
        @Generated
        public int getRandomKey() {
            return this.randomKey;
        }

        /**
         * Two keys are equal when their random keys match and both addresses are
         * equal (or both {@code null}).
         */
        @Override
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Key)) {
                return false;
            }
            Key other = (Key) obj;
            return this.randomKey == other.randomKey
                    && Objects.equals(this.logicalAddress, other.logicalAddress)
                    && Objects.equals(this.physicalAddress, other.physicalAddress);
        }

        /** Hash over all three components; yields the same values as the previous implementation. */
        @Override
        @Generated
        public int hashCode() {
            int result = 59 + this.randomKey;
            result = result * 59 + (this.logicalAddress == null ? 43 : this.logicalAddress.hashCode());
            result = result * 59 + (this.physicalAddress == null ? 43 : this.physicalAddress.hashCode());
            return result;
        }

        /** @return {@code ConnectionPool.Key(logicalAddress=..., physicalAddress=..., randomKey=...)} */
        @Override
        @Generated
        public String toString() {
            // The closing parenthesis is a plain literal; it must not be borrowed from an
            // unrelated library constant that merely happens to have the same value.
            return "ConnectionPool.Key(logicalAddress=" + this.logicalAddress
                    + ", physicalAddress=" + this.physicalAddress
                    + ", randomKey=" + this.randomKey + ")";
        }
    }

    /**
     * Creates a pool whose connections are plain {@link ClientCnx} instances built from the
     * given instrument provider, configuration and event loop group.
     *
     * @param instrumentProvider       provider used for the connection-failure counters
     * @param clientConfigurationData  client configuration
     * @param eventLoopGroup           event loop group used for all channels
     * @param scheduledExecutorService executor handed to the channel initializer
     * @throws PulsarClientException if the pool cannot be set up
     */
    public ConnectionPool(InstrumentProvider instrumentProvider, ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, ScheduledExecutorService scheduledExecutorService) throws PulsarClientException {
        this(
                instrumentProvider,
                clientConfigurationData,
                eventLoopGroup,
                () -> new ClientCnx(instrumentProvider, clientConfigurationData, eventLoopGroup),
                scheduledExecutorService);
    }

    /**
     * Creates a pool with a custom {@link ClientCnx} factory and no externally supplied
     * address resolver (an empty {@link Optional} is passed on, so the pool creates its own).
     *
     * @param instrumentProvider       provider used for the connection-failure counters
     * @param clientConfigurationData  client configuration
     * @param eventLoopGroup           event loop group used for all channels
     * @param supplier                 factory for {@link ClientCnx} instances
     * @param scheduledExecutorService executor handed to the channel initializer
     * @throws PulsarClientException if the pool cannot be set up
     */
    public ConnectionPool(InstrumentProvider instrumentProvider, ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> supplier, ScheduledExecutorService scheduledExecutorService) throws PulsarClientException {
        this(instrumentProvider, clientConfigurationData, eventLoopGroup, supplier, Optional.empty(), scheduledExecutorService);
    }

    /**
     * Creates a pool, configures the Netty bootstrap, selects the address resolver, optionally
     * schedules idle-connection detection, and registers the connection-failure counters.
     *
     * @param instrumentProvider       provider used for the connection-failure counters
     * @param clientConfigurationData  client configuration
     * @param eventLoopGroup           event loop group used for all channels and the idle-detection task
     * @param supplier                 factory for {@link ClientCnx} instances
     * @param optional                 address resolver to use; when empty the pool creates its own
     *                                 resolver and closes it in {@link #close()}
     * @param scheduledExecutorService executor handed to the channel initializer
     * @throws PulsarClientException wrapping any exception raised while creating the channel
     *                               initializer, resolver, idle-detection task or counters
     */
    public ConnectionPool(InstrumentProvider instrumentProvider, ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> supplier, Optional<AddressResolver<InetSocketAddress>> optional, ScheduledExecutorService scheduledExecutorService) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.clientConfig = clientConfigurationData;
        this.maxConnectionsPerHosts = clientConfigurationData.getConnectionsPerBroker();
        // SNI proxying needs TLS, a proxy protocol and a non-blank proxy service URL.
        this.isSniProxy = this.clientConfig.isUseTls()
                && this.clientConfig.getProxyProtocol() != null
                && StringUtils.isNotBlank(this.clientConfig.getProxyServiceUrl());
        this.pool = new ConcurrentHashMap<>();

        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfigurationData.getConnectionTimeoutMs());
        this.bootstrap.option(ChannelOption.TCP_NODELAY, clientConfigurationData.isUseTcpNoDelay());
        this.bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        try {
            this.channelInitializerHandler = new PulsarChannelInitializer(clientConfigurationData, supplier, scheduledExecutorService);
            this.bootstrap.handler(this.channelInitializerHandler);

            // Only a resolver created here is owned (and later closed) by this pool.
            this.shouldCloseDnsResolver = !optional.isPresent();
            this.addressResolver = optional.orElseGet(() -> createAddressResolver(clientConfigurationData, eventLoopGroup));

            this.connectionMaxIdleSeconds = clientConfigurationData.getConnectionMaxIdleSeconds();
            this.autoReleaseIdleConnectionsEnabled = this.connectionMaxIdleSeconds > 0;
            if (this.autoReleaseIdleConnectionsEnabled) {
                // The detection interval defaults to the max idle time but never drops below the minimum.
                this.idleDetectionIntervalSeconds = this.connectionMaxIdleSeconds;
                if (this.idleDetectionIntervalSeconds < IDLE_DETECTION_INTERVAL_SECONDS_MIN) {
                    log.warn("Connection idle detect interval seconds default same as max idle seconds, but max idle seconds less than 15, to avoid checking connection status too much, use default value : 15");
                    this.idleDetectionIntervalSeconds = IDLE_DETECTION_INTERVAL_SECONDS_MIN;
                }
                this.asyncReleaseUselessConnectionsTask = eventLoopGroup.scheduleWithFixedDelay(() -> {
                    try {
                        doMarkAndReleaseUselessConnections();
                    } catch (Exception e) {
                        // Logged and swallowed so one failed pass does not abort the remaining scheduled runs.
                        log.error("Auto release useless connections failure.", e);
                    }
                }, this.idleDetectionIntervalSeconds, this.idleDetectionIntervalSeconds, TimeUnit.SECONDS);
            }

            this.connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connection.failed", Unit.Connections, "The number of failed connection attempts", null, Attributes.builder().put("pulsar.failure.type", "tcp-failed").build());
            this.connectionsHandshakeFailureCounter = instrumentProvider.newCounter("pulsar.client.connection.failed", Unit.Connections, "The number of failed connection attempts", null, Attributes.builder().put("pulsar.failure.type", "handshake").build());
        } catch (Exception e) {
            // Log the cause as well; the message alone hides what actually failed.
            log.error("Failed to create channel initializer", e);
            throw new PulsarClientException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a DNS address resolver bound to one event loop of the given group.
     *
     * <p>The resolver uses the datagram and socket channel classes matching the event loop
     * group, optionally binds DNS lookups to the configured local address/port, optionally
     * uses the configured DNS servers in sequence, and applies the JDK DNS cache settings.
     *
     * @param clientConfigurationData configuration providing the DNS bind address and servers
     * @param eventLoopGroup          event loop group the resolver runs on
     * @return the resolver obtained for {@code eventLoopGroup.next()}
     */
    public static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup) {
        DnsNameResolverBuilder resolverBuilder = new DnsNameResolverBuilder()
                .traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
                .socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), true);

        // Optional local bind address for outgoing DNS queries.
        if (clientConfigurationData.getDnsLookupBindAddress() != null) {
            InetSocketAddress localBindAddress = new InetSocketAddress(
                    clientConfigurationData.getDnsLookupBindAddress(),
                    clientConfigurationData.getDnsLookupBindPort());
            resolverBuilder.localAddress(localBindAddress);
        }

        // Optional explicit DNS servers, tried in the configured order.
        List<InetSocketAddress> configuredDnsServers = clientConfigurationData.getDnsServerAddresses();
        boolean hasConfiguredDnsServers = configuredDnsServers != null && !configuredDnsServers.isEmpty();
        if (hasConfiguredDnsServers) {
            resolverBuilder.nameServerProvider(new SequentialDnsServerAddressStreamProvider(configuredDnsServers));
        }

        DnsResolverUtil.applyJdkDnsCacheSettings(resolverBuilder);
        DnsAddressResolverGroup resolverGroup = new DnsAddressResolverGroup(resolverBuilder);
        return resolverGroup.getResolver(eventLoopGroup.next());
    }

    /**
     * Picks the random key used to select one of the per-host connections.
     *
     * @return {@code -1} when {@code maxConnectionsPerHosts} is 0, otherwise the sign-safe
     *         modulo of a random int by {@code maxConnectionsPerHosts}
     */
    public int genRandomKeyToSelectCon() {
        return this.maxConnectionsPerHosts == 0
                ? -1
                : MathUtils.signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
    }

    /**
     * Gets a connection for an address that serves as both logical and physical address.
     *
     * @param inetSocketAddress address to connect to
     * @return future of the connection; the random key is {@code -1} when
     *         {@code maxConnectionsPerHosts} is 0, otherwise a random value reduced modulo
     *         {@code maxConnectionsPerHosts}
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress) {
        final int selectedKey;
        if (this.maxConnectionsPerHosts == 0) {
            selectedKey = -1;
        } else {
            selectedKey = MathUtils.signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
        }
        return getConnection(inetSocketAddress, inetSocketAddress, selectedKey);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes every connection currently tracked by the pool.
     *
     * <p>Pending futures get a callback that closes the connection if it is eventually
     * established; futures that already failed are skipped; completed ones are closed directly.
     */
    public void closeAllConnections() {
        for (CompletableFuture<ClientCnx> cnxFuture : this.pool.values()) {
            if (!cnxFuture.isDone()) {
                // Still connecting: close it as soon as (and only if) it succeeds.
                cnxFuture.thenAccept(ClientCnx::close);
                continue;
            }
            if (!cnxFuture.isCompletedExceptionally()) {
                // Completed normally, so join() returns immediately.
                cnxFuture.join().close();
            }
        }
    }

    /**
     * Gets a pooled connection for the given addresses and random key, creating one if needed.
     *
     * <p>When {@code maxConnectionsPerHosts} is 0 a fresh, un-pooled connection is created on
     * every call and {@code i} is ignored. Otherwise the pooled future is reused unless it has
     * failed (the entry is evicted and the failed future returned) or the connection can no
     * longer be marked as in use (the entry is evicted and a new connection is requested).
     *
     * @param inetSocketAddress  logical address
     * @param inetSocketAddress2 physical address
     * @param i                  random key selecting one of the per-host connections
     * @return future of the connection
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, int i) {
        if (this.maxConnectionsPerHosts == 0) {
            return createConnection(new Key(inetSocketAddress, inetSocketAddress2, -1));
        }

        final Key poolKey = new Key(inetSocketAddress, inetSocketAddress2, i);
        final CompletableFuture<ClientCnx> pooledFuture =
                this.pool.computeIfAbsent(poolKey, ignored -> createConnection(poolKey));

        if (pooledFuture.isCompletedExceptionally()) {
            // Evict the failed attempt so the next caller triggers a new one.
            this.pool.remove(poolKey, pooledFuture);
            return pooledFuture;
        }

        return pooledFuture.thenCompose(cnx -> {
            // Released connections are never re-marked; the short-circuit keeps that order.
            boolean reusable = !cnx.getIdleState().isReleased()
                    && cnx.getIdleState().tryMarkUsingAndClearIdleTime();
            if (reusable) {
                // Hand the connection over on its own channel executor.
                return CompletableFuture.supplyAsync(() -> cnx, cnx.ctx().executor());
            }
            this.pool.remove(poolKey, pooledFuture);
            return this.pool.computeIfAbsent(poolKey, ignored -> createConnection(poolKey));
        });
    }

    /**
     * Opens a channel for the given key and completes the returned future once the
     * connection handshake has finished.
     *
     * <p>The pool entry for {@code key} is removed when the channel closes, when the handshake
     * fails, or when the channel could not be opened at all. Failures to open the channel are
     * counted as TCP failures, handshake failures as handshake failures.
     *
     * @param key pool key holding the logical and physical address to connect to
     * @return future completed with the connection, or exceptionally on failure
     */
    private CompletableFuture<ClientCnx> createConnection(Key key) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", key.logicalAddress);
        }
        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();
        createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> {
            log.info("[{}] Connected to server", channel);
            // Drop the pool entry as soon as the channel goes away.
            channel.closeFuture().addListener(closeResult -> {
                if (log.isDebugEnabled()) {
                    log.debug("Removing closed connection from pool: {}", closeResult);
                }
                this.pool.remove(key, cnxFuture);
            });

            // The ClientCnx is looked up in the channel pipeline under the name "handler".
            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            if (!channel.isActive() || cnx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection was already closed by the time we got notified", channel);
                }
                cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
                return;
            }

            cnx.connectionFuture().thenRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection handshake completed", cnx.channel());
                }
                cnxFuture.complete(cnx);
            }).exceptionally(handshakeFailure -> {
                this.connectionsHandshakeFailureCounter.increment();
                log.warn("[{}] Connection handshake failed: {}", cnx.channel(), handshakeFailure.getMessage());
                cnxFuture.completeExceptionally(handshakeFailure);
                this.pool.remove(key, cnxFuture);
                cnx.ctx().close();
                return null;
            });
        }).exceptionally(connectFailure -> {
            this.connectionsTcpFailureCounter.increment();
            // Eviction and failure notification are handed to the event loop group.
            this.eventLoopGroup.execute(() -> {
                log.warn("Failed to open connection to {} : {}", key.physicalAddress, connectFailure.getMessage());
                this.pool.remove(key, cnxFuture);
                cnxFuture.completeExceptionally(new PulsarClientException(connectFailure));
            });
            return null;
        });
        return cnxFuture;
    }

    /**
     * Resolves the address to dial and connects to the resolved candidates.
     *
     * <p>With SNI proxying the proxy service URL's host and port are resolved and the physical
     * address is passed on as SNI host; otherwise the physical address itself is resolved.
     *
     * @param inetSocketAddress  logical address
     * @param inetSocketAddress2 physical (unresolved) address
     * @return future of the connected channel; fails with
     *         {@link PulsarClientException.InvalidServiceURL} when the proxy URL is malformed
     */
    private CompletableFuture<Channel> createConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        final InetSocketAddress addressToResolve;
        if (this.isSniProxy) {
            final URI proxyUri;
            try {
                proxyUri = new URI(this.clientConfig.getProxyServiceUrl());
            } catch (URISyntaxException e) {
                log.error("Invalid Proxy url {}", this.clientConfig.getProxyServiceUrl(), e);
                return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL("Invalid url " + this.clientConfig.getProxyServiceUrl(), e));
            }
            addressToResolve = InetSocketAddress.createUnresolved(proxyUri.getHost(), proxyUri.getPort());
        } else {
            addressToResolve = inetSocketAddress2;
        }

        final InetSocketAddress sniHost = this.isSniProxy ? inetSocketAddress2 : null;
        return resolveName(addressToResolve).thenCompose(resolvedAddresses ->
                connectToResolvedAddresses(inetSocketAddress, inetSocketAddress2, resolvedAddresses.iterator(), sniHost));
    }

    private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, Iterator<InetSocketAddress> it2, InetSocketAddress inetSocketAddress3) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        CompletableFuture<Channel> connectToAddress = connectToAddress(inetSocketAddress, it2.next(), inetSocketAddress2, inetSocketAddress3);
        Objects.requireNonNull(completableFuture);
        connectToAddress.thenAccept((v1) -> {
            r1.complete(v1);
        }).exceptionally(th -> {
            if (!it2.hasNext()) {
                completableFuture.completeExceptionally(th);
                return null;
            }
            CompletableFuture<Channel> connectToResolvedAddresses = connectToResolvedAddresses(inetSocketAddress, inetSocketAddress2, it2, inetSocketAddress3);
            Objects.requireNonNull(completableFuture);
            connectToResolvedAddresses.thenAccept((v1) -> {
                r1.complete(v1);
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(th);
                return null;
            });
            return null;
        });
        return completableFuture;
    }

    /**
     * Resolves an address to all of its candidate socket addresses using the pool's resolver.
     *
     * @param inetSocketAddress address to resolve
     * @return future completed with the resolved addresses, or exceptionally with the
     *         resolver's failure cause
     */
    CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress inetSocketAddress) {
        final CompletableFuture<List<InetSocketAddress>> resolved = new CompletableFuture<>();
        // The listener parameter is typed explicitly so the result needs no raw cast.
        this.addressResolver.resolveAll(inetSocketAddress).addListener(
                (io.netty.util.concurrent.Future<List<InetSocketAddress>> resolveFuture) -> {
                    if (resolveFuture.isSuccess()) {
                        resolved.complete(resolveFuture.get());
                    } else {
                        resolved.completeExceptionally(resolveFuture.cause());
                    }
                });
        return resolved;
    }

    private CompletableFuture<Channel> connectToAddress(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, InetSocketAddress inetSocketAddress3, InetSocketAddress inetSocketAddress4) {
        if (this.clientConfig.isUseTls()) {
            CompletableFuture<U> thenCompose = ChannelFutures.toCompletableFuture(this.bootstrap.register()).thenCompose(channel -> {
                return this.channelInitializerHandler.initTls(channel, inetSocketAddress4 != null ? inetSocketAddress4 : inetSocketAddress2);
            });
            PulsarChannelInitializer pulsarChannelInitializer = this.channelInitializerHandler;
            Objects.requireNonNull(pulsarChannelInitializer);
            return thenCompose.thenCompose((Function<? super U, ? extends CompletionStage<U>>) pulsarChannelInitializer::initSocks5IfConfig).thenCompose(channel2 -> {
                return this.channelInitializerHandler.initializeClientCnx(channel2, inetSocketAddress, inetSocketAddress3);
            }).thenCompose(channel3 -> {
                return ChannelFutures.toCompletableFuture(channel3.connect(inetSocketAddress2));
            });
        }
        CompletableFuture<Channel> completableFuture = ChannelFutures.toCompletableFuture(this.bootstrap.register());
        PulsarChannelInitializer pulsarChannelInitializer2 = this.channelInitializerHandler;
        Objects.requireNonNull(pulsarChannelInitializer2);
        return completableFuture.thenCompose(pulsarChannelInitializer2::initSocks5IfConfig).thenCompose((Function<? super U, ? extends CompletionStage<U>>) channel4 -> {
            return this.channelInitializerHandler.initializeClientCnx(channel4, inetSocketAddress, inetSocketAddress3);
        }).thenCompose(channel5 -> {
            return ChannelFutures.toCompletableFuture(channel5.connect(inetSocketAddress2));
        });
    }

    /**
     * Closes the given connection when pooling is disabled ({@code maxConnectionsPerHosts == 0})
     * and its channel is still active; otherwise does nothing.
     *
     * @param clientCnx connection being released by its user
     */
    public void releaseConnection(ClientCnx clientCnx) {
        final boolean poolingDisabled = this.maxConnectionsPerHosts == 0;
        if (!poolingDisabled || !clientCnx.channel().isActive()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("close connection due to pooling disabled.");
        }
        clientCnx.close();
    }

    /**
     * Closes all pooled connections, closes the address resolver if this pool created it,
     * and cancels the idle-detection task if one is scheduled and not yet cancelled.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        closeAllConnections();
        if (this.shouldCloseDnsResolver) {
            this.addressResolver.close();
        }
        final ScheduledFuture<?> idleDetectionTask = this.asyncReleaseUselessConnectionsTask;
        if (idleDetectionTask != null && !idleDetectionTask.isCancelled()) {
            // Do not interrupt a run that is already in progress.
            idleDetectionTask.cancel(false);
        }
    }

    /**
     * @return the number of entries currently held in the connection pool map
     */
    @VisibleForTesting
    int getPoolSize() {
        return this.pool.size();
    }

    /**
     * Runs one idle-detection pass over the pool and releases connections found to be releasing.
     *
     * <p>Does nothing when automatic release is disabled. Only futures that completed normally
     * with a non-null connection are inspected. Release actions are collected during the scan
     * and executed after it has finished; an entry is removed from the pool only when the
     * connection reports that it was marked released and closed.
     */
    public void doMarkAndReleaseUselessConnections() {
        if (!this.autoReleaseIdleConnectionsEnabled) {
            return;
        }
        final List<Runnable> releaseTasks = new ArrayList<>();
        for (Map.Entry<Key, CompletableFuture<ClientCnx>> entry : this.pool.entrySet()) {
            final CompletableFuture<ClientCnx> cnxFuture = entry.getValue();
            if (!cnxFuture.isDone() || cnxFuture.isCompletedExceptionally()) {
                // Still connecting or failed: nothing to inspect.
                continue;
            }
            // Completed normally, so join() returns immediately.
            final ClientCnx cnx = cnxFuture.join();
            if (cnx == null) {
                continue;
            }
            cnx.getIdleState().doIdleDetect(this.connectionMaxIdleSeconds);
            if (cnx.getIdleState().isReleasing()) {
                releaseTasks.add(() -> {
                    if (cnx.getIdleState().tryMarkReleasedAndCloseConnection()) {
                        this.pool.remove(entry.getKey(), cnxFuture);
                    }
                });
            }
        }
        releaseTasks.forEach(Runnable::run);
    }

    /**
     * Returns a snapshot of the connection futures currently held in the pool.
     *
     * @return an unmodifiable set copied from the pool's values at the time of the call
     */
    public Set<CompletableFuture<ClientCnx>> getConnections() {
        Set<CompletableFuture<ClientCnx>> snapshot = this.pool.values().stream().collect(Collectors.toSet());
        return Collections.unmodifiableSet(snapshot);
    }
}
