package org.apache.pulsar.client.impl;

import com.mysql.cj.CharsetMapping;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-3.3.2.4.jar:org/apache/pulsar/client/impl/BinaryProtoLookupService.class */
public class BinaryProtoLookupService implements LookupService {
    private final PulsarClientImpl client;
    private final ServiceNameResolver serviceNameResolver;
    private final boolean useTls;
    private final ExecutorService scheduleExecutor;
    private final String listenerName;
    private final int maxLookupRedirects;
    private final ExecutorService lookupPinnedExecutor;
    private final boolean createdLookupPinnedExecutor;
    private final ConcurrentHashMap<TopicName, CompletableFuture<LookupTopicResult>> lookupInProgress;
    private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>> partitionedMetadataInProgress;
    private final LatencyHistogram histoGetBroker;
    private final LatencyHistogram histoGetTopicMetadata;
    private final LatencyHistogram histoGetSchema;
    private final LatencyHistogram histoListTopics;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BinaryProtoLookupService.class);

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-3.3.2.4.jar:org/apache/pulsar/client/impl/BinaryProtoLookupService$LookupDataResult.class */
    public static class LookupDataResult {
        public final String brokerUrl;
        public final String brokerUrlTls;
        public final int partitions;
        public final boolean authoritative;
        public final boolean proxyThroughServiceUrl;
        public final boolean redirect;

        public LookupDataResult(CommandLookupTopicResponse commandLookupTopicResponse) {
            this.brokerUrl = commandLookupTopicResponse.hasBrokerServiceUrl() ? commandLookupTopicResponse.getBrokerServiceUrl() : null;
            this.brokerUrlTls = commandLookupTopicResponse.hasBrokerServiceUrlTls() ? commandLookupTopicResponse.getBrokerServiceUrlTls() : null;
            this.authoritative = commandLookupTopicResponse.isAuthoritative();
            this.redirect = commandLookupTopicResponse.hasResponse() && commandLookupTopicResponse.getResponse() == CommandLookupTopicResponse.LookupType.Redirect;
            this.proxyThroughServiceUrl = commandLookupTopicResponse.isProxyThroughServiceUrl();
            this.partitions = -1;
        }

        public LookupDataResult(int i) {
            this.partitions = i;
            this.brokerUrl = null;
            this.brokerUrlTls = null;
            this.authoritative = false;
            this.proxyThroughServiceUrl = false;
            this.redirect = false;
        }
    }

    @Deprecated
    public BinaryProtoLookupService(PulsarClientImpl pulsarClientImpl, String str, boolean z, ExecutorService executorService) throws PulsarClientException {
        this(pulsarClientImpl, str, null, z, executorService);
    }

    @Deprecated
    public BinaryProtoLookupService(PulsarClientImpl pulsarClientImpl, String str, String str2, boolean z, ExecutorService executorService) throws PulsarClientException {
        this(pulsarClientImpl, str, str2, z, executorService, null);
    }

    public BinaryProtoLookupService(PulsarClientImpl pulsarClientImpl, String str, String str2, boolean z, ExecutorService executorService, ExecutorService executorService2) throws PulsarClientException {
        this.lookupInProgress = new ConcurrentHashMap<>();
        this.partitionedMetadataInProgress = new ConcurrentHashMap<>();
        this.client = pulsarClientImpl;
        this.useTls = z;
        this.scheduleExecutor = executorService;
        this.maxLookupRedirects = pulsarClientImpl.getConfiguration().getMaxLookupRedirects();
        this.serviceNameResolver = new PulsarServiceNameResolver();
        this.listenerName = str2;
        updateServiceUrl(str);
        LatencyHistogram newLatencyHistogram = pulsarClientImpl.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration", "Duration of lookup operations", null, Attributes.builder().put("pulsar.lookup.transport-type", CharsetMapping.MYSQL_CHARSET_NAME_binary).build());
        this.histoGetBroker = newLatencyHistogram.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build());
        this.histoGetTopicMetadata = newLatencyHistogram.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
        this.histoGetSchema = newLatencyHistogram.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
        this.histoListTopics = newLatencyHistogram.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());
        if (executorService2 == null) {
            this.createdLookupPinnedExecutor = true;
            this.lookupPinnedExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
        } else {
            this.createdLookupPinnedExecutor = false;
            this.lookupPinnedExecutor = executorService2;
        }
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public void updateServiceUrl(String str) throws PulsarClientException {
        this.serviceNameResolver.updateServiceUrl(str);
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        long nanoTime = System.nanoTime();
        MutableObject mutableObject = new MutableObject();
        try {
            CompletableFuture<LookupTopicResult> computeIfAbsent = this.lookupInProgress.computeIfAbsent(topicName, topicName2 -> {
                CompletableFuture<LookupTopicResult> findBroker = findBroker(this.serviceNameResolver.resolveHost(), false, topicName, 0);
                mutableObject.setValue(findBroker);
                findBroker.thenRun(() -> {
                    this.histoGetBroker.recordSuccess(System.nanoTime() - nanoTime);
                }).exceptionally(th -> {
                    this.histoGetBroker.recordFailure(System.nanoTime() - nanoTime);
                    return null;
                });
                return findBroker;
            });
            if (mutableObject.getValue2() != null) {
                ((CompletableFuture) mutableObject.getValue2()).whenComplete((obj, obj2) -> {
                    this.lookupInProgress.remove(topicName, mutableObject.getValue2());
                });
            }
            return computeIfAbsent;
        } catch (Throwable th) {
            if (mutableObject.getValue2() != null) {
                ((CompletableFuture) mutableObject.getValue2()).whenComplete((obj3, obj22) -> {
                    this.lookupInProgress.remove(topicName, mutableObject.getValue2());
                });
            }
            throw th;
        }
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean z, boolean z2) {
        MutableObject mutableObject = new MutableObject();
        try {
            CompletableFuture<PartitionedTopicMetadata> computeIfAbsent = this.partitionedMetadataInProgress.computeIfAbsent(topicName, topicName2 -> {
                CompletableFuture<PartitionedTopicMetadata> partitionedTopicMetadata = getPartitionedTopicMetadata(this.serviceNameResolver.resolveHost(), topicName, z, z2);
                mutableObject.setValue(partitionedTopicMetadata);
                return partitionedTopicMetadata;
            });
            if (mutableObject.getValue2() != null) {
                ((CompletableFuture) mutableObject.getValue2()).whenComplete((obj, obj2) -> {
                    this.partitionedMetadataInProgress.remove(topicName, mutableObject.getValue2());
                });
            }
            return computeIfAbsent;
        } catch (Throwable th) {
            if (mutableObject.getValue2() != null) {
                ((CompletableFuture) mutableObject.getValue2()).whenComplete((obj3, obj22) -> {
                    this.partitionedMetadataInProgress.remove(topicName, mutableObject.getValue2());
                });
            }
            throw th;
        }
    }

    private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress inetSocketAddress, boolean z, TopicName topicName, int i) {
        CompletableFuture<LookupTopicResult> completableFuture = new CompletableFuture<>();
        if (this.maxLookupRedirects <= 0 || i <= this.maxLookupRedirects) {
            this.client.getCnxPool().getConnection(inetSocketAddress).thenAcceptAsync(clientCnx -> {
                long newRequestId = this.client.newRequestId();
                clientCnx.newLookup(Commands.newLookup(topicName.toString(), this.listenerName, z, newRequestId), newRequestId).whenComplete((lookupDataResult, th) -> {
                    if (th != null) {
                        log.warn("[{}] failed to send lookup request : {}", topicName, th.getMessage());
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Lookup response exception: {}", topicName, th);
                        }
                        completableFuture.completeExceptionally(th);
                    } else {
                        try {
                            URI uri = this.useTls ? new URI(lookupDataResult.brokerUrlTls) : new URI(lookupDataResult.brokerUrl);
                            InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                            if (lookupDataResult.redirect) {
                                CompletableFuture<LookupTopicResult> findBroker = findBroker(createUnresolved, lookupDataResult.authoritative, topicName, i + 1);
                                Objects.requireNonNull(completableFuture);
                                findBroker.thenAccept((v1) -> {
                                    r1.complete(v1);
                                }).exceptionally(th -> {
                                    Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                                    if (i <= 0) {
                                        log.warn("[{}] lookup failed : {}", topicName, unwrapCompletionException.getMessage(), unwrapCompletionException);
                                    } else if (log.isDebugEnabled()) {
                                        log.debug("[{}] lookup redirection failed ({}) : {}", topicName, Integer.valueOf(i), unwrapCompletionException.getMessage());
                                    }
                                    completableFuture.completeExceptionally(unwrapCompletionException);
                                    return null;
                                });
                            } else if (lookupDataResult.proxyThroughServiceUrl) {
                                completableFuture.complete(new LookupTopicResult(createUnresolved, inetSocketAddress, true));
                            } else {
                                completableFuture.complete(new LookupTopicResult(createUnresolved, createUnresolved, false));
                            }
                        } catch (Exception e) {
                            log.warn("[{}] invalid url {} : {}", topicName, null, e.getMessage(), e);
                            completableFuture.completeExceptionally(e);
                        }
                    }
                    this.client.getCnxPool().releaseConnection(clientCnx);
                });
            }, (Executor) this.lookupPinnedExecutor).exceptionally(th -> {
                completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
                return null;
            });
            return completableFuture;
        }
        completableFuture.completeExceptionally(new PulsarClientException.LookupException("Too many redirects: " + this.maxLookupRedirects));
        return completableFuture;
    }

    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress inetSocketAddress, TopicName topicName, boolean z, boolean z2) {
        long nanoTime = System.nanoTime();
        CompletableFuture<PartitionedTopicMetadata> completableFuture = new CompletableFuture<>();
        this.client.getCnxPool().getConnection(inetSocketAddress).thenAcceptAsync(clientCnx -> {
            boolean z3 = z;
            if (!z && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
                if (!z2) {
                    completableFuture.completeExceptionally(new PulsarClientException.FeatureNotSupportedException("The feature of getting partitions without auto-creation is not supported by the broker. Please upgrade the broker to version that supports PIP-344 to resolve this issue.", PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation));
                    return;
                } else {
                    log.info("[{}] Using original behavior of getPartitionedTopicMetadata(topic) in getPartitionedTopicMetadata(topic, false) since the target broker does not support PIP-344 and fallback is enabled.", topicName);
                    z3 = true;
                }
            }
            long newRequestId = this.client.newRequestId();
            clientCnx.newLookup(Commands.newPartitionMetadataRequest(topicName.toString(), newRequestId, z3), newRequestId).whenComplete((lookupDataResult, th) -> {
                if (th != null) {
                    this.histoGetTopicMetadata.recordFailure(System.nanoTime() - nanoTime);
                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName, th.getMessage(), th);
                    completableFuture.completeExceptionally(th);
                } else {
                    try {
                        this.histoGetTopicMetadata.recordSuccess(System.nanoTime() - nanoTime);
                        completableFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
                    } catch (Exception e) {
                        completableFuture.completeExceptionally(new PulsarClientException.LookupException(String.format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s, error message %s", Boolean.valueOf(lookupDataResult.redirect), topicName, Integer.valueOf(lookupDataResult.partitions), e.getMessage())));
                    }
                }
                this.client.getCnxPool().releaseConnection(clientCnx);
            });
        }, (Executor) this.lookupPinnedExecutor).exceptionally(th -> {
            completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
        return getSchema(topicName, null);
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] bArr) {
        long nanoTime = System.nanoTime();
        CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
        if (bArr != null && bArr.length == 0) {
            completableFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
            return completableFuture;
        }
        this.client.getCnxPool().getConnection(this.serviceNameResolver.resolveHost()).thenAcceptAsync(clientCnx -> {
            long newRequestId = this.client.newRequestId();
            clientCnx.sendGetSchema(Commands.newGetSchema(newRequestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(bArr))), newRequestId).whenComplete((optional, th) -> {
                if (th != null) {
                    this.histoGetSchema.recordFailure(System.nanoTime() - nanoTime);
                    log.warn("[{}] failed to get schema : {}", topicName, th.getMessage(), th);
                    completableFuture.completeExceptionally(th);
                } else {
                    this.histoGetSchema.recordSuccess(System.nanoTime() - nanoTime);
                    completableFuture.complete(optional);
                }
                this.client.getCnxPool().releaseConnection(clientCnx);
            });
        }, (Executor) this.lookupPinnedExecutor).exceptionally(th -> {
            completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(th));
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public String getServiceUrl() {
        return this.serviceNameResolver.getServiceUrl();
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public InetSocketAddress resolveHost() {
        return this.serviceNameResolver.resolveHost();
    }

    @Override // org.apache.pulsar.client.impl.LookupService
    public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespaceName, CommandGetTopicsOfNamespace.Mode mode, String str, String str2) {
        CompletableFuture<GetTopicsResult> completableFuture = new CompletableFuture<>();
        AtomicLong atomicLong = new AtomicLong(this.client.getConfiguration().getOperationTimeoutMs());
        getTopicsUnderNamespace(this.serviceNameResolver.resolveHost(), namespaceName, new BackoffBuilder().setInitialTime(100L, TimeUnit.MILLISECONDS).setMandatoryStop(atomicLong.get() * 2, TimeUnit.MILLISECONDS).setMax(1L, TimeUnit.MINUTES).create(), atomicLong, completableFuture, mode, str, str2);
        return completableFuture;
    }

    private void getTopicsUnderNamespace(InetSocketAddress inetSocketAddress, NamespaceName namespaceName, Backoff backoff, AtomicLong atomicLong, CompletableFuture<GetTopicsResult> completableFuture, CommandGetTopicsOfNamespace.Mode mode, String str, String str2) {
        long nanoTime = System.nanoTime();
        this.client.getCnxPool().getConnection(inetSocketAddress).thenAcceptAsync(clientCnx -> {
            long newRequestId = this.client.newRequestId();
            clientCnx.newGetTopicsOfNamespace(Commands.newGetTopicsOfNamespaceRequest(namespaceName.toString(), newRequestId, mode, str, str2), newRequestId).whenComplete((getTopicsResult, th) -> {
                if (th != null) {
                    this.histoListTopics.recordFailure(System.nanoTime() - nanoTime);
                    completableFuture.completeExceptionally(th);
                } else {
                    this.histoListTopics.recordSuccess(System.nanoTime() - nanoTime);
                    if (log.isDebugEnabled()) {
                        log.debug("[namespace: {}] Success get topics list in request: {}", namespaceName, Long.valueOf(newRequestId));
                    }
                    completableFuture.complete(getTopicsResult);
                }
                this.client.getCnxPool().releaseConnection(clientCnx);
            });
        }, (Executor) this.lookupPinnedExecutor).exceptionally(th -> {
            long min = Math.min(backoff.next(), atomicLong.get());
            if (min <= 0) {
                completableFuture.completeExceptionally(new PulsarClientException.TimeoutException(String.format("Could not get topics of namespace %s within configured timeout", namespaceName.toString())));
                return null;
            }
            ((ScheduledExecutorService) this.scheduleExecutor).schedule(() -> {
                log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms", namespaceName, Long.valueOf(min));
                atomicLong.addAndGet(-min);
                getTopicsUnderNamespace(inetSocketAddress, namespaceName, backoff, atomicLong, completableFuture, mode, str, str2);
            }, min, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (!this.createdLookupPinnedExecutor || this.lookupPinnedExecutor == null || this.lookupPinnedExecutor.isShutdown()) {
            return;
        }
        this.lookupPinnedExecutor.shutdown();
    }
}
