package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.rocksdb.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.10.0-rc-202112302205.jar:org/apache/pulsar/client/impl/PulsarClientImpl.class */
public class PulsarClientImpl implements PulsarClient {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarClientImpl.class);
    // Client configuration; assigned by the primary constructor and returned by getConfiguration().
    protected final ClientConfigurationData conf;
    // Lookup service; the constructor picks the HTTP or binary-protocol implementation from the service URL prefix.
    private LookupService lookup;
    // Connection pool; only closed by this client when createdCnxPool is true.
    private final ConnectionPool cnxPool;
    // Timer; either supplied by the caller or created by the constructor.
    private final Timer timer;
    // True only when the constructor created the timer itself, in which case shutdown() stops it.
    private boolean needStopTimer;
    // Executor provider named "pulsar-external-listener", sized by getNumListenerThreads().
    private final ExecutorProvider externalExecutorProvider;
    // Executor provider named "pulsar-client-internal", sized by getNumIoThreads().
    private final ExecutorProvider internalExecutorService;
    // Ownership flags: the matching resource is only shut down by this client when the flag is true.
    private final boolean createdEventLoopGroup;
    private final boolean createdCnxPool;
    // Lifecycle state; null until the constructor finishes successfully and sets State.Open.
    private final AtomicReference<State> state;
    // Concurrent sets of the producers/consumers created through this client; closeAsync() closes all of them.
    private final Set<ProducerBase<?>> producers;
    private final Set<ConsumerBase<?>> consumers;
    // Id generators; requestIdGenerator starts from a random non-negative value.
    private final AtomicLong producerIdGenerator;
    private final AtomicLong consumerIdGenerator;
    private final AtomicLong requestIdGenerator;
    protected final EventLoopGroup eventLoopGroup;
    // Built from the configured memory limit (getMemoryLimitBytes()).
    private final MemoryLimitController memoryLimitController;
    // Cache of schema providers keyed by a String, loaded on demand through newSchemaProvider(String).
    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;
    // Clock taken from the client configuration.
    private final Clock clientClock;
    // Transaction coordinator client; only created when transactions are enabled in the configuration.
    private TransactionCoordinatorClientImpl tcClient;

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.10.0-rc-202112302205.jar:org/apache/pulsar/client/impl/PulsarClientImpl$State.class */
    /**
     * Lifecycle states of the client.
     *
     * <p>{@code Open} is set at the end of a successful construction, {@code Closing} when
     * {@code closeAsync()} starts, and {@code Closed} once the shutdown thread has run
     * {@code shutdown()} without an exception.
     */
    public enum State {
        Open,
        Closing,
        Closed
    }

    /**
     * Creates a client whose event loop group is obtained from {@code getEventLoopGroup(conf)}.
     * The {@code true} flag marks the event loop group as created (and therefore shut down) by this client.
     *
     * @param clientConfigurationData client configuration
     * @throws PulsarClientException if the configuration is invalid or initialization fails
     */
    public PulsarClientImpl(ClientConfigurationData clientConfigurationData) throws PulsarClientException {
        this(clientConfigurationData, getEventLoopGroup(clientConfigurationData), true);
    }

    /**
     * Creates a client on a caller-supplied event loop group with a newly created connection pool.
     * The event loop group is not shut down by this client; the connection pool is closed by it.
     *
     * @param clientConfigurationData client configuration
     * @param eventLoopGroup event loop group owned by the caller
     * @throws PulsarClientException if the configuration is invalid or initialization fails
     */
    public PulsarClientImpl(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, new ConnectionPool(clientConfigurationData, eventLoopGroup), null, false, true);
    }

    /**
     * Creates a client on a caller-supplied event loop group and connection pool.
     * Neither resource is shut down by this client; a timer is created internally.
     *
     * @param clientConfigurationData client configuration
     * @param eventLoopGroup event loop group owned by the caller
     * @param connectionPool connection pool owned by the caller
     * @throws PulsarClientException if the configuration is invalid or initialization fails
     */
    public PulsarClientImpl(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, connectionPool, null, false, false);
    }

    /**
     * Creates a client on a caller-supplied event loop group, connection pool and timer.
     * The event loop group and connection pool are not shut down by this client.
     *
     * @param clientConfigurationData client configuration
     * @param eventLoopGroup event loop group owned by the caller
     * @param connectionPool connection pool owned by the caller
     * @param timer timer to use; when {@code null} the client creates (and later stops) its own
     * @throws PulsarClientException if the configuration is invalid or initialization fails
     */
    public PulsarClientImpl(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, connectionPool, timer, false, false);
    }

    /**
     * Internal constructor: creates a new connection pool (owned by this client) and no external timer.
     *
     * @param clientConfigurationData client configuration
     * @param eventLoopGroup event loop group to use
     * @param z whether this client created the event loop group and must shut it down
     * @throws PulsarClientException if the configuration is invalid or initialization fails
     */
    private PulsarClientImpl(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, boolean z) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, new ConnectionPool(clientConfigurationData, eventLoopGroup), null, z, true);
    }

    /**
     * Primary constructor that all other constructors delegate to.
     *
     * <p>Validates the configuration, starts authentication, builds the executor providers and the
     * lookup service (HTTP when the service URL starts with {@code "http"}, binary protocol
     * otherwise), sets up the timer and, when enabled, the transaction coordinator client. The
     * state becomes {@link State#Open} only when every step succeeded; on any failure the
     * resources are released again and the failure is rethrown.
     *
     * @param clientConfigurationData client configuration; must be non-null with a non-blank service URL
     * @param eventLoopGroup event loop group; must be non-null
     * @param connectionPool connection pool to use
     * @param timer timer to use, or {@code null} to let the client create and own one
     * @param z whether this client created the event loop group and must shut it down
     * @param z2 whether this client created the connection pool and must close it
     * @throws PulsarClientException if the configuration is invalid or a component fails to start
     */
    private PulsarClientImpl(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, boolean z, boolean z2) throws PulsarClientException {
        this.state = new AtomicReference<>();
        this.producers = Collections.newSetFromMap(new ConcurrentHashMap());
        this.consumers = Collections.newSetFromMap(new ConcurrentHashMap());
        this.producerIdGenerator = new AtomicLong();
        this.consumerIdGenerator = new AtomicLong();
        // Random non-negative starting point in the lower half of the long range.
        this.requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE / 2));
        // At most 100k cached schema providers, each dropped after 30 minutes without access.
        this.schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100_000L).expireAfterAccess(30L, TimeUnit.MINUTES).build(new CacheLoader<String, SchemaInfoProvider>() {
            @Override
            public SchemaInfoProvider load(String str) {
                return PulsarClientImpl.this.newSchemaProvider(str);
            }
        });
        try {
            this.createdEventLoopGroup = z;
            this.createdCnxPool = z2;
            if (clientConfigurationData == null || StringUtils.isBlank(clientConfigurationData.getServiceUrl()) || eventLoopGroup == null) {
                throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
            }
            this.eventLoopGroup = eventLoopGroup;
            setAuth(clientConfigurationData);
            this.conf = clientConfigurationData;
            this.clientClock = clientConfigurationData.getClock();
            clientConfigurationData.getAuthentication().start();
            this.cnxPool = connectionPool;
            this.externalExecutorProvider = new ExecutorProvider(clientConfigurationData.getNumListenerThreads(), "pulsar-external-listener");
            this.internalExecutorService = new ExecutorProvider(clientConfigurationData.getNumIoThreads(), "pulsar-client-internal");
            if (clientConfigurationData.getServiceUrl().startsWith("http")) {
                this.lookup = new HttpLookupService(clientConfigurationData, eventLoopGroup);
            } else {
                this.lookup = new BinaryProtoLookupService(this, clientConfigurationData.getServiceUrl(), clientConfigurationData.getListenerName(), clientConfigurationData.isUseTls(), this.externalExecutorProvider.getExecutor());
            }
            if (timer == null) {
                // No timer supplied: create one and remember that shutdown() has to stop it.
                this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1L, TimeUnit.MILLISECONDS);
                this.needStopTimer = true;
            } else {
                this.timer = timer;
            }
            if (clientConfigurationData.isEnableTransaction()) {
                this.tcClient = new TransactionCoordinatorClientImpl(this);
                try {
                    this.tcClient.start();
                } catch (Throwable th) {
                    log.error("Start transactionCoordinatorClient error.", th);
                    throw new PulsarClientException(th);
                }
            }
            this.memoryLimitController = new MemoryLimitController(clientConfigurationData.getMemoryLimitBytes());
            this.state.set(State.Open);
        } catch (Throwable th2) {
            // Release whatever was already set up before propagating the failure.
            shutdown();
            shutdownEventLoopGroup(eventLoopGroup);
            closeCnxPool(connectionPool);
            throw th2;
        }
    }

    /**
     * Installs an authentication instance on the configuration when a plugin class name is set.
     *
     * <p>A non-blank parameter string takes precedence over the parameter map. When no plugin
     * class is configured, or neither parameter form is present, the configuration is left as is.
     *
     * @param clientConfigurationData configuration to read the auth settings from and update
     * @throws PulsarClientException if the authentication factory fails
     */
    private void setAuth(ClientConfigurationData clientConfigurationData) throws PulsarClientException {
        if (StringUtils.isBlank(clientConfigurationData.getAuthPluginClassName())) {
            return;
        }
        if (StringUtils.isNotBlank(clientConfigurationData.getAuthParams())) {
            // String-encoded parameters win when both forms are present.
            clientConfigurationData.setAuthentication(AuthenticationFactory.create(clientConfigurationData.getAuthPluginClassName(), clientConfigurationData.getAuthParams()));
            return;
        }
        if (clientConfigurationData.getAuthParamMap() != null) {
            clientConfigurationData.setAuthentication(AuthenticationFactory.create(clientConfigurationData.getAuthPluginClassName(), clientConfigurationData.getAuthParamMap()));
        }
    }

    /** Returns the configuration object this client was constructed with (not a copy). */
    public ClientConfigurationData getConfiguration() {
        return this.conf;
    }

    /** Returns the clock that was read from the client configuration at construction time. */
    public Clock getClientClock() {
        return this.clientClock;
    }

    /**
     * Returns the live state holder of this client. This is the same {@link AtomicReference}
     * the client itself updates, so callers see state transitions as they happen.
     */
    public AtomicReference<State> getState() {
        return this.state;
    }

    /** Returns a producer builder bound to this client that uses the raw bytes schema. */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public ProducerBuilder<byte[]> newProducer() {
        return new ProducerBuilderImpl(this, Schema.BYTES);
    }

    /**
     * Returns a producer builder bound to this client that uses the given schema.
     *
     * @param schema schema handed to the builder
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
        return new ProducerBuilderImpl(this, schema);
    }

    /** Returns a consumer builder bound to this client that uses the raw bytes schema. */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public ConsumerBuilder<byte[]> newConsumer() {
        return new ConsumerBuilderImpl(this, Schema.BYTES);
    }

    /**
     * Returns a consumer builder bound to this client that uses the given schema.
     *
     * @param schema schema handed to the builder
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public <T> ConsumerBuilder<T> newConsumer(Schema<T> schema) {
        return new ConsumerBuilderImpl(this, schema);
    }

    /** Returns a reader builder bound to this client that uses the raw bytes schema. */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public ReaderBuilder<byte[]> newReader() {
        return new ReaderBuilderImpl(this, Schema.BYTES);
    }

    /**
     * Returns a reader builder bound to this client that uses the given schema.
     *
     * @param schema schema handed to the builder
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public <T> ReaderBuilder<T> newReader(Schema<T> schema) {
        return new ReaderBuilderImpl(this, schema);
    }

    /**
     * Creates a bytes-schema producer without interceptors.
     *
     * @param producerConfigurationData producer configuration
     * @return future completed with the producer, or failed with the creation error
     */
    public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData producerConfigurationData) {
        return createProducerAsync(producerConfigurationData, Schema.BYTES, null);
    }

    /**
     * Creates a producer with the given schema and no interceptors.
     *
     * @param producerConfigurationData producer configuration
     * @param schema schema of the produced messages
     * @return future completed with the producer, or failed with the creation error
     */
    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData producerConfigurationData, Schema<T> schema) {
        return createProducerAsync(producerConfigurationData, schema, null);
    }

    /**
     * Validates the request and creates a producer asynchronously.
     *
     * <p>Returns a failed future when the configuration is null, the schema is an
     * {@code AutoConsumeSchema}, the client is not open, or the topic name is invalid. For an
     * {@code AutoProduceBytesSchema} that is not yet initialized, the topic schema is looked up
     * first and installed on it (falling back to the bytes schema when the topic has none).
     *
     * @param producerConfigurationData producer configuration
     * @param schema schema of the produced messages
     * @param producerInterceptors interceptors to attach, may be {@code null}
     * @return future completed with the producer, or failed with the validation/creation error
     */
    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData producerConfigurationData, Schema<T> schema, ProducerInterceptors producerInterceptors) {
        if (producerConfigurationData == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
        }
        if (schema instanceof AutoConsumeSchema) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically"));
        }
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + this.state.get()));
        }
        final String topic = producerConfigurationData.getTopicName();
        if (!TopicName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
        }
        if (schema instanceof AutoProduceBytesSchema) {
            final AutoProduceBytesSchema autoSchema = (AutoProduceBytesSchema) schema;
            if (!autoSchema.schemaInitialized()) {
                // Resolve the topic's schema first, then continue with the regular creation path.
                return (CompletableFuture<Producer<T>>) this.lookup.getSchema(TopicName.get(producerConfigurationData.getTopicName())).thenCompose(topicSchema -> {
                    if (!topicSchema.isPresent()) {
                        autoSchema.setSchema(Schema.BYTES);
                    } else {
                        autoSchema.setSchema(Schema.getSchema((SchemaInfo) topicSchema.get()));
                    }
                    return createProducerAsync(topic, producerConfigurationData, schema, producerInterceptors);
                });
            }
        }
        return createProducerAsync(topic, producerConfigurationData, schema, producerInterceptors);
    }

    /**
     * Fetches the partition metadata of the topic and instantiates the matching producer
     * implementation (partitioned when the topic has partitions, single otherwise). The new
     * producer is registered in the client's producer set.
     *
     * @param str topic name
     * @param producerConfigurationData producer configuration
     * @param schema schema of the produced messages
     * @param producerInterceptors interceptors to attach, may be {@code null}
     * @return the future handed to the producer implementation; failed when the metadata lookup fails
     */
    private <T> CompletableFuture<Producer<T>> createProducerAsync(String str, ProducerConfigurationData producerConfigurationData, Schema<T> schema, ProducerInterceptors producerInterceptors) {
        final CompletableFuture<Producer<T>> producerFuture = new CompletableFuture<>();
        getPartitionedTopicMetadata(str).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", str, Integer.valueOf(metadata.partitions));
            }
            final ProducerBase<?> created;
            if (metadata.partitions > 0) {
                created = newPartitionedProducerImpl(str, producerConfigurationData, schema, producerInterceptors, producerFuture, metadata);
            } else {
                created = newProducerImpl(str, -1, producerConfigurationData, schema, producerInterceptors, producerFuture);
            }
            this.producers.add(created);
        }).exceptionally(failure -> {
            log.warn("[{}] Failed to get partitioned topic metadata: {}", str, failure.getMessage());
            producerFuture.completeExceptionally(failure);
            return null;
        });
        return producerFuture;
    }

    /**
     * Factory hook for partitioned producers; overridable by subclasses.
     *
     * @param str topic name
     * @param producerConfigurationData producer configuration
     * @param schema schema of the produced messages
     * @param producerInterceptors interceptors to attach, may be {@code null}
     * @param completableFuture future passed to the producer implementation
     * @param partitionedTopicMetadata metadata supplying the partition count
     * @return the new partitioned producer
     */
    protected <T> PartitionedProducerImpl<T> newPartitionedProducerImpl(String str, ProducerConfigurationData producerConfigurationData, Schema<T> schema, ProducerInterceptors producerInterceptors, CompletableFuture<Producer<T>> completableFuture, PartitionedTopicMetadata partitionedTopicMetadata) {
        return new PartitionedProducerImpl<>(this, str, producerConfigurationData, partitionedTopicMetadata.partitions, completableFuture, schema, producerInterceptors);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Factory hook for single-topic producers.
     *
     * @param str topic name
     * @param i partition index passed to the producer; the non-partitioned path in this class passes -1
     * @param producerConfigurationData producer configuration
     * @param schema schema of the produced messages
     * @param producerInterceptors interceptors to attach, may be {@code null}
     * @param completableFuture future passed to the producer implementation
     * @return the new producer
     */
    public <T> ProducerImpl<T> newProducerImpl(String str, int i, ProducerConfigurationData producerConfigurationData, Schema<T> schema, ProducerInterceptors producerInterceptors, CompletableFuture<Producer<T>> completableFuture) {
        return new ProducerImpl<>(this, str, producerConfigurationData, completableFuture, i, schema, producerInterceptors);
    }

    /**
     * Subscribes with the bytes schema and no interceptors.
     *
     * @param consumerConfigurationData consumer configuration
     * @return future completed with the consumer, or failed with the subscription error
     */
    public CompletableFuture<Consumer<byte[]>> subscribeAsync(ConsumerConfigurationData<byte[]> consumerConfigurationData) {
        return subscribeAsync(consumerConfigurationData, Schema.BYTES, null);
    }

    /**
     * Validates a subscription request and dispatches it to the pattern, single-topic or
     * multi-topic subscription path.
     *
     * <p>Validation failures are reported through a failed future, checked in this order: client
     * not open, null configuration, invalid topic name, blank subscription name, read-compacted
     * on a non-persistent topic or non exclusive/failover subscription, consumer event listener
     * on a non-failover subscription, topic names combined with a topics pattern.
     *
     * @param consumerConfigurationData consumer configuration
     * @param schema schema of the consumed messages
     * @param consumerInterceptors interceptors to attach, may be {@code null}
     * @return future completed with the consumer, or failed with the validation/subscription error
     */
    public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (consumerConfigurationData == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
        }
        for (String topic : consumerConfigurationData.getTopicNames()) {
            if (!TopicName.isValid(topic)) {
                return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
            }
        }
        if (StringUtils.isBlank(consumerConfigurationData.getSubscriptionName())) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
        }
        // Read-compacted needs persistent topics and an exclusive or failover subscription.
        if (consumerConfigurationData.isReadCompacted()
                && !(consumerConfigurationData.getTopicNames().stream().allMatch(name -> TopicName.get(name).getDomain() == TopicDomain.persistent)
                        && (consumerConfigurationData.getSubscriptionType() == SubscriptionType.Exclusive
                                || consumerConfigurationData.getSubscriptionType() == SubscriptionType.Failover))) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Read compacted can only be used with exclusive or failover persistent subscriptions"));
        }
        if (consumerConfigurationData.getConsumerEventListener() != null
                && consumerConfigurationData.getSubscriptionType() != SubscriptionType.Failover) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Active consumer listener is only supported for failover subscription"));
        }
        if (consumerConfigurationData.getTopicsPattern() != null) {
            if (!consumerConfigurationData.getTopicNames().isEmpty()) {
                return FutureUtil.failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
            }
            return patternTopicSubscribeAsync(consumerConfigurationData, schema, consumerInterceptors);
        }
        if (consumerConfigurationData.getTopicNames().size() == 1) {
            return singleTopicSubscribeAsync(consumerConfigurationData, schema, consumerInterceptors);
        }
        return multiTopicSubscribeAsync(consumerConfigurationData, schema, consumerInterceptors);
    }

    /**
     * Runs the schema pre-processing step for the single configured topic and then performs the
     * actual subscription with the schema that step yields.
     *
     * @param consumerConfigurationData consumer configuration holding exactly one topic
     * @param schema schema requested by the caller
     * @param consumerInterceptors interceptors to attach, may be {@code null}
     * @return future completed with the consumer, or failed with the subscription error
     */
    private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors) {
        final String topic = consumerConfigurationData.getSingleTopic();
        return (CompletableFuture<Consumer<T>>) preProcessSchemaBeforeSubscribe(this, schema, topic)
                .thenCompose(resolvedSchema -> doSingleTopicSubscribeAsync(consumerConfigurationData, resolvedSchema, consumerInterceptors));
    }

    /**
     * Looks up the partition metadata of the single configured topic and creates either a
     * partitioned multi-topics consumer or a plain consumer. The consumer is registered in the
     * client's consumer set.
     *
     * @param consumerConfigurationData consumer configuration holding exactly one topic
     * @param schema schema of the consumed messages
     * @param consumerInterceptors interceptors to attach, may be {@code null}
     * @return the future handed to the consumer implementation; failed when the metadata lookup fails
     */
    private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors) {
        final CompletableFuture<Consumer<T>> subscribeFuture = new CompletableFuture<>();
        final String topic = consumerConfigurationData.getSingleTopic();
        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", topic, Integer.valueOf(metadata.partitions));
            }
            final ConsumerBase created;
            if (metadata.partitions <= 0) {
                created = ConsumerImpl.newConsumerImpl(this, topic, consumerConfigurationData, this.externalExecutorProvider, TopicName.getPartitionIndex(topic), false, subscribeFuture, null, schema, consumerInterceptors, true);
            } else {
                created = MultiTopicsConsumerImpl.createPartitionedConsumer(this, consumerConfigurationData, this.externalExecutorProvider, subscribeFuture, metadata.partitions, schema, consumerInterceptors);
            }
            this.consumers.add(created);
        }).exceptionally(failure -> {
            log.warn("[{}] Failed to get partitioned topic metadata", topic, failure);
            subscribeFuture.completeExceptionally(failure);
            return null;
        });
        return subscribeFuture;
    }

    /**
     * Creates a multi-topics consumer for the configured topic list and registers it in the
     * client's consumer set.
     *
     * @param consumerConfigurationData consumer configuration
     * @param schema schema of the consumed messages
     * @param consumerInterceptors interceptors to attach, may be {@code null}
     * @return the future handed to the consumer implementation
     */
    private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors) {
        final CompletableFuture<Consumer<T>> subscribeFuture = new CompletableFuture<>();
        final ConsumerBase multiTopicsConsumer = new MultiTopicsConsumerImpl(this, consumerConfigurationData, this.externalExecutorProvider, subscribeFuture, schema, consumerInterceptors, true);
        this.consumers.add(multiTopicsConsumer);
        return subscribeFuture;
    }

    /**
     * Subscribes to all topics matching the configured pattern, using the bytes schema and no interceptors.
     *
     * @param consumerConfigurationData consumer configuration carrying the topics pattern
     * @return future completed with the consumer, or failed with the subscription error
     */
    public CompletableFuture<Consumer<byte[]>> patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> consumerConfigurationData) {
        return patternTopicSubscribeAsync(consumerConfigurationData, Schema.BYTES, null);
    }

    /**
     * Subscribes to every topic of the pattern's namespace whose name matches the pattern.
     *
     * <p>The namespace is derived from the pattern text, its topics are fetched through the
     * lookup service, and the matching names are added to the configuration's topic list before
     * a pattern multi-topics consumer is created and registered.
     *
     * @param consumerConfigurationData consumer configuration carrying the topics pattern; its
     *     topic-name collection is extended with the matching topics
     * @param schema schema of the consumed messages
     * @param consumerInterceptors interceptors to attach, may be {@code null}
     * @return the future handed to the consumer implementation; failed when the topic listing fails
     */
    private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors) {
        String pattern = consumerConfigurationData.getTopicsPattern().pattern();
        CommandGetTopicsOfNamespace.Mode convertRegexSubscriptionMode = convertRegexSubscriptionMode(consumerConfigurationData.getRegexSubscriptionMode());
        NamespaceName namespaceObject = TopicName.get(pattern).getNamespaceObject();
        CompletableFuture<Consumer<T>> completableFuture = new CompletableFuture<>();
        this.lookup.getTopicsUnderNamespace(namespaceObject, convertRegexSubscriptionMode).thenAccept(list -> {
            if (log.isDebugEnabled()) {
                log.debug("Get topics under namespace {}, topics.size: {}", namespaceObject.toString(), Integer.valueOf(list.size()));
                list.forEach(str -> {
                    log.debug("Get topics under namespace {}, topic: {}", namespaceObject.toString(), str);
                });
            }
            consumerConfigurationData.getTopicNames().addAll(topicsPatternFilter(list, consumerConfigurationData.getTopicsPattern()));
            this.consumers.add(new PatternMultiTopicsConsumerImpl(consumerConfigurationData.getTopicsPattern(), this, consumerConfigurationData, this.externalExecutorProvider, completableFuture, schema, convertRegexSubscriptionMode, consumerInterceptors));
        }).exceptionally(th -> {
            // Pass the throwable to the logger so the cause of the failed listing is recorded.
            log.warn("[{}] Failed to get topics under namespace", namespaceObject, th);
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    /**
     * Returns the fully qualified names of the given topics that match the pattern.
     *
     * <p>Matching ignores the domain prefix: when the pattern text contains {@code "://"} only
     * the part after it is compiled, and each topic is matched on the part of its full name that
     * follows {@code "://"}.
     *
     * @param list topic names to filter
     * @param pattern topics pattern, with or without a domain prefix
     * @return matching topics in input order, rendered via {@code TopicName.toString()}
     */
    public static List<String> topicsPatternFilter(List<String> list, Pattern pattern) {
        final String patternText = pattern.toString();
        final Pattern domainlessPattern;
        if (patternText.contains("://")) {
            domainlessPattern = Pattern.compile(patternText.split("\\:\\/\\/")[1]);
        } else {
            domainlessPattern = pattern;
        }
        final List<String> matching = new ArrayList<>();
        for (String topic : list) {
            final String fullName = TopicName.get(topic).toString();
            if (domainlessPattern.matcher(fullName.split("\\:\\/\\/")[1]).matches()) {
                matching.add(fullName);
            }
        }
        return matching;
    }

    /**
     * Creates a reader that uses the bytes schema.
     *
     * @param readerConfigurationData reader configuration
     * @return future completed with the reader, or failed with the creation error
     */
    public CompletableFuture<Reader<byte[]>> createReaderAsync(ReaderConfigurationData<byte[]> readerConfigurationData) {
        return createReaderAsync(readerConfigurationData, Schema.BYTES);
    }

    /**
     * Validates a reader request and creates a single-topic or multi-topic reader.
     *
     * <p>Validation failures are reported through a failed future: client not open, null
     * configuration, invalid topic name, or missing start message id. With exactly one topic the
     * schema pre-processing step runs before the reader is created.
     *
     * @param readerConfigurationData reader configuration
     * @param schema schema of the messages to read
     * @return future completed with the reader, or failed with the validation/creation error
     */
    public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> readerConfigurationData, Schema<T> schema) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (readerConfigurationData == null) {
            // This is the reader path, so the message names the reader configuration.
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Reader configuration undefined"));
        }
        for (String str : readerConfigurationData.getTopicNames()) {
            if (!TopicName.isValid(str)) {
                return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + str + "'"));
            }
        }
        if (readerConfigurationData.getStartMessageId() == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
        }
        if (readerConfigurationData.getTopicNames().size() != 1) {
            return createMultiTopicReaderAsync(readerConfigurationData, schema);
        }
        return (CompletableFuture<Reader<T>>) preProcessSchemaBeforeSubscribe(this, schema, readerConfigurationData.getTopicName()).thenCompose(schema2 -> {
            return createSingleTopicReaderAsync(readerConfigurationData, schema2);
        });
    }

    /**
     * Creates a multi-topics reader and registers its underlying consumer in the client's
     * consumer set. The returned future completes with the reader once the reader's own
     * subscription future completes, or fails with that future's error.
     *
     * @param readerConfigurationData reader configuration
     * @param schema schema of the messages to read
     * @return future completed with the reader, or failed with the creation error
     */
    protected <T> CompletableFuture<Reader<T>> createMultiTopicReaderAsync(ReaderConfigurationData<T> readerConfigurationData, Schema<T> schema) {
        final CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
        final CompletableFuture subscribedFuture = new CompletableFuture();
        final MultiTopicsReaderImpl multiReader = new MultiTopicsReaderImpl(this, readerConfigurationData, this.externalExecutorProvider, subscribedFuture, schema);
        this.consumers.add(multiReader.getMultiTopicsConsumer());
        subscribedFuture.thenRun(() -> {
            readerFuture.complete(multiReader);
        }).exceptionally(failure -> {
            log.warn("Failed to create multiTopicReader", failure);
            readerFuture.completeExceptionally(failure);
            return null;
        });
        return readerFuture;
    }

    /**
     * Creates a reader for one topic after fetching its partition metadata.
     *
     * <p>A partitioned topic gets a multi-topics reader (rejected up front when the start message
     * id is illegal for multi-topics consumption); a non-partitioned topic gets a plain reader.
     * The reader's underlying consumer is registered in the client's consumer set.
     *
     * @param readerConfigurationData reader configuration
     * @param schema schema of the messages to read
     * @return future completed with the reader, or failed with the lookup/creation error
     */
    protected <T> CompletableFuture<Reader<T>> createSingleTopicReaderAsync(ReaderConfigurationData<T> readerConfigurationData, Schema<T> schema) {
        final String topic = readerConfigurationData.getTopicName();
        final CompletableFuture<Reader<T>> readerFuture = new CompletableFuture<>();
        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", topic, Integer.valueOf(metadata.partitions));
            }
            if (metadata.partitions > 0 && MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(readerConfigurationData.getStartMessageId())) {
                readerFuture.completeExceptionally(new PulsarClientException("The partitioned topic startMessageId is illegal"));
                return;
            }
            final CompletableFuture subscribedFuture = new CompletableFuture();
            final Reader reader;
            final ConsumerBase underlyingConsumer;
            if (metadata.partitions > 0) {
                final MultiTopicsReaderImpl multiReader = new MultiTopicsReaderImpl(this, readerConfigurationData, this.externalExecutorProvider, subscribedFuture, schema);
                reader = multiReader;
                underlyingConsumer = multiReader.getMultiTopicsConsumer();
            } else {
                final ReaderImpl singleReader = new ReaderImpl(this, readerConfigurationData, this.externalExecutorProvider, subscribedFuture, schema);
                reader = singleReader;
                underlyingConsumer = singleReader.getConsumer();
            }
            this.consumers.add(underlyingConsumer);
            subscribedFuture.thenRun(() -> {
                readerFuture.complete(reader);
            }).exceptionally(failure -> {
                log.warn("[{}] Failed to get create topic reader", topic, failure);
                readerFuture.completeExceptionally(failure);
                return null;
            });
        }).exceptionally(failure -> {
            log.warn("[{}] Failed to get partitioned topic metadata", topic, failure);
            readerFuture.completeExceptionally(failure);
            return null;
        });
        return readerFuture;
    }

    /**
     * Looks up the schema of a topic through the lookup service.
     *
     * <p>Anything thrown synchronously while parsing the topic name or issuing the lookup is
     * reported as an invalid-topic-name failure on the returned future.
     *
     * @param str topic name
     * @return future with the topic's schema info, if any
     */
    public CompletableFuture<Optional<SchemaInfo>> getSchema(String str) {
        CompletableFuture<Optional<SchemaInfo>> schemaFuture;
        try {
            schemaFuture = this.lookup.getSchema(TopicName.get(str));
        } catch (Throwable ignored) {
            schemaFuture = FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + str + "'"));
        }
        return schemaFuture;
    }

    /**
     * Closes the client and blocks until the asynchronous close has finished.
     *
     * <p>An "already closed" failure is swallowed so that closing twice is harmless. On
     * interruption the thread's interrupt flag is restored before the exception is rethrown.
     *
     * @throws PulsarClientException if closing fails for any other reason or the wait is interrupted
     */
    @Override // org.apache.pulsar.client.api.PulsarClient, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(interrupted);
        } catch (ExecutionException failed) {
            final PulsarClientException cause = PulsarClientException.unwrap(failed);
            if (cause instanceof PulsarClientException.AlreadyClosedException) {
                return;
            }
            throw cause;
        }
    }

    /**
     * Closes all producers and consumers of this client and then shuts the client down.
     *
     * <p>The state moves from {@code Open} to {@code Closing} immediately; when it is not
     * {@code Open} a failed future with an "already closed" exception is returned. After all
     * producers and consumers have closed, {@link #shutdown()} runs on a dedicated thread. The
     * state is set to {@code Closed} before the returned future completes, so callers woken by
     * the future never observe {@code Closing}.
     *
     * @return future completed when shutdown has finished, or failed with the close/shutdown error
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public CompletableFuture<Void> closeAsync() {
        log.info("Client closing. URL: {}", this.lookup.getServiceUrl());
        if (!this.state.compareAndSet(State.Open, State.Closing)) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        List<CompletableFuture<Void>> pendingCloses = new ArrayList<>();
        this.producers.forEach(producerBase -> {
            pendingCloses.add(producerBase.closeAsync());
        });
        this.consumers.forEach(consumerBase -> {
            pendingCloses.add(consumerBase.closeAsync());
        });
        FutureUtil.waitForAll(pendingCloses).thenRun(() -> {
            // shutdown() blocks, so it runs on its own thread rather than on the completing thread.
            new Thread(() -> {
                try {
                    shutdown();
                    // Publish the final state first; completing the future wakes up waiters.
                    this.state.set(State.Closed);
                    completableFuture.complete(null);
                } catch (PulsarClientException e) {
                    completableFuture.completeExceptionally(e);
                }
            }, "pulsar-client-shutdown-thread").start();
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    /**
     * Releases the client's resources: lookup service, transaction coordinator client, event
     * loop group, connection pool, timer (only when created by this client), executors and
     * authentication.
     *
     * <p>Every step is attempted even when an earlier one fails. The failure of the last failing
     * step is rethrown after all steps have run. Fields are null-checked where needed because
     * the constructor also calls this method after a partial initialization.
     *
     * @throws PulsarClientException if any step failed
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public void shutdown() throws PulsarClientException {
        try {
            Throwable th = null;
            if (this.lookup != null) {
                try {
                    this.lookup.close();
                } catch (Throwable th2) {
                    log.warn("Failed to shutdown lookup", th2);
                    th = th2;
                }
            }
            if (this.tcClient != null) {
                try {
                    this.tcClient.close();
                } catch (Throwable th3) {
                    // Include the throwable, consistent with the other steps.
                    log.warn("Failed to close tcClient", th3);
                    th = th3;
                }
            }
            try {
                shutdownEventLoopGroup(this.eventLoopGroup);
            } catch (PulsarClientException e) {
                log.warn("Failed to shutdown eventLoopGroup", (Throwable) e);
                th = e;
            }
            try {
                closeCnxPool(this.cnxPool);
            } catch (PulsarClientException e2) {
                log.warn("Failed to shutdown cnxPool", (Throwable) e2);
                th = e2;
            }
            if (this.timer != null && this.needStopTimer) {
                try {
                    this.timer.stop();
                } catch (Throwable th4) {
                    log.warn("Failed to shutdown timer", th4);
                    th = th4;
                }
            }
            try {
                shutdownExecutors();
            } catch (PulsarClientException e3) {
                th = e3;
            }
            if (this.conf != null && this.conf.getAuthentication() != null) {
                try {
                    this.conf.getAuthentication().close();
                } catch (Throwable th5) {
                    log.warn("Failed to close authentication", th5);
                    th = th5;
                }
            }
            if (th != null) {
                throw th;
            }
        } catch (Throwable th6) {
            log.warn("Failed to shutdown Pulsar client", th6);
            throw PulsarClientException.unwrap(th6);
        }
    }

    /**
     * Closes the given connection pool, but only when {@code createdCnxPool} is set and the pool is non-null.
     *
     * @param connectionPool the pool to close; may be {@code null}
     * @throws PulsarClientException if closing fails; the cause is converted by
     *         {@code PulsarClientException.unwrap}
     */
    private void closeCnxPool(ConnectionPool connectionPool) throws PulsarClientException {
        if (createdCnxPool && connectionPool != null) {
            try {
                connectionPool.close();
            } catch (Throwable closeFailure) {
                throw PulsarClientException.unwrap(closeFailure);
            }
        }
    }

    /**
     * Gracefully shuts down the given event loop group and waits for the shutdown future to complete.
     *
     * <p>Nothing happens when {@code createdEventLoopGroup} is not set, the group is {@code null}, or the group
     * already reports being shut down.
     *
     * @param eventLoopGroup the group to shut down; may be {@code null}
     * @throws PulsarClientException if the shutdown or the wait fails; the cause is converted by
     *         {@code PulsarClientException.unwrap}
     */
    private void shutdownEventLoopGroup(EventLoopGroup eventLoopGroup) throws PulsarClientException {
        if (createdEventLoopGroup && eventLoopGroup != null && !eventLoopGroup.isShutdown()) {
            try {
                eventLoopGroup.shutdownGracefully().get();
            } catch (Throwable shutdownFailure) {
                throw PulsarClientException.unwrap(shutdownFailure);
            }
        }
    }

    /**
     * Calls {@code shutdownNow()} on the external and the internal executor providers, in that order, skipping
     * any that is {@code null} or already shut down.
     *
     * <p>Both shutdowns are attempted even when the first one fails. Each failure is logged; when both fail,
     * only the second one is thrown.
     *
     * @throws PulsarClientException the most recent shutdown failure, converted by
     *         {@code PulsarClientException.unwrap}
     */
    private void shutdownExecutors() throws PulsarClientException {
        PulsarClientException failure = null;

        if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
            try {
                externalExecutorProvider.shutdownNow();
            } catch (Throwable externalFailure) {
                log.warn("Failed to shutdown externalExecutorProvider", externalFailure);
                failure = PulsarClientException.unwrap(externalFailure);
            }
        }

        if (internalExecutorService != null && !internalExecutorService.isShutdown()) {
            try {
                internalExecutorService.shutdownNow();
            } catch (Throwable internalFailure) {
                log.warn("Failed to shutdown internalExecutorService", internalFailure);
                failure = PulsarClientException.unwrap(internalFailure);
            }
        }

        if (failure == null) {
            return;
        }
        throw failure;
    }

    /**
     * Reports whether this client is closed or in the process of closing.
     *
     * @return {@code true} when the current state is {@code Closed} or {@code Closing}
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public boolean isClosed() {
        final State current = state.get();
        if (current == State.Closed) {
            return true;
        }
        return current == State.Closing;
    }

    /**
     * Switches this client to a new service URL.
     *
     * <p>The URL is logged, stored in the client configuration and handed to the lookup service; afterwards all
     * connections in the pool are closed.
     *
     * @param str the new service URL
     * @throws PulsarClientException if the lookup service rejects the new URL
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public synchronized void updateServiceUrl(String str) throws PulsarClientException {
        log.info("Updating service URL to {}", str);
        conf.setServiceUrl(str);
        lookup.updateServiceUrl(str);
        cnxPool.closeAllConnections();
    }

    /**
     * Looks up the broker for a topic and obtains a connection to it from the connection pool.
     *
     * @param str the topic name
     * @return a future yielding the connection for the pair of addresses returned by the lookup
     */
    public CompletableFuture<ClientCnx> getConnection(String str) {
        final TopicName topicName = TopicName.get(str);
        return lookup.getBroker(topicName).thenCompose(addresses ->
                cnxPool.getConnection((InetSocketAddress) addresses.getLeft(), (InetSocketAddress) addresses.getRight()));
    }

    /**
     * Returns the timer held by this client.
     *
     * @return the client's {@link Timer}
     */
    public Timer timer() {
        return timer;
    }

    /**
     * Returns the external executor provider held by this client.
     *
     * @return the client's external {@code ExecutorProvider}
     */
    public ExecutorProvider externalExecutorProvider() {
        return externalExecutorProvider;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands out the next producer id.
     *
     * @return the generator's current value, which is incremented afterwards
     */
    public long newProducerId() {
        return producerIdGenerator.getAndIncrement();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands out the next consumer id.
     *
     * @return the generator's current value, which is incremented afterwards
     */
    public long newConsumerId() {
        return consumerIdGenerator.getAndIncrement();
    }

    /**
     * Hands out the next request id.
     *
     * @return the generator's current value, which is incremented afterwards
     */
    public long newRequestId() {
        return requestIdGenerator.getAndIncrement();
    }

    /**
     * Returns the connection pool used by this client.
     *
     * @return the client's {@code ConnectionPool}
     */
    public ConnectionPool getCnxPool() {
        return cnxPool;
    }

    /**
     * Returns the event loop group used by this client.
     *
     * @return the client's {@link EventLoopGroup}
     */
    public EventLoopGroup eventLoopGroup() {
        return eventLoopGroup;
    }

    /**
     * Returns the lookup service currently used by this client.
     *
     * @return the client's {@code LookupService}
     */
    public LookupService getLookup() {
        return lookup;
    }

    /**
     * Replaces the lookup service with a new instance chosen from the configured service URL.
     *
     * <p>A URL starting with {@code "http"} gets an {@code HttpLookupService}; any other URL gets a
     * {@code BinaryProtoLookupService}.
     *
     * @throws PulsarClientException if the new lookup service cannot be created
     */
    public void reloadLookUp() throws PulsarClientException {
        if (!conf.getServiceUrl().startsWith("http")) {
            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
        } else {
            lookup = new HttpLookupService(conf, eventLoopGroup);
        }
    }

    /**
     * Fetches the partition count of a topic.
     *
     * @param str the topic name
     * @return a future yielding the {@code partitions} value of the topic's metadata
     */
    public CompletableFuture<Integer> getNumberOfPartitions(String str) {
        return getPartitionedTopicMetadata(str).thenApply(metadata -> metadata.partitions);
    }

    /**
     * Fetches the partition metadata of a topic, retrying with backoff within the configured lookup timeout.
     *
     * <p>The retry budget starts at {@code conf.getLookupTimeoutMs()}. The backoff starts at 100 ms, is capped at
     * one minute and has a mandatory stop at twice the lookup timeout.
     *
     * @param str the topic name
     * @return a future yielding the metadata, or an already-failed future carrying an
     *         {@code InvalidConfigurationException} when an {@link IllegalArgumentException} is raised while
     *         starting the lookup
     */
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String str) {
        final CompletableFuture<PartitionedTopicMetadata> result = new CompletableFuture<>();
        try {
            final TopicName topicName = TopicName.get(str);
            final AtomicLong remainingTimeMs = new AtomicLong(conf.getLookupTimeoutMs());
            final Backoff backoff = new BackoffBuilder()
                    .setInitialTime(100L, TimeUnit.MILLISECONDS)
                    .setMandatoryStop(remainingTimeMs.get() * 2, TimeUnit.MILLISECONDS)
                    .setMax(1L, TimeUnit.MINUTES)
                    .create();
            final List<Throwable> previousFailures = new ArrayList<>();
            getPartitionedTopicMetadata(topicName, backoff, remainingTimeMs, result, previousFailures);
        } catch (IllegalArgumentException e) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
        }
        return result;
    }

    /**
     * Performs one partition-metadata lookup attempt and, on a retriable failure, schedules the next attempt.
     *
     * <p>On success the metadata completes {@code completableFuture}. On failure the time spent on the attempt is
     * subtracted from {@code atomicLong} and the next delay is the smaller of the backoff's next value and the
     * remaining budget. The future is failed (with the earlier failures attached through
     * {@code PulsarClientException.setPreviousExceptions}) when no budget remains, when the cause is not
     * retriable, or when the cause is an {@code AuthenticationException}. Otherwise the failure is recorded in
     * {@code list} and a retry is scheduled on the external executor after the computed delay.
     *
     * @param topicName the topic to look up
     * @param backoff supplies the delay before each retry
     * @param atomicLong remaining retry budget in milliseconds; decremented by this method
     * @param completableFuture receives the metadata or the final failure
     * @param list accumulates the failures of earlier attempts
     */
    private void getPartitionedTopicMetadata(TopicName topicName, Backoff backoff, AtomicLong atomicLong, CompletableFuture<PartitionedTopicMetadata> completableFuture, List<Throwable> list) {
        final long startNanos = System.nanoTime();
        this.lookup.getPartitionedTopicMetadata(topicName).thenAccept(completableFuture::complete).exceptionally(th -> {
            // Charge the time this attempt took against the remaining budget.
            atomicLong.addAndGet(-TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            final long nextDelayMs = Math.min(backoff.next(), atomicLong.get());
            // NOTE(review): the cause is inspected rather than th itself, presumably because the future wraps
            // the real failure (e.g. in a CompletionException) — confirm.
            final Throwable cause = th.getCause();
            final boolean giveUp = !PulsarClientException.isRetriableError(cause)
                    || cause instanceof PulsarClientException.AuthenticationException;
            if (nextDelayMs <= 0 || giveUp) {
                PulsarClientException.setPreviousExceptions(th, list);
                completableFuture.completeExceptionally(th);
                return null;
            }
            list.add(th);
            ((ScheduledExecutorService) this.externalExecutorProvider.getExecutor()).schedule(() -> {
                log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- Will try again in {} ms", topicName, nextDelayMs);
                // The waiting time counts against the budget as well.
                atomicLong.addAndGet(-nextDelayMs);
                getPartitionedTopicMetadata(topicName, backoff, atomicLong, completableFuture, list);
            }, nextDelayMs, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    /**
     * Lists the names of the partitions of a topic.
     *
     * @param str the topic name
     * @return a future yielding one name per partition, in partition-index order; when the metadata reports no
     *         partitions ({@code partitions <= 0}), a single-element list holding {@code str} itself
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public CompletableFuture<List<String>> getPartitionsForTopic(String str) {
        return getPartitionedTopicMetadata(str).thenApply(metadata -> {
            if (metadata.partitions > 0) {
                final TopicName topicName = TopicName.get(str);
                final List<String> partitionNames = new ArrayList<>(metadata.partitions);
                for (int index = 0; index < metadata.partitions; index++) {
                    partitionNames.add(topicName.getPartition(index).toString());
                }
                return partitionNames;
            }
            return Collections.singletonList(str);
        });
    }

    /**
     * Creates the event loop group for a client configuration.
     *
     * @param clientConfigurationData supplies the number of IO threads and the busy-wait flag
     * @return a new group whose threads come from a factory named {@code "pulsar-client-io"}
     */
    private static EventLoopGroup getEventLoopGroup(ClientConfigurationData clientConfigurationData) {
        final int ioThreads = clientConfigurationData.getNumIoThreads();
        final boolean busyWait = clientConfigurationData.isEnableBusyWait();
        final ThreadFactory ioThreadFactory = getThreadFactory("pulsar-client-io");
        return EventLoopUtil.newEventLoopGroup(ioThreads, busyWait, ioThreadFactory);
    }

    /**
     * Creates a thread factory whose daemon flag is taken from the calling thread.
     *
     * @param str the pool name passed to the factory
     * @return a new {@link DefaultThreadFactory}
     */
    private static ThreadFactory getThreadFactory(String str) {
        final boolean daemon = Thread.currentThread().isDaemon();
        return new DefaultThreadFactory(str, daemon);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a producer from the set of producers tracked by this client.
     *
     * @param producerBase the producer to stop tracking
     */
    public void cleanupProducer(ProducerBase<?> producerBase) {
        producers.remove(producerBase);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a consumer from the set of consumers tracked by this client.
     *
     * @param consumerBase the consumer to stop tracking
     */
    public void cleanupConsumer(ConsumerBase<?> consumerBase) {
        consumers.remove(consumerBase);
    }

    /**
     * Returns how many producers this client currently tracks.
     *
     * @return the size of the producers collection
     */
    @VisibleForTesting
    int producersCount() {
        return producers.size();
    }

    /**
     * Returns how many consumers this client currently tracks.
     *
     * @return the size of the consumers collection
     */
    @VisibleForTesting
    int consumersCount() {
        return consumers.size();
    }

    /**
     * Maps a client-side regex subscription mode to the matching {@code CommandGetTopicsOfNamespace.Mode}.
     *
     * @param regexSubscriptionMode the mode to convert; must not be {@code null}
     * @return {@code PERSISTENT}, {@code NON_PERSISTENT} or {@code ALL} for the three handled modes, and
     *         {@code null} for any other value
     */
    private static CommandGetTopicsOfNamespace.Mode convertRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode) {
        final CommandGetTopicsOfNamespace.Mode converted;
        switch (regexSubscriptionMode) {
            case PersistentOnly:
                converted = CommandGetTopicsOfNamespace.Mode.PERSISTENT;
                break;
            case NonPersistentOnly:
                converted = CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT;
                break;
            case AllTopics:
                converted = CommandGetTopicsOfNamespace.Mode.ALL;
                break;
            default:
                converted = null;
                break;
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a schema info provider for a topic, backed by this client.
     *
     * @param str the topic name
     * @return a new {@code MultiVersionSchemaInfoProvider}
     */
    public SchemaInfoProvider newSchemaProvider(String str) {
        final TopicName topicName = TopicName.get(str);
        return new MultiVersionSchemaInfoProvider(topicName, this);
    }

    /**
     * Returns the cache of schema info providers, keyed by string.
     *
     * @return the client's schema provider {@link LoadingCache}
     */
    public LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
        return schemaProviderLoadingCache;
    }

    /**
     * Returns the memory limit controller held by this client.
     *
     * @return the client's {@code MemoryLimitController}
     */
    public MemoryLimitController getMemoryLimitController() {
        return memoryLimitController;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares a schema for use with a topic before subscribing.
     *
     * <p>A {@code null} schema, or one that does not support schema versioning, is returned as-is in an
     * already-completed future. Otherwise the topic's {@link SchemaInfoProvider} is taken from the client's
     * loading cache, the schema is cloned, and the clone receives the provider. When the clone reports that it
     * must fetch schema info, the latest schema is first requested from the provider and applied to the clone.
     *
     * @param pulsarClientImpl client whose schema provider cache is consulted
     * @param schema schema to prepare; may be {@code null}
     * @param str topic name, used as the cache key and in log and error messages
     * @param <T> the schema's value type
     * @return a future yielding the prepared schema. It fails with the cause of a cache load error, with a
     *         {@code NotFoundException} when no latest schema exists and the schema is neither an
     *         {@code AutoConsumeSchema} nor a {@link KeyValueSchema}, or with the {@link RuntimeException}
     *         raised while configuring the schema
     */
    public <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema<T> schema, String str) {
        if (schema == null || !schema.supportSchemaVersioning()) {
            return CompletableFuture.completedFuture(schema);
        }
        final SchemaInfoProvider schemaInfoProvider;
        try {
            schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(str);
        } catch (ExecutionException e) {
            log.error("Failed to load schema info provider for topic {}", str, e);
            return FutureUtil.failedFuture(e.getCause());
        }
        // The clone lives in a final local so the lambda below can capture it; the parameter is left untouched.
        final Schema<T> preparedSchema = schema.clone();
        if (!preparedSchema.requireFetchingSchemaInfo()) {
            preparedSchema.setSchemaInfoProvider(schemaInfoProvider);
            return CompletableFuture.completedFuture(preparedSchema);
        }
        return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
            // A missing schema is tolerated only for AutoConsumeSchema and KeyValueSchema.
            if (null == schemaInfo && !(preparedSchema instanceof AutoConsumeSchema) && !(preparedSchema instanceof KeyValueSchema)) {
                return FutureUtil.failedFuture(new PulsarClientException.NotFoundException("No latest schema found for topic " + str));
            }
            try {
                log.info("Configuring schema for topic {} : {}", str, schemaInfo);
                preparedSchema.configureSchemaInfo(str, "topic", schemaInfo);
                preparedSchema.setSchemaInfoProvider(schemaInfoProvider);
                return CompletableFuture.completedFuture(preparedSchema);
            } catch (RuntimeException e) {
                return FutureUtil.failedFuture(e);
            }
        });
    }

    /**
     * Returns an executor obtained from the internal executor provider.
     *
     * @return the result of {@code internalExecutorService.getExecutor()}
     */
    public ExecutorService getInternalExecutorService() {
        return internalExecutorService.getExecutor();
    }

    /**
     * Creates a builder for a new transaction.
     *
     * @return a new {@code TransactionBuilderImpl} bound to this client and its transaction coordinator client
     * @throws PulsarClientException an {@code InvalidConfigurationException} when transactions are not enabled
     *         in the client configuration
     */
    @Override // org.apache.pulsar.client.api.PulsarClient
    public TransactionBuilder newTransaction() throws PulsarClientException {
        if (!conf.isEnableTransaction()) {
            throw new PulsarClientException.InvalidConfigurationException("Transactions are not enabled");
        }
        return new TransactionBuilderImpl(this, tcClient);
    }

    /**
     * Returns the timer held by this client.
     *
     * @return the client's {@link Timer}
     */
    public Timer getTimer() {
        return timer;
    }

    /**
     * Returns the transaction coordinator client held by this client.
     *
     * @return the client's {@code TransactionCoordinatorClientImpl}; {@code shutdown()} treats it as possibly
     *         {@code null}
     */
    public TransactionCoordinatorClientImpl getTcClient() {
        return tcClient;
    }
}
