package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-3.0.8.2-shaded.jar:org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.class */
public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T> implements TimerTask {
    private final Pattern topicsPattern;
    final TopicsChangedListener topicsChangeListener;
    private final CommandGetTopicsOfNamespace.Mode subscriptionMode;
    private final CompletableFuture<TopicListWatcher> watcherFuture;
    protected NamespaceName namespaceName;
    private final Backoff recheckPatternTaskBackoff;
    private final AtomicInteger recheckPatternEpoch;
    private volatile Timeout recheckPatternTimeout;
    private volatile String topicsHash;
    private PatternConsumerUpdateQueue updateTaskQueue;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PatternMultiTopicsConsumerImpl.class);

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-3.0.8.2-shaded.jar:org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl$PatternTopicsChangedListener.class */
    private class PatternTopicsChangedListener implements TopicsChangedListener {
        private PatternTopicsChangedListener() {
        }

        @Override // org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener
        public CompletableFuture<Void> onTopicsRemoved(Collection<String> collection) {
            if (collection.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            ArrayList arrayList = new ArrayList(collection.size());
            HashSet hashSet = new HashSet(collection.size());
            HashSet hashSet2 = new HashSet(collection.size());
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                TopicName topicName = TopicName.get(it.next());
                ConsumerImpl<T> consumerImpl = PatternMultiTopicsConsumerImpl.this.consumers.get(topicName.toString());
                if (consumerImpl != null) {
                    CompletableFuture completableFuture = new CompletableFuture();
                    consumerImpl.closeAsync().whenComplete((r11, th) -> {
                        if (th != null) {
                            PatternMultiTopicsConsumerImpl.log.error("Pattern consumer [{}] failed to unsubscribe from topics: {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString(), th);
                            completableFuture.completeExceptionally(th);
                        } else {
                            PatternMultiTopicsConsumerImpl.this.consumers.remove(topicName.toString(), consumerImpl);
                            completableFuture.complete(null);
                        }
                    });
                    arrayList.add(completableFuture);
                    hashSet.add(topicName.getPartitionedTopicName());
                    hashSet2.add(topicName.toString());
                }
            }
            if (PatternMultiTopicsConsumerImpl.log.isDebugEnabled()) {
                PatternMultiTopicsConsumerImpl.log.debug("Pattern consumer [{}] remove topics. {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), hashSet2);
            }
            return FutureUtil.waitForAll(arrayList).handle((r9, th2) -> {
                ArrayList arrayList2 = new ArrayList();
                Iterator it2 = hashSet.iterator();
                while (it2.hasNext()) {
                    String str = (String) it2.next();
                    Integer num = PatternMultiTopicsConsumerImpl.this.partitionedTopics.get(str);
                    if (num != null) {
                        boolean z = true;
                        int i = 0;
                        while (true) {
                            if (i >= num.intValue()) {
                                break;
                            }
                            if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(TopicName.get(str).getPartition(i).toString())) {
                                z = false;
                                break;
                            }
                            i++;
                        }
                        if (z) {
                            arrayList2.add(String.format("%s with %s partitions", str, num));
                            PatternMultiTopicsConsumerImpl.this.partitionedTopics.remove(str, num);
                        }
                    }
                }
                if (!PatternMultiTopicsConsumerImpl.log.isDebugEnabled()) {
                    return null;
                }
                PatternMultiTopicsConsumerImpl.log.debug("Pattern consumer [{}] remove partitioned topics because all partitions have been removed. {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), arrayList2);
                return null;
            });
        }

        @Override // org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener
        public CompletableFuture<Void> onTopicsAdded(Collection<String> collection) {
            if (collection.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(collection.size());
            HashSet<String> hashSet = new HashSet();
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                hashSet.add(TopicName.get(it.next()).getPartitionedTopicName());
            }
            Iterator<String> it2 = collection.iterator();
            while (it2.hasNext()) {
                TopicName topicName = TopicName.get(it2.next());
                if (PatternMultiTopicsConsumerImpl.this.partitionedTopics.containsKey(topicName.getPartitionedTopicName())) {
                    if (!PatternMultiTopicsConsumerImpl.this.consumers.containsKey(topicName.toString())) {
                        if (topicName.getPartitionIndex() < 0) {
                            PatternMultiTopicsConsumerImpl.log.error("Pattern consumer [{}] skip to subscribe to the non-partitioned topic {}, because hassubscribed a partitioned topic with the same name", PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString());
                        } else {
                            if (topicName.getPartitionIndex() + 1 > PatternMultiTopicsConsumerImpl.this.partitionedTopics.get(topicName.getPartitionedTopicName()).intValue()) {
                                PatternMultiTopicsConsumerImpl.this.partitionedTopics.put(topicName.getPartitionedTopicName(), Integer.valueOf(topicName.getPartitionIndex() + 1));
                            }
                            arrayList.add(topicName.toString());
                            CompletableFuture<Void> subscribeAsync = PatternMultiTopicsConsumerImpl.this.subscribeAsync(topicName.toString(), 0);
                            subscribeAsync.whenComplete((obj, obj2) -> {
                                if (obj2 != null) {
                                    PatternMultiTopicsConsumerImpl.log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName, obj2);
                                }
                            });
                            newArrayListWithExpectedSize.add(subscribeAsync);
                        }
                    }
                    hashSet.remove(topicName.getPartitionedTopicName());
                } else if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(topicName.toString())) {
                    hashSet.remove(topicName.getPartitionedTopicName());
                } else if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(topicName.getPartitionedTopicName()) && topicName.getPartitionIndex() >= 0) {
                    PatternMultiTopicsConsumerImpl.log.error("Pattern consumer [{}] skip to subscribe to the partitioned topic {}, because hassubscribed a non-partitioned topic with the same name", PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName);
                    hashSet.remove(topicName.getPartitionedTopicName());
                }
            }
            for (String str : hashSet) {
                CompletableFuture<Void> subscribeAsync2 = PatternMultiTopicsConsumerImpl.this.subscribeAsync(str, false);
                subscribeAsync2.whenComplete((obj3, obj4) -> {
                    if (obj4 != null) {
                        PatternMultiTopicsConsumerImpl.log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), str, obj4);
                    }
                });
                newArrayListWithExpectedSize.add(subscribeAsync2);
            }
            if (PatternMultiTopicsConsumerImpl.log.isDebugEnabled()) {
                PatternMultiTopicsConsumerImpl.log.debug("Pattern consumer [{}] add topics. expend partitions {}, new subscribing {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), arrayList, hashSet);
            }
            return FutureUtil.waitForAll(newArrayListWithExpectedSize);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-3.0.8.2-shaded.jar:org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl$TopicsChangedListener.class */
    public interface TopicsChangedListener {
        CompletableFuture<Void> onTopicsRemoved(Collection<String> collection);

        CompletableFuture<Void> onTopicsAdded(Collection<String> collection);
    }

    public PatternMultiTopicsConsumerImpl(Pattern pattern, String str, PulsarClientImpl pulsarClientImpl, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> completableFuture, Schema<T> schema, CommandGetTopicsOfNamespace.Mode mode, ConsumerInterceptors<T> consumerInterceptors) {
        super(pulsarClientImpl, consumerConfigurationData, executorProvider, completableFuture, schema, consumerInterceptors, false);
        this.watcherFuture = new CompletableFuture<>();
        this.recheckPatternEpoch = new AtomicInteger();
        this.recheckPatternTimeout = null;
        this.topicsPattern = pattern;
        this.topicsHash = str;
        this.subscriptionMode = mode;
        this.recheckPatternTaskBackoff = new BackoffBuilder().setInitialTime(pulsarClientImpl.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(pulsarClientImpl.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.SECONDS).create();
        if (this.namespaceName == null) {
            this.namespaceName = getNameSpaceFromPattern(pattern);
        }
        Preconditions.checkArgument(getNameSpaceFromPattern(pattern).toString().equals(this.namespaceName.toString()));
        this.topicsChangeListener = new PatternTopicsChangedListener();
        this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
        this.recheckPatternTimeout = pulsarClientImpl.timer().newTimeout(this, Math.max(1, consumerConfigurationData.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
        if (mode != CommandGetTopicsOfNamespace.Mode.PERSISTENT) {
            log.debug("Pattern consumer [{}] not creating topic list watcher for subscription mode {}", consumerConfigurationData.getSubscriptionName(), mode);
            this.watcherFuture.complete(null);
        } else {
            new TopicListWatcher(this.updateTaskQueue, pulsarClientImpl, pattern, pulsarClientImpl.newTopicListWatcherId(), this.namespaceName, str, this.watcherFuture, () -> {
                recheckTopicsChangeAfterReconnect();
            });
            this.watcherFuture.thenAccept(topicListWatcher -> {
                this.recheckPatternTimeout.cancel();
            }).exceptionally(th -> {
                log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling for new topics", consumerConfigurationData.getSubscriptionName(), th);
                return null;
            });
        }
    }

    public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
        return TopicName.get(pattern.pattern()).getNamespaceObject();
    }

    private void recheckTopicsChangeAfterReconnect() {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            return;
        }
        this.updateTaskQueue.appendRecheckOp();
    }

    @Override // io.netty.util.TimerTask
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        this.updateTaskQueue.appendRecheckOp();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> recheckTopicsChange() {
        String pattern = this.topicsPattern.pattern();
        int incrementAndGet = this.recheckPatternEpoch.incrementAndGet();
        return this.client.getLookup().getTopicsUnderNamespace(this.namespaceName, this.subscriptionMode, pattern, this.topicsHash).thenCompose(getTopicsResult -> {
            synchronized (this.recheckPatternTaskBackoff) {
                if (this.recheckPatternEpoch.get() > incrementAndGet) {
                    return CompletableFuture.completedFuture(null);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}", getSubscription(), this.namespaceName, Integer.valueOf(getTopicsResult.getTopics().size()), getTopicsResult.getTopicsHash(), Boolean.valueOf(getTopicsResult.isFiltered()));
                    getTopicsResult.getTopics().forEach(str -> {
                        log.debug("Get topics under namespace {}, topic: {}", this.namespaceName, str);
                    });
                }
                return updateSubscriptions(this.topicsPattern, this::setTopicsHash, getTopicsResult, this.topicsChangeListener, new ArrayList(getPartitions()), this.subscription);
            }
        });
    }

    static CompletableFuture<Void> updateSubscriptions(Pattern pattern, java.util.function.Consumer<String> consumer, GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangedListener, List<String> list, String str) {
        consumer.accept(getTopicsResult.getTopicsHash());
        if (!getTopicsResult.isChanged()) {
            return CompletableFuture.completedFuture(null);
        }
        List<String> nonPartitionedOrPartitionTopics = getTopicsResult.isFiltered() ? getTopicsResult.getNonPartitionedOrPartitionTopics() : getTopicsResult.filterTopics(pattern).getNonPartitionedOrPartitionTopics();
        ArrayList arrayList = new ArrayList(2);
        Set<String> minus = TopicList.minus(nonPartitionedOrPartitionTopics, list);
        Set<String> minus2 = TopicList.minus(list, nonPartitionedOrPartitionTopics);
        if (log.isDebugEnabled()) {
            log.debug("Pattern consumer [{}] Recheck pattern consumer's topics. topicsAdded: {}, topicsRemoved: {}", str, minus, minus2);
        }
        arrayList.add(topicsChangedListener.onTopicsAdded(minus));
        arrayList.add(topicsChangedListener.onTopicsRemoved(minus2));
        return FutureUtil.waitForAll(Collections.unmodifiableList(arrayList));
    }

    public Pattern getPattern() {
        return this.topicsPattern;
    }

    @VisibleForTesting
    void setTopicsHash(String str) {
        this.topicsHash = str;
    }

    @Override // org.apache.pulsar.client.impl.MultiTopicsConsumerImpl, org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    @SuppressFBWarnings
    public CompletableFuture<Void> closeAsync() {
        TopicListWatcher now;
        Timeout timeout = this.recheckPatternTimeout;
        if (timeout != null) {
            timeout.cancel();
            this.recheckPatternTimeout = null;
        }
        ArrayList arrayList = new ArrayList(2);
        if (this.watcherFuture.isDone() && !this.watcherFuture.isCompletedExceptionally() && (now = this.watcherFuture.getNow(null)) != null) {
            arrayList.add(now.closeAsync());
        }
        arrayList.add(this.updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(r3 -> {
            return super.closeAsync();
        }));
        return FutureUtil.waitForAll(arrayList);
    }

    @VisibleForTesting
    Timeout getRecheckPatternTimeout() {
        return this.recheckPatternTimeout;
    }

    @Override // org.apache.pulsar.client.impl.MultiTopicsConsumerImpl
    protected void handleSubscribeOneTopicError(String str, Throwable th, CompletableFuture<Void> completableFuture) {
        completableFuture.completeExceptionally(th);
    }
}
