package org.apache.pulsar.metadata.bookkeeper;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0-rc-202201272205.jar:org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.class */
public class PulsarLedgerManager implements LedgerManager {
    /** Logger shared by the manager and its inner tasks. */
    private static final Logger log = LoggerFactory.getLogger(PulsarLedgerManager.class);

    /** Pattern for a path that ends in a {@code /L<digits>} segment; used to filter store notifications. */
    private static final Pattern ledgerPathRegex = Pattern.compile("/L[0-9]+$");

    /** Root path under which all ledger metadata nodes are stored. */
    private final String ledgerRootPath;

    /** Backing metadata store used for create/update/delete operations. */
    private final MetadataStore store;

    /** Cache over {@link #store} used for reading (and deserializing) ledger metadata. */
    private final MetadataCache<LedgerMetadata> cache;

    /** Delegate that iterates ledgers stored with the legacy hierarchical layout. */
    private final LegacyHierarchicalLedgerManager legacyLedgerManager;

    /** Delegate that iterates ledgers stored with the long hierarchical layout. */
    private final LongHierarchicalLedgerManager longLedgerManager;

    /**
     * Single-threaded scheduler used to dispatch listener callbacks and to retry failed metadata
     * reads; it is also handed to the two hierarchical delegates. Shut down in {@link #close()}.
     */
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            new DefaultThreadFactory("pulsar-bk-ledger-manager-scheduler"));

    /**
     * Registered metadata listeners, keyed by ledger id. The value sets are plain {@link java.util.Set}s;
     * every access to a set is done while synchronizing on that set.
     */
    protected final ConcurrentMap<Long, Set<BookkeeperInternalCallbacks.LedgerMetadataListener>> listeners =
            new ConcurrentHashMap<>();

    /** Serializer/deserializer for the ledger metadata payload. */
    private final LedgerMetadataSerDe serde = new LedgerMetadataSerDe();

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0-rc-202201272205.jar:org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager$ReadLedgerMetadataTask.class */
    private class ReadLedgerMetadataTask implements Runnable {
        final long ledgerId;

        ReadLedgerMetadataTask(long j) {
            this.ledgerId = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (null != PulsarLedgerManager.this.listeners.get(Long.valueOf(this.ledgerId))) {
                if (PulsarLedgerManager.log.isDebugEnabled()) {
                    PulsarLedgerManager.log.debug("Re-read ledger metadata for {}.", Long.valueOf(this.ledgerId));
                }
                PulsarLedgerManager.this.readLedgerMetadata(this.ledgerId).whenComplete((versioned, th) -> {
                    handleMetadata(versioned, th);
                });
            } else if (PulsarLedgerManager.log.isDebugEnabled()) {
                PulsarLedgerManager.log.debug("Ledger metadata listener for ledger {} is already removed.", Long.valueOf(this.ledgerId));
            }
        }

        private void handleMetadata(Versioned<LedgerMetadata> versioned, Throwable th) {
            if (th == null) {
                Set<BookkeeperInternalCallbacks.LedgerMetadataListener> set = PulsarLedgerManager.this.listeners.get(Long.valueOf(this.ledgerId));
                if (null != set) {
                    if (PulsarLedgerManager.log.isDebugEnabled()) {
                        PulsarLedgerManager.log.debug("Ledger metadata is changed for {} : {}.", Long.valueOf(this.ledgerId), versioned);
                    }
                    PulsarLedgerManager.this.scheduler.submit(() -> {
                        synchronized (set) {
                            Iterator it = set.iterator();
                            while (it.hasNext()) {
                                ((BookkeeperInternalCallbacks.LedgerMetadataListener) it.next()).onChanged(this.ledgerId, versioned);
                            }
                        }
                    });
                    return;
                }
                return;
            }
            if (BKException.getExceptionCode(th) != -25) {
                PulsarLedgerManager.log.warn("Failed on read ledger metadata of ledger {}: {}", Long.valueOf(this.ledgerId), Integer.valueOf(BKException.getExceptionCode(th)));
                PulsarLedgerManager.this.scheduler.schedule(this, 10L, TimeUnit.SECONDS);
                return;
            }
            Set<BookkeeperInternalCallbacks.LedgerMetadataListener> remove = PulsarLedgerManager.this.listeners.remove(Long.valueOf(this.ledgerId));
            if (null != remove) {
                if (PulsarLedgerManager.log.isDebugEnabled()) {
                    PulsarLedgerManager.log.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}", Long.valueOf(this.ledgerId), Integer.valueOf(remove.size()));
                }
                synchronized (remove) {
                    Iterator<BookkeeperInternalCallbacks.LedgerMetadataListener> it = remove.iterator();
                    while (it.hasNext()) {
                        it.next().onChanged(this.ledgerId, null);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a ledger manager on top of the given metadata store.
     *
     * <p>Besides wiring the two hierarchical delegates, this obtains a metadata cache whose serde
     * converts between raw bytes and {@link LedgerMetadata}, and registers this instance as a
     * listener for store notifications.
     *
     * @param metadataStore the store holding the ledger metadata
     * @param str           the root path under which ledger metadata is stored
     */
    public PulsarLedgerManager(MetadataStore metadataStore, String str) {
        this.store = metadataStore;
        this.ledgerRootPath = str;
        this.legacyLedgerManager = new LegacyHierarchicalLedgerManager(metadataStore, scheduler, str);
        this.longLedgerManager = new LongHierarchicalLedgerManager(metadataStore, scheduler, str);

        MetadataSerde<LedgerMetadata> ledgerSerde = new MetadataSerde<LedgerMetadata>() {
            @Override
            public byte[] serialize(String path, LedgerMetadata metadata) throws IOException {
                return serde.serialize(metadata);
            }

            @Override
            public LedgerMetadata deserialize(String path, byte[] content, Stat stat) throws IOException {
                // The ledger id is not part of the payload; it is recovered from the node path.
                long ledgerId = getLedgerId(path);
                Optional<Long> creationTime = Optional.of(stat.getCreationTimestamp());
                return serde.parseConfig(content, ledgerId, creationTime);
            }
        };
        this.cache = metadataStore.getMetadataCache(ledgerSerde);

        metadataStore.registerListener(this::handleDataNotification);
    }

    /**
     * Creates the metadata node for a new ledger.
     *
     * <p>The node is written with expected version {@code -1}. Failures are reported as
     * {@link BKException}s, in line with {@code readLedgerMetadata} and {@code writeLedgerMetadata}:
     * a {@link MetadataStoreException.BadVersionException} is mapped to
     * {@link BKException.BKLedgerExistException}, anything else to {@link BKException.ZKException}.
     *
     * @param j              the ledger id
     * @param ledgerMetadata the metadata to store
     * @return a future holding the stored metadata together with the version assigned by the store
     */
    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long j, LedgerMetadata ledgerMetadata) {
        // For metadata format versions above 2 the ledger id is stamped into the metadata itself.
        final LedgerMetadata metadata = ledgerMetadata.getMetadataFormatVersion() > 2
                ? LedgerMetadataBuilder.from(ledgerMetadata).withId(j).build()
                : ledgerMetadata;

        final byte[] data;
        try {
            data = serde.serialize(metadata);
        } catch (IOException e) {
            return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(e));
        }

        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        store.put(getLedgerPath(j), data, Optional.of(-1L)).whenComplete((stat, th) -> {
            if (th == null) {
                promise.complete(new Versioned<>(metadata, new LongVersion(stat.getVersion())));
                return;
            }
            log.error("Failed to create ledger {}: {}", j, th.getMessage());
            // The failure may or may not be wrapped in a CompletionException, so look at both levels.
            Throwable cause = th.getCause();
            if (th instanceof MetadataStoreException.BadVersionException
                    || cause instanceof MetadataStoreException.BadVersionException) {
                // NOTE(review): a version conflict on a create is taken to mean the node already
                // exists - confirm against the MetadataStore#put contract.
                promise.completeExceptionally(new BKException.BKLedgerExistException());
            } else {
                promise.completeExceptionally(new BKException.ZKException(cause != null ? cause : th));
            }
        });
        return promise;
    }

    /**
     * Deletes the metadata node of a ledger and forgets the listeners registered for it.
     *
     * @param j       the ledger id
     * @param version {@link Version#ANY} for an unconditional delete, or a {@link LongVersion} the
     *                stored node must match; {@link Version#NEW} and any other type are rejected
     * @return a future completed once the node is deleted, or failed with
     *         {@link BKException.BKMetadataVersionException} when the version is not acceptable
     */
    @Override
    public CompletableFuture<Void> removeLedgerMetadata(long j, Version version) {
        if (version == Version.NEW) {
            log.error("Request to delete ledger {} metadata with version set to the initial one", j);
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }

        final Optional<Long> expectedVersion;
        if (version == Version.ANY) {
            expectedVersion = Optional.empty();
        } else if (version instanceof LongVersion) {
            expectedVersion = Optional.of(((LongVersion) version).getLongVersion());
        } else {
            log.info("Not an instance of ZKVersion: {}", j);
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }

        return store.delete(getLedgerPath(j), expectedVersion).thenRun(() -> {
            boolean hadListeners = listeners.remove(j) != null;
            if (!log.isDebugEnabled()) {
                return;
            }
            if (hadListeners) {
                log.debug("Remove registered ledger metadata listeners on ledger {} after ledger is deleted.", j);
            } else {
                log.debug("No ledger metadata listeners to remove from ledger {} when it's being deleted.", j);
            }
        });
    }

    /**
     * Reads the metadata of a ledger through the metadata cache.
     *
     * @param j the ledger id
     * @return a future holding the metadata and its store version; it fails with
     *         {@link BKException.BKNoSuchLedgerExistsOnMetadataServerException} when the node does not
     *         exist and with {@link BKException.ZKException} on any other read failure
     */
    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long j) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        String ledgerPath = getLedgerPath(j);
        cache.getWithStats(ledgerPath).thenAccept(optional -> {
            if (!optional.isPresent()) {
                if (log.isDebugEnabled()) {
                    log.debug("No such ledger: {} at path {}", j, ledgerPath);
                }
                promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                // Stop here: falling through would call get() on an empty Optional, throw
                // NoSuchElementException and log a misleading "Could not read metadata" error.
                return;
            }
            CacheGetResult<LedgerMetadata> result = optional.get();
            promise.complete(new Versioned<>(result.getValue(), new LongVersion(result.getStat().getVersion())));
        }).exceptionally(th -> {
            log.error("Could not read metadata for ledger: {}: {}", j, th.getMessage());
            promise.completeExceptionally(new BKException.ZKException(th.getCause()));
            return null;
        });
        return promise;
    }

    /**
     * Conditionally overwrites the metadata of an existing ledger.
     *
     * @param j              the ledger id
     * @param ledgerMetadata the new metadata
     * @param version        the version the stored node must currently have; must be a {@link LongVersion}
     * @return a future holding the written metadata with its new version; it fails with
     *         {@link BKException.BKMetadataVersionException} on a version mismatch (or when
     *         {@code version} is not a {@link LongVersion}),
     *         {@link BKException.BKMetadataSerializationException} when serialization fails, and
     *         {@link BKException.ZKException} on any other store failure
     */
    @Override
    public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long j, LedgerMetadata ledgerMetadata, Version version) {
        if (!(version instanceof LongVersion)) {
            return FutureUtil.failedFuture(new BKException.BKMetadataVersionException());
        }
        final LongVersion expected = (LongVersion) version;

        final byte[] payload;
        try {
            payload = serde.serialize(ledgerMetadata);
        } catch (IOException e) {
            return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(e));
        }

        final CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        store.put(getLedgerPath(j), payload, Optional.of(expected.getLongVersion()))
                .thenAccept(stat ->
                        promise.complete(new Versioned<>(ledgerMetadata, new LongVersion(stat.getVersion()))))
                .exceptionally(th -> {
                    Throwable cause = th.getCause();
                    if (cause instanceof MetadataStoreException.BadVersionException) {
                        promise.completeExceptionally(new BKException.BKMetadataVersionException());
                    } else {
                        log.warn("Conditional update ledger metadata failed: {}", th.getMessage());
                        promise.completeExceptionally(new BKException.ZKException(cause));
                    }
                    return null;
                });
        return promise;
    }

    /**
     * Registers a listener for metadata changes of a ledger and immediately triggers a metadata
     * read so the listener receives the current state. A {@code null} listener is ignored.
     *
     * @param j                      the ledger id
     * @param ledgerMetadataListener the listener to add
     */
    @Override
    public void registerLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
        if (ledgerMetadataListener != null) {
            if (log.isDebugEnabled()) {
                log.debug("Registered ledger metadata listener {} on ledger {}.", ledgerMetadataListener, j);
            }
            final Long key = j;
            final Set<BookkeeperInternalCallbacks.LedgerMetadataListener> registered =
                    listeners.computeIfAbsent(key, unused -> new HashSet<>());
            // The per-ledger set is a plain HashSet, so it is mutated under its own monitor.
            synchronized (registered) {
                registered.add(ledgerMetadataListener);
            }
            new ReadLedgerMetadataTask(j).run();
        }
    }

    /**
     * Removes a previously registered listener; the per-ledger set is dropped from the map once it
     * becomes empty. Does nothing when no listeners are registered for the ledger.
     *
     * @param j                      the ledger id
     * @param ledgerMetadataListener the listener to remove
     */
    @Override
    public void unregisterLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
        final Long key = j;
        final Set<BookkeeperInternalCallbacks.LedgerMetadataListener> registered = listeners.get(key);
        if (registered != null) {
            synchronized (registered) {
                boolean removed = registered.remove(ledgerMetadataListener);
                if (removed && log.isDebugEnabled()) {
                    log.debug("Unregistered ledger metadata listener {} on ledger {}.", ledgerMetadataListener, j);
                }
                if (registered.isEmpty()) {
                    // Two-argument remove: only drop the entry if it still maps to this very set.
                    listeners.remove(key, registered);
                }
            }
        }
    }

    /**
     * Reacts to a metadata store notification that concerns a ledger node.
     *
     * <p>Notifications for paths outside the ledger root, or whose last segment is not
     * {@code /L<digits>}, are ignored. A modification triggers a re-read of the ledger metadata; a
     * deletion notifies the registered listeners with {@code null} metadata and removes them.
     *
     * @param notification the store notification
     */
    private void handleDataNotification(Notification notification) {
        final String path = notification.getPath();
        // find(), not matches(): the pattern only describes the trailing "/L<digits>" segment, and
        // matches() would require the whole path to consist of that segment alone, which silently
        // drops every real ledger notification.
        if (!path.startsWith(ledgerRootPath) || !ledgerPathRegex.matcher(path).find()) {
            return;
        }

        final long ledgerId;
        try {
            ledgerId = getLedgerId(path);
        } catch (IOException e) {
            log.warn("Received invalid ledger path {} : ", path, e);
            return;
        }

        switch (notification.getType()) {
            case Modified:
                new ReadLedgerMetadataTask(ledgerId).run();
                return;
            case Deleted:
                Set<BookkeeperInternalCallbacks.LedgerMetadataListener> registered = listeners.get(ledgerId);
                if (registered == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("No ledger metadata listeners to remove from ledger {} after it's deleted.",
                                ledgerId);
                    }
                    return;
                }
                synchronized (registered) {
                    if (log.isDebugEnabled()) {
                        log.debug("Removed ledger metadata listeners on ledger {} : {}", ledgerId, registered);
                    }
                    // A null metadata value tells the listener that the ledger is gone.
                    for (BookkeeperInternalCallbacks.LedgerMetadataListener listener : registered) {
                        listener.onChanged(ledgerId, null);
                    }
                    listeners.remove(ledgerId, registered);
                }
                return;
            default:
                // Created, ChildrenChanged and anything else: nothing to do.
                if (log.isDebugEnabled()) {
                    log.debug("Received event {} on {}.", notification.getType(), path);
                }
                return;
        }
    }

    /**
     * Processes all ledgers: first through the legacy hierarchical manager, then, unless that pass
     * reported the failure code, through the long hierarchical manager.
     *
     * @param processor    invoked for each ledger id
     * @param voidCallback final callback
     * @param obj          context object passed through to the delegates
     * @param i            return code that signals success
     * @param i2           return code that signals failure
     */
    @Override
    public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback voidCallback, Object obj, int i, int i2) {
        legacyLedgerManager.asyncProcessLedgers(processor, (rc, path, ctx) -> {
            if (rc != i2) {
                // Legacy pass did not fail: continue with the long-id ledgers, which report to the final callback.
                longLedgerManager.asyncProcessLedgers(processor, voidCallback, obj, i, i2);
                return;
            }
            // Legacy pass failed: hand the failure straight to the final callback.
            voidCallback.processResult(rc, path, ctx);
        }, obj, i, i2);
    }

    /**
     * Returns an iterator over all ledger ranges, combining the legacy and the long hierarchical
     * layouts.
     *
     * @param j not used by this implementation
     * @return the combined range iterator
     */
    @Override
    public LedgerManager.LedgerRangeIterator getLedgerRanges(long j) {
        LegacyHierarchicalLedgerRangeIterator legacyRanges =
                new LegacyHierarchicalLedgerRangeIterator(store, ledgerRootPath);
        LongHierarchicalLedgerRangeIterator longRanges =
                new LongHierarchicalLedgerRangeIterator(store, ledgerRootPath);
        return new CombinedLedgerRangeIterator(legacyRanges, longRanges);
    }

    /**
     * Shuts the scheduler down and waits up to 10 seconds for it to terminate.
     *
     * @throws IOException if the calling thread is interrupted while waiting; the interrupt flag
     *                     is restored before the exception is thrown
     */
    @Override
    public void close() throws IOException {
        scheduler.shutdownNow();
        try {
            scheduler.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException(interrupted);
        }
    }

    /**
     * Builds the full store path of a ledger's metadata node.
     *
     * @param j the ledger id
     * @return the ledger root path followed by the hybrid hierarchical path for the id
     */
    private String getLedgerPath(long j) {
        String relativePath = StringUtils.getHybridHierarchicalLedgerPath(j);
        return ledgerRootPath + relativePath;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the ledger id from the full path of a ledger metadata node.
     *
     * @param str the node path; must start with the ledger root path
     * @return the ledger id encoded in the part of the path after the root
     * @throws IOException if the path does not start with the ledger root path, or if the
     *                     remaining part cannot be parsed as a ledger id
     */
    public long getLedgerId(String str) throws IOException {
        if (!str.startsWith(ledgerRootPath)) {
            throw new IOException("it is not a valid hashed path name : " + str);
        }
        // Skip the root path plus the separator that follows it.
        int relativeStart = ledgerRootPath.length() + 1;
        String relativePath = str.substring(relativeStart);
        return StringUtils.stringToLongHierarchicalLedgerId(relativePath);
    }

    /**
     * Tells whether a node name matches the hierarchical ledger parent-node pattern.
     *
     * @param str the node name to test
     * @return {@code true} if the whole name matches
     *         {@link StringUtils#HIERARCHICAL_LEDGER_PARENT_NODE_REGEX}
     */
    public boolean isLedgerParentNode(String str) {
        return Pattern.matches(StringUtils.HIERARCHICAL_LEDGER_PARENT_NODE_REGEX, str);
    }
}
