package org.apache.pulsar.metadata.tableview.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import lombok.NonNull;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-4.0.0-preview.1.jar:org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.class */
public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T> {
    /** Logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(MetadataStoreTableViewImpl.class);
    /** Overall time budget of the initial fill, in milliseconds (300000 ms = 5 minutes). */
    private static final int FILL_TIMEOUT_IN_MILLIS = 300_000;
    /** Upper bound on the number of paths expanded in a single round of the initial fill. */
    private static final int MAX_CONCURRENT_METADATA_OPS_DURING_FILL = 50;
    /** Value passed as refreshAfterWriteMillis to the metadata cache (600000 ms = 10 minutes). */
    private static final long CACHE_REFRESH_FREQUENCY_IN_MILLIS = 600_000L;
    /** Mutable key/value state of the table view. */
    private final ConcurrentMap<String, T> data;
    /** Unmodifiable view over {@link #data}; backs the read accessors. */
    private final Map<String, T> immutableData;
    /** Name of this table view; used as the leading argument of its log messages. */
    private final String name;
    /** Metadata store that is listed during the fill and that delivers notifications. */
    private final MetadataStore store;
    /** Metadata cache obtained from {@link #store}; used for reads, updates and deletes. */
    private final MetadataCache<T> cache;
    /** Optional path filter; when {@code null} every path is treated as valid. */
    private final Predicate<String> listenPathValidator;
    /** Tested with (current value or null, new value) in put(); a false result rejects the write. */
    private final BiPredicate<T, T> conflictResolver;
    /** Listeners invoked with (key, value) when a tail item changes the local data. */
    private final List<BiConsumer<String, T>> tailItemListeners;
    /** Listeners invoked with (key, value) for each item found during the initial fill. */
    private final List<BiConsumer<String, T>> existingItemListeners;
    /** Cap, in milliseconds, on each blocking wait performed during the initial fill. */
    private final long timeoutInMillis;
    /** Root path of the table view; keys are paths relative to this prefix. */
    private final String pathPrefix;

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-4.0.0-preview.1.jar:org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl$MetadataStoreTableViewImplBuilder.class */
    /**
     * Fluent builder for {@link MetadataStoreTableViewImpl}.
     *
     * <p>Each setter stores its argument and returns this builder. The setters for {@code clazz},
     * {@code name}, {@code store}, {@code pathPrefix} and {@code conflictResolver} reject
     * {@code null} with a {@link NullPointerException}; the remaining setters accept any value.
     * {@code timeoutInMillis} stays at 0 when it is never set.
     *
     * @param <T> type of the values held by the table view
     */
    public static class MetadataStoreTableViewImplBuilder<T> {
        private Class<T> clazz;
        private String name;
        private MetadataStore store;
        private String pathPrefix;
        private BiPredicate<T, T> conflictResolver;
        private Predicate<String> listenPathValidator;
        private List<BiConsumer<String, T>> tailItemListeners;
        private List<BiConsumer<String, T>> existingItemListeners;
        private long timeoutInMillis;

        /** Package-private; instances are obtained through the enclosing class. */
        MetadataStoreTableViewImplBuilder() {
        }

        /**
         * Sets the value class handed to the table view.
         *
         * @throws NullPointerException if {@code cls} is null
         */
        public MetadataStoreTableViewImplBuilder<T> clazz(@NonNull Class<T> cls) {
            clazz = Objects.requireNonNull(cls, "clazz is marked non-null but is null");
            return this;
        }

        /**
         * Sets the table view name.
         *
         * @throws NullPointerException if {@code str} is null
         */
        public MetadataStoreTableViewImplBuilder<T> name(@NonNull String str) {
            name = Objects.requireNonNull(str, "name is marked non-null but is null");
            return this;
        }

        /**
         * Sets the metadata store backing the table view.
         *
         * @throws NullPointerException if {@code metadataStore} is null
         */
        public MetadataStoreTableViewImplBuilder<T> store(@NonNull MetadataStore metadataStore) {
            store = Objects.requireNonNull(metadataStore, "store is marked non-null but is null");
            return this;
        }

        /**
         * Sets the path prefix of the table view.
         *
         * @throws NullPointerException if {@code str} is null
         */
        public MetadataStoreTableViewImplBuilder<T> pathPrefix(@NonNull String str) {
            pathPrefix = Objects.requireNonNull(str, "pathPrefix is marked non-null but is null");
            return this;
        }

        /**
         * Sets the conflict resolver.
         *
         * @throws NullPointerException if {@code biPredicate} is null
         */
        public MetadataStoreTableViewImplBuilder<T> conflictResolver(@NonNull BiPredicate<T, T> biPredicate) {
            conflictResolver = Objects.requireNonNull(biPredicate, "conflictResolver is marked non-null but is null");
            return this;
        }

        /** Sets the optional path validator; {@code null} is accepted. */
        public MetadataStoreTableViewImplBuilder<T> listenPathValidator(Predicate<String> predicate) {
            listenPathValidator = predicate;
            return this;
        }

        /** Sets the tail item listeners; {@code null} is accepted. */
        public MetadataStoreTableViewImplBuilder<T> tailItemListeners(List<BiConsumer<String, T>> list) {
            tailItemListeners = list;
            return this;
        }

        /** Sets the existing item listeners; {@code null} is accepted. */
        public MetadataStoreTableViewImplBuilder<T> existingItemListeners(List<BiConsumer<String, T>> list) {
            existingItemListeners = list;
            return this;
        }

        /** Sets the timeout, in milliseconds, handed to the table view. */
        public MetadataStoreTableViewImplBuilder<T> timeoutInMillis(long j) {
            timeoutInMillis = j;
            return this;
        }

        /**
         * Creates the table view by passing the collected values to the
         * {@link MetadataStoreTableViewImpl} constructor.
         */
        public MetadataStoreTableViewImpl<T> build() {
            return new MetadataStoreTableViewImpl<>(
                    clazz,
                    name,
                    store,
                    pathPrefix,
                    conflictResolver,
                    listenPathValidator,
                    tailItemListeners,
                    existingItemListeners,
                    timeoutInMillis);
        }

        /** Renders every builder field in declaration order. */
        @Override
        public String toString() {
            return "MetadataStoreTableViewImpl.MetadataStoreTableViewImplBuilder(clazz=" + clazz
                    + ", name=" + name
                    + ", store=" + store
                    + ", pathPrefix=" + pathPrefix
                    + ", conflictResolver=" + conflictResolver
                    + ", listenPathValidator=" + listenPathValidator
                    + ", tailItemListeners=" + tailItemListeners
                    + ", existingItemListeners=" + existingItemListeners
                    + ", timeoutInMillis=" + timeoutInMillis
                    + ")";
        }
    }

    /**
     * Creates a table view over the given metadata store.
     *
     * <p>The constructor obtains a metadata cache from the store (expireAfterWriteMillis = -1,
     * refreshAfterWriteMillis = {@code CACHE_REFRESH_FREQUENCY_IN_MILLIS}, async reloads routed to
     * {@code consumeAsyncReload}) and then registers {@code handleNotification} as a store
     * listener. Existing items are not loaded here; that happens in {@code start()}.
     *
     * @param cls           value class used to obtain the metadata cache; must not be null
     * @param str           name of the table view, used in log messages; must not be null
     * @param metadataStore backing metadata store; must not be null
     * @param str2          path prefix under which items live; must not be null
     * @param biPredicate   conflict resolver tested on every put; must not be null
     * @param predicate     optional path validator; {@code null} accepts every path
     * @param list          optional tail item listeners; copied, {@code null} means none
     * @param list2         optional existing item listeners; copied, {@code null} means none
     * @param j             cap, in milliseconds, on each blocking wait of the initial fill
     * @throws NullPointerException if any argument documented as non-null is null
     */
    public MetadataStoreTableViewImpl(@NonNull Class<T> cls, @NonNull String str, @NonNull MetadataStore metadataStore, @NonNull String str2, @NonNull BiPredicate<T, T> biPredicate, Predicate<String> predicate, List<BiConsumer<String, T>> list, List<BiConsumer<String, T>> list2, long j) {
        Objects.requireNonNull(cls, "clazz is marked non-null but is null");
        Objects.requireNonNull(str, "name is marked non-null but is null");
        Objects.requireNonNull(metadataStore, "store is marked non-null but is null");
        Objects.requireNonNull(str2, "pathPrefix is marked non-null but is null");
        Objects.requireNonNull(biPredicate, "conflictResolver is marked non-null but is null");
        this.name = str;
        this.data = new ConcurrentHashMap<>();
        this.immutableData = Collections.unmodifiableMap(this.data);
        this.pathPrefix = str2;
        this.conflictResolver = biPredicate;
        this.listenPathValidator = predicate;
        // Defensive copies: later changes to the caller's lists must not affect this view.
        this.tailItemListeners = list == null ? new ArrayList<>() : new ArrayList<>(list);
        this.existingItemListeners = list2 == null ? new ArrayList<>() : new ArrayList<>(list2);
        this.timeoutInMillis = j;
        this.store = metadataStore;
        this.cache = metadataStore.getMetadataCache(cls, MetadataCacheConfig.builder()
                .expireAfterWriteMillis(-1L)
                .refreshAfterWriteMillis(CACHE_REFRESH_FREQUENCY_IN_MILLIS)
                .asyncReloadConsumer(this::consumeAsyncReload)
                .build());
        // Registered last so that every field is assigned before a notification can arrive.
        metadataStore.registerListener(this::handleNotification);
    }

    /**
     * Starts the table view by running the initial fill of the items that already exist under
     * the path prefix.
     *
     * @throws MetadataStoreException if the initial fill fails or exceeds its time budget
     */
    @Override // org.apache.pulsar.metadata.api.MetadataStoreTableView
    public void start() throws MetadataStoreException {
        fill();
    }

    /**
     * Callback given to the metadata cache as its async reload consumer. Reloads of paths that
     * pass the path validator are applied as tail items; all other paths are ignored.
     *
     * @param path     metadata store path that was reloaded
     * @param reloaded reloaded cache result; empty is applied as a {@code null} value
     */
    private void consumeAsyncReload(String path, Optional<CacheGetResult<T>> reloaded) {
        if (!isValidPath(path)) {
            return;
        }
        final String key = getKey(path);
        final T value = getValue(reloaded);
        handleTailItem(key, value);
    }

    /**
     * Tells whether the given path should be processed by this table view.
     *
     * @param path metadata store path to check
     * @return {@code true} when no path validator is configured or the validator accepts the path
     */
    private boolean isValidPath(String path) {
        final Predicate<String> validator = listenPathValidator;
        if (validator == null) {
            return true;
        }
        return validator.test(path);
    }

    /**
     * Unwraps the value carried by an optional cache result.
     *
     * @param cached optional cache result
     * @return the wrapped value, or {@code null} when the optional is empty
     */
    private T getValue(Optional<CacheGetResult<T>> cached) {
        return cached.map(CacheGetResult::getValue).orElse(null);
    }

    /**
     * Stores {@code t} under {@code str} unless the stored value already equals it.
     *
     * <p>The comparison and the write happen inside a single {@code compute} call on the data
     * map. Because {@code compute} removes the mapping when its function returns {@code null},
     * a {@code null} value deletes an existing entry.
     *
     * @param str key to update
     * @param t   new value, or {@code null} to remove the key
     * @return {@code true} if the stored state changed, {@code false} if the update was skipped
     */
    boolean updateData(String str, T t) {
        final MutableBoolean changed = new MutableBoolean();
        data.compute(str, (key, current) -> {
            final boolean differs = !Objects.equals(current, t);
            if (!differs && log.isDebugEnabled()) {
                log.debug("{} skipped item key={} value={} prev={}", name, str, t, current);
            }
            changed.setValue(differs);
            return differs ? t : current;
        });
        return changed.booleanValue();
    }

    /**
     * Applies a tail item to the local data and, only if that changed the data, notifies every
     * tail item listener. A listener failure is logged and does not stop the remaining listeners.
     *
     * @param key   table view key
     * @param value new value, or {@code null} for a removal
     */
    private void handleTailItem(String key, T value) {
        if (!updateData(key, value)) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("{} applying item key={} value={}", name, key, value);
        }
        for (BiConsumer<String, T> listener : tailItemListeners) {
            try {
                listener.accept(key, value);
            } catch (Throwable failure) {
                log.error("{} failed to listen tail item key:{}, val:{}", name, key, value, failure);
            }
        }
    }

    /**
     * Reads the current value of {@code path} from the cache and applies it as a tail item.
     *
     * @param path metadata store path that changed
     * @return a future that completes once the item has been applied; it is already completed
     *         for paths rejected by the path validator, and failures are logged and turned into
     *         a normal completion
     */
    private CompletableFuture<Void> doHandleNotification(String path) {
        if (!isValidPath(path)) {
            return CompletableFuture.completedFuture(null);
        }
        final CompletableFuture<Void> applied = cache.get(path)
                .thenAccept(latest -> handleTailItem(getKey(path), latest.orElse(null)));
        return applied.exceptionally(failure -> {
            log.error("{} failed to handle notification for path:{}", name, path, failure);
            return null;
        });
    }

    /**
     * Store listener callback. Every notification except {@code ChildrenChanged} triggers a
     * refresh of the notified path; the resulting future is not awaited.
     *
     * @param notification notification received from the metadata store
     */
    private void handleNotification(Notification notification) {
        final boolean childrenChanged = notification.getType() == NotificationType.ChildrenChanged;
        if (!childrenChanged) {
            doHandleNotification(notification.getPath());
        }
    }

    /**
     * Loads one existing item during the initial fill: reads {@code path} from the cache and,
     * when a value is present, stores it and notifies every existing item listener.
     *
     * <p>Unlike tail item listeners, a failing existing item listener is logged and rethrown, so
     * the returned future completes exceptionally.
     *
     * @param path metadata store path of the item
     * @return a future that completes when the item has been processed; already completed for
     *         paths rejected by the path validator
     */
    private CompletableFuture<Void> handleExisting(String path) {
        if (!isValidPath(path)) {
            return CompletableFuture.completedFuture(null);
        }
        return cache.get(path).thenAccept(found -> found.ifPresent(value -> {
            final String key = getKey(path);
            updateData(key, value);
            if (log.isDebugEnabled()) {
                log.debug("{} applying existing item key={} value={}", name, key, value);
            }
            for (BiConsumer<String, T> listener : existingItemListeners) {
                try {
                    listener.accept(key, value);
                } catch (Throwable failure) {
                    log.error("{} failed to listen existing item key:{}, val:{}", name, key, value, failure);
                    throw failure;
                }
            }
        }));
    }

    /**
     * Loads every item that already exists under the path prefix.
     *
     * <p>The tree is walked breadth-first in rounds. Each round takes at most
     * {@code MAX_CONCURRENT_METADATA_OPS_DURING_FILL} pending paths, lists their children
     * concurrently and waits for the whole round. A path without children is treated as a leaf
     * and handed to {@code handleExisting}; otherwise its children are queued for a later round.
     *
     * @throws MetadataStoreException if the walk exceeds {@code FILL_TIMEOUT_IN_MILLIS}, or if
     *                                a round fails, times out or is interrupted
     */
    private void fill() throws MetadataStoreException {
        final long deadline = System.currentTimeMillis() + FILL_TIMEOUT_IN_MILLIS;
        log.info("{} start filling existing items under the pathPrefix:{}", name, pathPrefix);
        // Children are enqueued from future callbacks, hence the concurrent queue and counter.
        final ConcurrentLinkedDeque<String> pending = new ConcurrentLinkedDeque<>();
        final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        pending.add(pathPrefix);
        final LongAdder filled = new LongAdder();
        while (!pending.isEmpty()) {
            final long now = System.currentTimeMillis();
            if (now >= deadline) {
                final String message = name + " failed to fill existing items in "
                        + TimeUnit.MILLISECONDS.toSeconds(FILL_TIMEOUT_IN_MILLIS)
                        + " secs. Filled count:" + filled.sum();
                log.error(message);
                throw new MetadataStoreException(message);
            }
            final int batchSize = Math.min(MAX_CONCURRENT_METADATA_OPS_DURING_FILL, pending.size());
            for (int i = 0; i < batchSize; i++) {
                final String path = pending.poll();
                inFlight.add(store.getChildren(path).thenCompose(children -> {
                    if (children.isEmpty()) {
                        // No children: the path is a leaf item.
                        filled.increment();
                        return handleExisting(path);
                    }
                    for (String child : children) {
                        pending.add(path + "/" + child);
                    }
                    return CompletableFuture.completedFuture(null);
                }));
            }
            try {
                // Wait no longer than the per-wait timeout or the time left before the deadline.
                FutureUtil.waitForAll(inFlight)
                        .get(Math.min(timeoutInMillis, deadline - now), TimeUnit.MILLISECONDS);
                inFlight.clear();
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                final Throwable cause = FutureUtil.unwrapCompletionException(th);
                log.error("{} failed to fill existing items", name, cause);
                throw new MetadataStoreException(cause);
            }
        }
        log.info("{} completed filling existing items with size:{}", name, filled.sum());
    }

    /**
     * Builds the metadata store path of a table view key.
     *
     * @param key table view key
     * @return the path prefix, a {@code "/"} separator and the key, concatenated
     */
    private String getPath(String key) {
        final String separator = "/";
        return pathPrefix + separator + key;
    }

    /**
     * Converts a metadata store path into a table view key by removing the leading
     * {@code pathPrefix + "/"}.
     *
     * <p>The prefix is compared literally and only at the start of the path. It is deliberately
     * not used as a regular expression, so regex metacharacters in the prefix have no special
     * meaning and an occurrence of the prefix in the middle of a path is left untouched.
     *
     * @param path metadata store path
     * @return the path without the leading prefix, or the unchanged path when it does not start
     *         with the prefix
     */
    private String getKey(String path) {
        final String prefix = pathPrefix + "/";
        return path.startsWith(prefix) ? path.substring(prefix.length()) : path;
    }

    /**
     * Tells whether the local data currently holds the given key.
     *
     * @param str table view key
     * @return {@code true} if the key is present in the local data
     */
    public boolean exists(String str) {
        final boolean present = immutableData.containsKey(str);
        return present;
    }

    /**
     * Returns the locally held value of a key.
     *
     * @param str table view key
     * @return the value in the local data, or {@code null} when the key is absent
     */
    @Override
    public T get(String str) {
        // Read through the unmodifiable view, which delegates to the same underlying map.
        return immutableData.get(str);
    }

    /**
     * Writes a value through the metadata cache and then refreshes the local data for the key.
     *
     * <p>The conflict resolver is tested with the current value ({@code null} when absent) and
     * the new value. When it returns {@code false} the modify function throws a
     * {@code ConflictException} and nothing is written.
     *
     * @param str table view key
     * @param t   value to store
     * @return a future that completes after the write and the follow-up local refresh
     */
    @Override
    public CompletableFuture<Void> put(String str, T t) {
        final String path = getPath(str);
        return cache.readModifyUpdateOrCreate(path, existing -> {
            if (conflictResolver.test(existing.orElse(null), t)) {
                return t;
            }
            throw new MetadataStoreTableView.ConflictException(
                    String.format("Failed to update from old:%s to value:%s", existing, t));
        }).thenCompose(ignored -> doHandleNotification(path)); // apply to the local view right away
    }

    /**
     * Deletes a key through the metadata cache and then refreshes the local data for that key.
     *
     * @param str table view key
     * @return a future that completes after the delete and the follow-up local refresh
     */
    @Override
    public CompletableFuture<Void> delete(String str) {
        final String path = getPath(str);
        final CompletableFuture<Void> removal = cache.delete(path);
        return removal.thenCompose(ignored -> doHandleNotification(path));
    }

    /**
     * Returns the number of entries currently held in the local data.
     *
     * @return the size of the local data
     */
    public int size() {
        final int count = immutableData.size();
        return count;
    }

    /**
     * Tells whether the local data currently holds no entries.
     *
     * @return {@code true} if the local data is empty
     */
    public boolean isEmpty() {
        final boolean empty = immutableData.isEmpty();
        return empty;
    }

    /**
     * Returns the entries of the local data.
     *
     * @return the entry set of the unmodifiable view over the local data
     */
    @Override
    public Set<Map.Entry<String, T>> entrySet() {
        final Set<Map.Entry<String, T>> entries = immutableData.entrySet();
        return entries;
    }

    /**
     * Returns the keys of the local data.
     *
     * @return the key set of the unmodifiable view over the local data
     */
    public Set<String> keySet() {
        final Set<String> keys = immutableData.keySet();
        return keys;
    }

    /**
     * Returns the values of the local data.
     *
     * @return the value collection of the unmodifiable view over the local data
     */
    public Collection<T> values() {
        final Collection<T> held = immutableData.values();
        return held;
    }

    /**
     * Invokes the given action for each entry of the local data.
     *
     * @param biConsumer action called with (key, value) for every entry
     */
    public void forEach(BiConsumer<String, T> biConsumer) {
        final Map<String, T> view = immutableData;
        view.forEach(biConsumer);
    }

    /**
     * Creates an empty builder for a {@link MetadataStoreTableViewImpl}.
     *
     * @param <T> type of the values held by the table view
     * @return a new builder with no values set
     */
    public static <T> MetadataStoreTableViewImplBuilder<T> builder() {
        final MetadataStoreTableViewImplBuilder<T> fresh = new MetadataStoreTableViewImplBuilder<T>();
        return fresh;
    }
}
