package org.apache.pulsar.metadata.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.1.11-rc-iterable.jar:org/apache/pulsar/metadata/impl/AbstractMetadataStore.class */
public abstract class AbstractMetadataStore implements MetadataStoreExtended, Consumer<Notification> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractMetadataStore.class);
    private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
    private final AsyncLoadingCache<String, List<String>> childrenCache;
    private final AsyncLoadingCache<String, Boolean> existsCache;
    private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
    private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
    private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
    private boolean isConnected = true;
    protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(AbstractMetadataDriver.METADATA_STORE_SCHEME));

    protected abstract CompletableFuture<List<String>> getChildrenFromStore(String str);

    protected abstract CompletableFuture<Boolean> existsFromStore(String str);

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMetadataStore() {
        registerListener(this);
        this.childrenCache = Caffeine.newBuilder().refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS).expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS).buildAsync(new AsyncCacheLoader<String, List<String>>() { // from class: org.apache.pulsar.metadata.impl.AbstractMetadataStore.1
            @Override // com.github.benmanes.caffeine.cache.AsyncCacheLoader
            public CompletableFuture<List<String>> asyncLoad(String str, Executor executor) {
                return AbstractMetadataStore.this.getChildrenFromStore(str);
            }

            @Override // com.github.benmanes.caffeine.cache.AsyncCacheLoader
            public CompletableFuture<List<String>> asyncReload(String str, List<String> list, Executor executor) {
                return AbstractMetadataStore.this.isConnected ? AbstractMetadataStore.this.getChildrenFromStore(str) : CompletableFuture.completedFuture(list);
            }
        });
        this.existsCache = Caffeine.newBuilder().refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS).expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS).buildAsync(new AsyncCacheLoader<String, Boolean>() { // from class: org.apache.pulsar.metadata.impl.AbstractMetadataStore.2
            @Override // com.github.benmanes.caffeine.cache.AsyncCacheLoader
            public CompletableFuture<Boolean> asyncLoad(String str, Executor executor) {
                return AbstractMetadataStore.this.existsFromStore(str);
            }

            @Override // com.github.benmanes.caffeine.cache.AsyncCacheLoader
            public CompletableFuture<Boolean> asyncReload(String str, Boolean bool, Executor executor) {
                return AbstractMetadataStore.this.isConnected ? AbstractMetadataStore.this.existsFromStore(str) : CompletableFuture.completedFuture(bool);
            }
        });
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public <T> MetadataCache<T> getMetadataCache(Class<T> cls, MetadataCacheConfig metadataCacheConfig) {
        MetadataCacheImpl<?> metadataCacheImpl = new MetadataCacheImpl<>(this, TypeFactory.defaultInstance().constructSimpleType(cls, null), metadataCacheConfig);
        this.metadataCaches.add(metadataCacheImpl);
        return metadataCacheImpl;
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeReference, MetadataCacheConfig metadataCacheConfig) {
        MetadataCacheImpl<?> metadataCacheImpl = new MetadataCacheImpl<>((MetadataStore) this, (TypeReference<?>) typeReference, metadataCacheConfig);
        this.metadataCaches.add(metadataCacheImpl);
        return metadataCacheImpl;
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> metadataSerde, MetadataCacheConfig metadataCacheConfig) {
        MetadataCacheImpl<?> metadataCacheImpl = new MetadataCacheImpl<>((MetadataStore) this, (MetadataSerde<?>) metadataSerde, metadataCacheConfig);
        this.metadataCaches.add(metadataCacheImpl);
        return metadataCacheImpl;
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public CompletableFuture<Optional<GetResult>> get(String str) {
        return !isValidPath(str) ? FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(str)) : storeGet(str);
    }

    protected abstract CompletableFuture<Optional<GetResult>> storeGet(String str);

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public CompletableFuture<Stat> put(String str, byte[] bArr, Optional<Long> optional) {
        return put(str, bArr, optional, EnumSet.noneOf(CreateOption.class));
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public final CompletableFuture<List<String>> getChildren(String str) {
        return !isValidPath(str) ? FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(str)) : this.childrenCache.get(str);
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public final CompletableFuture<Boolean> exists(String str) {
        return !isValidPath(str) ? FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(str)) : this.existsCache.get(str);
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public void registerListener(Consumer<Notification> consumer) {
        this.listeners.add(consumer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> receivedNotification(Notification notification) {
        try {
            return CompletableFuture.supplyAsync(() -> {
                this.listeners.forEach(consumer -> {
                    try {
                        consumer.accept(notification);
                    } catch (Throwable th) {
                        log.error("Failed to process metadata store notification", th);
                    }
                });
                return null;
            }, this.executor);
        } catch (RejectedExecutionException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    @Override // java.util.function.Consumer
    public void accept(Notification notification) {
        String path = notification.getPath();
        NotificationType type = notification.getType();
        if (type == NotificationType.Created || type == NotificationType.Deleted) {
            this.existsCache.synchronous().invalidate(path);
            String parent = parent(path);
            if (parent != null) {
                this.childrenCache.synchronous().invalidate(parent);
            }
        }
        if (type == NotificationType.ChildrenChanged) {
            this.childrenCache.synchronous().invalidate(path);
        }
        if (type == NotificationType.Created || type == NotificationType.Deleted || type == NotificationType.Modified) {
            this.metadataCaches.forEach(metadataCacheImpl -> {
                metadataCacheImpl.accept(notification);
            });
        }
    }

    protected abstract CompletableFuture<Void> storeDelete(String str, Optional<Long> optional);

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public final CompletableFuture<Void> delete(String str, Optional<Long> optional) {
        return !isValidPath(str) ? FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(str)) : storeDelete(str, optional).thenRun(() -> {
            this.existsCache.synchronous().invalidate(str);
            String parent = parent(str);
            if (parent != null) {
                this.childrenCache.synchronous().invalidate(parent);
            }
            this.metadataCaches.forEach(metadataCacheImpl -> {
                metadataCacheImpl.invalidate(str);
            });
        });
    }

    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public CompletableFuture<Void> deleteRecursive(String str) {
        return getChildren(str).thenCompose(list -> {
            return FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) list.stream().map(str2 -> {
                return deleteRecursive(str + NodeBase.PATH_SEPARATOR_STR + str2);
            }).collect(Collectors.toList()));
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r5 -> {
            return exists(str);
        }).thenCompose(bool -> {
            return bool.booleanValue() ? delete(str, Optional.empty()) : CompletableFuture.completedFuture(null);
        });
    }

    protected abstract CompletableFuture<Stat> storePut(String str, byte[] bArr, Optional<Long> optional, EnumSet<CreateOption> enumSet);

    @Override // org.apache.pulsar.metadata.api.extended.MetadataStoreExtended
    public final CompletableFuture<Stat> put(String str, byte[] bArr, Optional<Long> optional, EnumSet<CreateOption> enumSet) {
        return !isValidPath(str) ? FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(str)) : storePut(str, bArr, optional, enumSet).thenApply(stat -> {
            if ((stat.getVersion() == 0 ? NotificationType.Created : NotificationType.Modified) == NotificationType.Created) {
                this.existsCache.synchronous().invalidate(str);
                String parent = parent(str);
                if (parent != null) {
                    this.childrenCache.synchronous().invalidate(parent);
                }
            }
            this.metadataCaches.forEach(metadataCacheImpl -> {
                metadataCacheImpl.refresh(str);
            });
            return stat;
        });
    }

    @Override // org.apache.pulsar.metadata.api.extended.MetadataStoreExtended
    public void registerSessionListener(Consumer<SessionEvent> consumer) {
        this.sessionListeners.add(consumer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void receivedSessionEvent(SessionEvent sessionEvent) {
        this.isConnected = sessionEvent.isConnected();
        this.sessionListeners.forEach(consumer -> {
            try {
                consumer.accept(sessionEvent);
            } catch (Throwable th) {
                log.warn("Error in processing session event", th);
            }
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.executor.shutdownNow();
        this.executor.awaitTermination(10L, TimeUnit.SECONDS);
    }

    @VisibleForTesting
    public void invalidateAll() {
        this.childrenCache.synchronous().invalidateAll();
        this.existsCache.synchronous().invalidateAll();
    }

    @VisibleForTesting
    public void execute(Runnable runnable, CompletableFuture<?> completableFuture) {
        try {
            this.executor.execute(runnable);
        } catch (Throwable th) {
            completableFuture.completeExceptionally(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String parent(String str) {
        int lastIndexOf = str.lastIndexOf(47);
        if (lastIndexOf <= 0) {
            return null;
        }
        return str.substring(0, lastIndexOf);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isValidPath(String str) {
        return StringUtils.equals(str, NodeBase.PATH_SEPARATOR_STR) || (StringUtils.isNotBlank(str) && str.startsWith(NodeBase.PATH_SEPARATOR_STR) && !str.endsWith(NodeBase.PATH_SEPARATOR_STR));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyParentChildrenChanged(String str) {
        String parent = parent(str);
        while (true) {
            String str2 = parent;
            if (str2 == null) {
                return;
            }
            receivedNotification(new Notification(NotificationType.ChildrenChanged, str2));
            parent = parent(str2);
        }
    }

    public boolean isConnected() {
        return this.isConnected;
    }
}
