package org.apache.pulsar.metadata.bookkeeper;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-master-rc-daily.jar:org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.class */
public class PulsarRegistrationClient implements RegistrationClient {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarRegistrationClient.class);
    private final MetadataStore store;
    private final String ledgersRootPath;
    private final String bookieRegistrationPath;
    private final String bookieAllRegistrationPath;
    private final String bookieReadonlyRegistrationPath;
    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache = new ConcurrentHashMap<>();
    private final Set<RegistrationClient.RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet();
    private final Set<RegistrationClient.RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

    public PulsarRegistrationClient(MetadataStore metadataStore, String str) {
        this.store = metadataStore;
        this.ledgersRootPath = str;
        this.bookieServiceInfoMetadataCache = metadataStore.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
        this.bookieRegistrationPath = str + "/available";
        this.bookieAllRegistrationPath = str + "/cookies";
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/readonly";
        metadataStore.registerListener(this::updatedBookies);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient, java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        return getChildren(this.bookieRegistrationPath);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        return getChildren(this.bookieAllRegistrationPath);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        return getChildren(this.bookieReadonlyRegistrationPath);
    }

    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String str) {
        return this.store.getChildren(str).thenComposeAsync(list -> {
            Set<BookieId> convertToBookieAddresses = convertToBookieAddresses(list);
            ArrayList arrayList = new ArrayList(convertToBookieAddresses.size());
            for (BookieId bookieId : convertToBookieAddresses) {
                if (!this.bookieServiceInfoCache.containsKey(bookieId)) {
                    arrayList.add(readBookieServiceInfoAsync(bookieId));
                }
            }
            return arrayList.isEmpty() ? CompletableFuture.completedFuture(convertToBookieAddresses) : FutureUtil.waitForAll(arrayList).thenApply(r3 -> {
                return convertToBookieAddresses;
            });
        }).thenApply((Function<? super U, ? extends U>) set -> {
            return new Versioned(set, Version.NEW);
        });
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.add(registrationListener);
        CompletableFuture<Versioned<Set<BookieId>>> writableBookies = getWritableBookies();
        Objects.requireNonNull(registrationListener);
        return writableBookies.thenAcceptAsync(registrationListener::onBookiesChanged, (Executor) this.executor);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public void unwatchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.remove(registrationListener);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.add(registrationListener);
        CompletableFuture<Versioned<Set<BookieId>>> readOnlyBookies = getReadOnlyBookies();
        Objects.requireNonNull(registrationListener);
        return readOnlyBookies.thenAcceptAsync(registrationListener::onBookiesChanged, (Executor) this.executor);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.remove(registrationListener);
    }

    private void handleDeletedBookieNode(Notification notification) {
        BookieId stripBookieIdFromPath;
        if (notification.getType() != NotificationType.Deleted || (stripBookieIdFromPath = stripBookieIdFromPath(notification.getPath())) == null) {
            return;
        }
        log.info("Bookie {} disappeared", stripBookieIdFromPath);
        this.bookieServiceInfoCache.remove(stripBookieIdFromPath);
    }

    private void handleUpdatedBookieNode(Notification notification) {
        BookieId stripBookieIdFromPath = stripBookieIdFromPath(notification.getPath());
        if (stripBookieIdFromPath != null) {
            log.info("Bookie {} info updated", stripBookieIdFromPath);
            readBookieServiceInfoAsync(stripBookieIdFromPath);
        }
    }

    private void updatedBookies(Notification notification) {
        if (notification.getType() != NotificationType.Created && notification.getType() != NotificationType.Deleted) {
            if (notification.getType() == NotificationType.Modified) {
                if (notification.getPath().startsWith(this.bookieReadonlyRegistrationPath) || notification.getPath().startsWith(this.bookieRegistrationPath)) {
                    handleUpdatedBookieNode(notification);
                    return;
                }
                return;
            }
            return;
        }
        if (notification.getPath().startsWith(this.bookieReadonlyRegistrationPath)) {
            getReadOnlyBookies().thenAccept(versioned -> {
                this.readOnlyBookiesWatchers.forEach(registrationListener -> {
                    this.executor.execute(() -> {
                        registrationListener.onBookiesChanged(versioned);
                    });
                });
            });
            handleDeletedBookieNode(notification);
        } else if (notification.getPath().startsWith(this.bookieRegistrationPath)) {
            getWritableBookies().thenAccept(versioned2 -> {
                this.writableBookiesWatchers.forEach(registrationListener -> {
                    this.executor.execute(() -> {
                        registrationListener.onBookiesChanged(versioned2);
                    });
                });
            });
            handleDeletedBookieNode(notification);
        }
    }

    private static BookieId stripBookieIdFromPath(String str) {
        int lastIndexOf;
        if (str == null || (lastIndexOf = str.lastIndexOf(47)) < 0) {
            return null;
        }
        try {
            return BookieId.parse(str.substring(lastIndexOf + 1));
        } catch (IllegalArgumentException e) {
            log.warn("Cannot decode bookieId from {}", str, e);
            return null;
        }
    }

    private static Set<BookieId> convertToBookieAddresses(List<String> list) {
        HashSet hashSet = new HashSet();
        for (String str : list) {
            if (!BookKeeperConstants.READONLY.equals(str)) {
                hashSet.add(BookieId.parse(str));
            }
        }
        return hashSet;
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        Versioned<BookieServiceInfo> versioned = this.bookieServiceInfoCache.get(bookieId);
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", bookieId, versioned);
        }
        return versioned != null ? CompletableFuture.completedFuture(versioned) : FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
    }

    public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId bookieId) {
        return this.bookieServiceInfoMetadataCache.get(this.bookieRegistrationPath + "/" + bookieId).thenCompose(optional -> {
            if (!optional.isPresent()) {
                return readBookieInfoAsReadonlyBookie(bookieId);
            }
            Versioned<BookieServiceInfo> versioned = new Versioned<>((BookieServiceInfo) optional.get(), new LongVersion(-1L));
            log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, optional.get());
            this.bookieServiceInfoCache.put(bookieId, versioned);
            return CompletableFuture.completedFuture(null);
        });
    }

    final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
        return this.bookieServiceInfoMetadataCache.get(this.bookieReadonlyRegistrationPath + "/" + bookieId).thenApply(optional -> {
            if (!optional.isPresent()) {
                return null;
            }
            Versioned<BookieServiceInfo> versioned = new Versioned<>((BookieServiceInfo) optional.get(), new LongVersion(-1L));
            log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, optional.get());
            this.bookieServiceInfoCache.put(bookieId, versioned);
            return null;
        });
    }
}
