package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MSException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MSWatchedEvent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetaStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCursor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreScannableTable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreTable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreTableItem;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreWatcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.Value;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Version;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZKUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory.class */
public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
    private static final Logger log = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
    private static final Logger LOG = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
    private static final int MS_CONNECT_BACKOFF_MS = 200;
    public static final int CUR_VERSION = 1;
    public static final String NAME = "ms";
    public static final String TABLE_NAME = "LEDGER";
    public static final String META_FIELD = ".META";
    AbstractConfiguration conf;
    MetaStore metastore;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory$AsyncSetProcessor.class */
    public static class AsyncSetProcessor<T> {
        ScheduledExecutorService scheduler;

        public AsyncSetProcessor(ScheduledExecutorService scheduledExecutorService) {
            this.scheduler = scheduledExecutorService;
        }

        public void process(Set<T> set, final BookkeeperInternalCallbacks.Processor<T> processor, final AsyncCallback.VoidCallback voidCallback, final Object obj, final int i, final int i2) {
            if (set == null || set.size() == 0) {
                voidCallback.processResult(i, null, obj);
                return;
            }
            final Iterator<T> it = set.iterator();
            processor.process(it.next(), new AsyncCallback.VoidCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.AsyncSetProcessor.1
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback.VoidCallback
                public void processResult(int i3, String str, Object obj2) {
                    if (i3 != i) {
                        voidCallback.processResult(i2, null, obj);
                    } else if (!it.hasNext()) {
                        voidCallback.processResult(i, null, obj);
                    } else {
                        final Object next = it.next();
                        AsyncSetProcessor.this.scheduler.submit(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.AsyncSetProcessor.1.1
                            /* JADX WARN: Multi-variable type inference failed */
                            @Override // java.lang.Runnable
                            public void run() {
                                processor.process(next, this);
                            }
                        });
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory$MsLedgerManager.class */
    public static class MsLedgerManager implements LedgerManager, MetastoreWatcher {
        final ZooKeeper zk;
        final AbstractConfiguration conf;
        final MetaStore metastore;
        final MetastoreScannableTable ledgerTable;
        final int maxEntriesPerScan;
        static final String IDGEN_ZNODE = "ms-idgen";
        ScheduledExecutorService scheduler;
        protected final ConcurrentMap<Long, Set<BookkeeperInternalCallbacks.LedgerMetadataListener>> listeners = new ConcurrentHashMap();
        private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory$MsLedgerManager$MSLedgerRangeIterator.class */
        public class MSLedgerRangeIterator implements LedgerManager.LedgerRangeIterator {
            final CountDownLatch openCursorLatch = new CountDownLatch(1);
            MetastoreCursor cursor = null;

            MSLedgerRangeIterator() {
                MsLedgerManager.this.ledgerTable.openCursor(MetastoreTable.NON_FIELDS, new MetastoreCallback<MetastoreCursor>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.MSLedgerRangeIterator.1
                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                    public void complete(int i, MetastoreCursor metastoreCursor, Object obj) {
                        if (MSException.Code.OK.getCode() != i) {
                            MSLedgerManagerFactory.LOG.error("Error opening cursor for ledger range iterator {}", Integer.valueOf(i));
                        } else {
                            MSLedgerRangeIterator.this.cursor = metastoreCursor;
                        }
                        MSLedgerRangeIterator.this.openCursorLatch.countDown();
                    }
                }, null);
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator
            public boolean hasNext() throws IOException {
                try {
                    this.openCursorLatch.await();
                    if (this.cursor == null) {
                        throw new IOException("Failed to open ledger range cursor, check logs");
                    }
                    return this.cursor.hasMoreEntries();
                } catch (InterruptedException e) {
                    MSLedgerManagerFactory.LOG.error("Interrupted waiting for cursor to open", e);
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted waiting to read range", e);
                }
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator
            public LedgerManager.LedgerRange next() throws IOException {
                try {
                    TreeSet treeSet = new TreeSet();
                    Iterator<MetastoreTableItem> readEntries = this.cursor.readEntries(MsLedgerManager.this.maxEntriesPerScan);
                    while (readEntries.hasNext()) {
                        treeSet.add(MSLedgerManagerFactory.key2LedgerId(readEntries.next().getKey()));
                    }
                    return new LedgerManager.LedgerRange(treeSet);
                } catch (MSException e) {
                    MSLedgerManagerFactory.LOG.error("Exception occurred reading from metastore", e);
                    throw new IOException("Couldn't read from metastore", e);
                }
            }
        }

        /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory$MsLedgerManager$ReadLedgerMetadataTask.class */
        protected class ReadLedgerMetadataTask implements Runnable {
            final long ledgerId;

            ReadLedgerMetadataTask(long j) {
                this.ledgerId = j;
            }

            @Override // java.lang.Runnable
            public void run() {
                if (null != MsLedgerManager.this.listeners.get(Long.valueOf(this.ledgerId))) {
                    if (MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                        MSLedgerManagerFactory.LOG.debug("Re-read ledger metadata for {}.", Long.valueOf(this.ledgerId));
                    }
                    MsLedgerManager.this.readLedgerMetadata(this.ledgerId).whenComplete((versioned, th) -> {
                        handleMetadata(versioned, th);
                    });
                } else if (MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                    MSLedgerManagerFactory.LOG.debug("Ledger metadata listener for ledger {} is already removed.", Long.valueOf(this.ledgerId));
                }
            }

            private void handleMetadata(Versioned<LedgerMetadata> versioned, Throwable th) {
                if (th == null) {
                    Set<BookkeeperInternalCallbacks.LedgerMetadataListener> set = MsLedgerManager.this.listeners.get(Long.valueOf(this.ledgerId));
                    if (null != set) {
                        if (MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                            MSLedgerManagerFactory.LOG.debug("Ledger metadata is changed for {} : {}.", Long.valueOf(this.ledgerId), versioned.getValue());
                        }
                        MsLedgerManager.this.scheduler.submit(() -> {
                            synchronized (set) {
                                Iterator it = set.iterator();
                                while (it.hasNext()) {
                                    ((BookkeeperInternalCallbacks.LedgerMetadataListener) it.next()).onChanged(this.ledgerId, versioned);
                                }
                            }
                        });
                        return;
                    }
                    return;
                }
                if (BKException.getExceptionCode(th) != -25) {
                    MSLedgerManagerFactory.LOG.warn("Failed on read ledger metadata of ledger {}: {}", Long.valueOf(this.ledgerId), Integer.valueOf(BKException.getExceptionCode(th)));
                    MsLedgerManager.this.scheduler.schedule(this, 200L, TimeUnit.MILLISECONDS);
                    return;
                }
                Set<BookkeeperInternalCallbacks.LedgerMetadataListener> remove = MsLedgerManager.this.listeners.remove(Long.valueOf(this.ledgerId));
                if (null == remove || !MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                    return;
                }
                MSLedgerManagerFactory.LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}", Long.valueOf(this.ledgerId), Integer.valueOf(remove.size()));
            }
        }

        MsLedgerManager(AbstractConfiguration abstractConfiguration, ZooKeeper zooKeeper, MetaStore metaStore) {
            this.conf = abstractConfiguration;
            this.zk = zooKeeper;
            this.metastore = metaStore;
            try {
                this.ledgerTable = metaStore.createScannableTable(MSLedgerManagerFactory.TABLE_NAME);
                this.maxEntriesPerScan = abstractConfiguration.getMetastoreMaxEntriesPerScan();
                this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("MSLedgerManagerScheduler-%d").build());
            } catch (MetastoreException e) {
                MSLedgerManagerFactory.LOG.error("Failed to instantiate table LEDGER in metastore " + metaStore.getName());
                throw new RuntimeException("Failed to instantiate table LEDGER in metastore " + metaStore.getName());
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreWatcher
        public void process(MSWatchedEvent mSWatchedEvent) {
            long longValue = MSLedgerManagerFactory.key2LedgerId(mSWatchedEvent.getKey()).longValue();
            switch (mSWatchedEvent.getType()) {
                case CHANGED:
                    new ReadLedgerMetadataTask(MSLedgerManagerFactory.key2LedgerId(mSWatchedEvent.getKey()).longValue()).run();
                    return;
                case REMOVED:
                    Set<BookkeeperInternalCallbacks.LedgerMetadataListener> set = this.listeners.get(Long.valueOf(longValue));
                    if (set != null) {
                        synchronized (set) {
                            for (BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener : set) {
                                unregisterLedgerMetadataListener(longValue, ledgerMetadataListener);
                                ledgerMetadataListener.onChanged(longValue, null);
                            }
                        }
                        return;
                    }
                    return;
                default:
                    MSLedgerManagerFactory.LOG.warn("Unknown type: {}", mSWatchedEvent.getType());
                    return;
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public void registerLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
            if (null != ledgerMetadataListener) {
                MSLedgerManagerFactory.LOG.info("Registered ledger metadata listener {} on ledger {}.", ledgerMetadataListener, Long.valueOf(j));
                Set<BookkeeperInternalCallbacks.LedgerMetadataListener> set = this.listeners.get(Long.valueOf(j));
                if (set == null) {
                    HashSet hashSet = new HashSet();
                    Set<BookkeeperInternalCallbacks.LedgerMetadataListener> putIfAbsent = this.listeners.putIfAbsent(Long.valueOf(j), hashSet);
                    set = null != putIfAbsent ? putIfAbsent : hashSet;
                }
                synchronized (set) {
                    set.add(ledgerMetadataListener);
                }
                new ReadLedgerMetadataTask(j).run();
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public void unregisterLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
            Set<BookkeeperInternalCallbacks.LedgerMetadataListener> set = this.listeners.get(Long.valueOf(j));
            if (set != null) {
                synchronized (set) {
                    if (set.remove(ledgerMetadataListener)) {
                        MSLedgerManagerFactory.LOG.info("Unregistered ledger metadata listener {} on ledger {}.", ledgerMetadataListener, Long.valueOf(j));
                    }
                    if (set.isEmpty()) {
                        this.listeners.remove(Long.valueOf(j), set);
                    }
                }
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                this.scheduler.shutdown();
            } catch (Exception e) {
                MSLedgerManagerFactory.LOG.warn("Error when closing MsLedgerManager : ", e);
            }
            this.ledgerTable.close();
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(final long j, final LedgerMetadata ledgerMetadata) {
            final CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
            try {
                this.ledgerTable.put(MSLedgerManagerFactory.ledgerId2Key(Long.valueOf(j)), new Value().setField(MSLedgerManagerFactory.META_FIELD, this.serDe.serialize(ledgerMetadata)), Version.NEW, new MetastoreCallback<Version>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.1
                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                    public void complete(int i, Version version, Object obj) {
                        if (MSException.Code.BadVersion.getCode() == i) {
                            completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
                        } else {
                            if (MSException.Code.OK.getCode() != i) {
                                completableFuture.completeExceptionally(new BKException.MetaStoreException());
                                return;
                            }
                            if (MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                                MSLedgerManagerFactory.LOG.debug("Create ledger {} with version {} successfully.", Long.valueOf(j), version);
                            }
                            completableFuture.complete(new Versioned(ledgerMetadata, version));
                        }
                    }
                }, null);
                return completableFuture;
            } catch (IOException e) {
                completableFuture.completeExceptionally(new BKException.BKMetadataSerializationException(e));
                return completableFuture;
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public CompletableFuture<Void> removeLedgerMetadata(final long j, Version version) {
            final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.ledgerTable.remove(MSLedgerManagerFactory.ledgerId2Key(Long.valueOf(j)), version, new MetastoreCallback<Void>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.2
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                public void complete(int i, Void r7, Object obj) {
                    if (MSException.Code.NoKey.getCode() == i) {
                        MSLedgerManagerFactory.LOG.warn("Ledger entry does not exist in meta table: ledgerId={}", Long.valueOf(j));
                        completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                    } else if (MSException.Code.OK.getCode() == i) {
                        FutureUtils.complete(completableFuture, null);
                    } else {
                        completableFuture.completeExceptionally(new BKException.BKMetadataSerializationException());
                    }
                }
            }, null);
            return completableFuture;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(final long j) {
            final String ledgerId2Key = MSLedgerManagerFactory.ledgerId2Key(Long.valueOf(j));
            final CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
            this.ledgerTable.get(ledgerId2Key, this, new MetastoreCallback<Versioned<Value>>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.3
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                public void complete(int i, Versioned<Value> versioned, Object obj) {
                    if (MSException.Code.NoKey.getCode() == i) {
                        MSLedgerManagerFactory.LOG.error("No ledger metadata found for ledger " + j + " : ", MSException.create(MSException.Code.get(i), "No key " + ledgerId2Key + " found."));
                        completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                    } else {
                        if (MSException.Code.OK.getCode() != i) {
                            MSLedgerManagerFactory.LOG.error("Could not read metadata for ledger " + j + " : ", MSException.create(MSException.Code.get(i), "Failed to get key " + ledgerId2Key));
                            completableFuture.completeExceptionally(new BKException.MetaStoreException());
                            return;
                        }
                        try {
                            completableFuture.complete(new Versioned(MsLedgerManager.this.serDe.parseConfig(versioned.getValue().getField(MSLedgerManagerFactory.META_FIELD), j, Optional.empty()), versioned.getVersion()));
                        } catch (IOException e) {
                            MSLedgerManagerFactory.LOG.error("Could not parse ledger metadata for ledger " + j + " : ", e);
                            completableFuture.completeExceptionally(new BKException.MetaStoreException());
                        }
                    }
                }
            }, MetastoreTable.ALL_FIELDS);
            return completableFuture;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(final long j, final LedgerMetadata ledgerMetadata, Version version) {
            final CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
            try {
                Value field = new Value().setField(MSLedgerManagerFactory.META_FIELD, this.serDe.serialize(ledgerMetadata));
                if (MSLedgerManagerFactory.LOG.isDebugEnabled()) {
                    MSLedgerManagerFactory.LOG.debug("Writing ledger {} metadata, version {}", new Object[]{Long.valueOf(j), version});
                }
                final String ledgerId2Key = MSLedgerManagerFactory.ledgerId2Key(Long.valueOf(j));
                this.ledgerTable.put(ledgerId2Key, field, version, new MetastoreCallback<Version>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.4
                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                    public void complete(int i, Version version2, Object obj) {
                        if (MSException.Code.BadVersion.getCode() == i) {
                            MSLedgerManagerFactory.LOG.info("Bad version provided to updat metadata for ledger {}", Long.valueOf(j));
                            completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
                        } else if (MSException.Code.NoKey.getCode() == i) {
                            MSLedgerManagerFactory.LOG.warn("Ledger {} doesn't exist when writing its ledger metadata.", Long.valueOf(j));
                            completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                        } else if (MSException.Code.OK.getCode() == i) {
                            completableFuture.complete(new Versioned(ledgerMetadata, version2));
                        } else {
                            MSLedgerManagerFactory.LOG.warn("Conditional update ledger metadata failed: ", MSException.create(MSException.Code.get(i), "Failed to put key " + ledgerId2Key));
                            completableFuture.completeExceptionally(new BKException.MetaStoreException());
                        }
                    }
                }, null);
                return completableFuture;
            } catch (IOException e) {
                completableFuture.completeExceptionally(new BKException.MetaStoreException(e));
                return completableFuture;
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public void asyncProcessLedgers(final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback voidCallback, final Object obj, final int i, final int i2) {
            this.ledgerTable.openCursor(MetastoreTable.NON_FIELDS, new MetastoreCallback<MetastoreCursor>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.5
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                public void complete(int i3, MetastoreCursor metastoreCursor, Object obj2) {
                    if (MSException.Code.OK.getCode() != i3) {
                        voidCallback.processResult(i2, null, obj);
                    } else if (metastoreCursor.hasMoreEntries()) {
                        MsLedgerManager.this.asyncProcessLedgers(metastoreCursor, processor, voidCallback, obj, i, i2);
                    } else {
                        voidCallback.processResult(i, null, obj);
                    }
                }
            }, null);
        }

        void asyncProcessLedgers(final MetastoreCursor metastoreCursor, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback voidCallback, final Object obj, final int i, final int i2) {
            this.scheduler.submit(new Runnable() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.6
                @Override // java.lang.Runnable
                public void run() {
                    MsLedgerManager.this.doAsyncProcessLedgers(metastoreCursor, processor, voidCallback, obj, i, i2);
                }
            });
        }

        void doAsyncProcessLedgers(final MetastoreCursor metastoreCursor, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback voidCallback, final Object obj, final int i, final int i2) {
            if (!metastoreCursor.hasMoreEntries()) {
                voidCallback.processResult(i, null, obj);
            } else {
                metastoreCursor.asyncReadEntries(this.maxEntriesPerScan, new MetastoreCursor.ReadEntriesCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.7
                    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.metastore.MetastoreCallback
                    public void complete(int i3, Iterator<MetastoreTableItem> it, Object obj2) {
                        if (MSException.Code.OK.getCode() != i3) {
                            voidCallback.processResult(i2, null, obj);
                            return;
                        }
                        TreeSet treeSet = new TreeSet();
                        while (it.hasNext()) {
                            MetastoreTableItem next = it.next();
                            try {
                                treeSet.add(MSLedgerManagerFactory.key2LedgerId(next.getKey()));
                            } catch (NumberFormatException e) {
                                MSLedgerManagerFactory.LOG.warn("Found invalid ledger key {}", next.getKey());
                            }
                        }
                        if (0 == treeSet.size()) {
                            MsLedgerManager.this.asyncProcessLedgers(metastoreCursor, processor, voidCallback, obj, i, i2);
                            return;
                        }
                        final long longValue = ((Long) treeSet.first()).longValue();
                        final long longValue2 = ((Long) treeSet.last()).longValue();
                        new AsyncSetProcessor(MsLedgerManager.this.scheduler).process(treeSet, processor, new AsyncCallback.VoidCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MSLedgerManagerFactory.MsLedgerManager.7.1
                            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback.VoidCallback
                            public void processResult(int i4, String str, Object obj3) {
                                if (i == i4) {
                                    MsLedgerManager.this.asyncProcessLedgers(metastoreCursor, processor, voidCallback, obj, i, i2);
                                } else {
                                    MSLedgerManagerFactory.LOG.error("Failed when processing range " + MSLedgerManagerFactory.rangeToString(Long.valueOf(longValue), true, Long.valueOf(longValue2), true));
                                    voidCallback.processResult(i2, null, obj);
                                }
                            }
                        }, obj, i, i2);
                    }
                }, null);
            }
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager
        public LedgerManager.LedgerRangeIterator getLedgerRanges(long j) {
            return new MSLedgerRangeIterator();
        }

        public static boolean isSpecialZnode(String str) {
            return BookKeeperConstants.AVAILABLE_NODE.equals(str) || BookKeeperConstants.COOKIE_NODE.equals(str) || BookKeeperConstants.LAYOUT_ZNODE.equals(str) || BookKeeperConstants.INSTANCEID.equals(str) || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(str) || IDGEN_ZNODE.equals(str);
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/meta/MSLedgerManagerFactory$SyncResult.class */
    static class SyncResult<T> {
        T value;
        int rc;
        boolean finished = false;

        SyncResult() {
        }

        public synchronized void complete(int i, T t) {
            this.rc = i;
            this.value = t;
            this.finished = true;
            notify();
        }

        public synchronized void block() {
            while (!this.finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public synchronized int getRetCode() {
            return this.rc;
        }

        public synchronized T getResult() {
            return this.value;
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public int getCurrentVersion() {
        return 1;
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public LedgerManagerFactory initialize(AbstractConfiguration abstractConfiguration, LayoutManager layoutManager, int i) throws IOException {
        Preconditions.checkArgument(layoutManager instanceof ZkLayoutManager);
        ZkLayoutManager zkLayoutManager = (ZkLayoutManager) layoutManager;
        if (1 != i) {
            throw new IOException("Incompatible layout version found : " + i);
        }
        this.conf = abstractConfiguration;
        this.zk = zkLayoutManager.getZk();
        String metastoreImplClass = abstractConfiguration.getMetastoreImplClass();
        try {
            this.metastore = MetastoreFactory.createMetaStore(metastoreImplClass);
            this.metastore.init(abstractConfiguration, this.metastore.getVersion());
            return this;
        } catch (Throwable th) {
            throw new IOException("Failed to initialize metastore " + metastoreImplClass + " : ", th);
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory, java.lang.AutoCloseable
    public void close() throws IOException {
        this.metastore.close();
    }

    static Long key2LedgerId(String str) {
        if (null == str) {
            return null;
        }
        return Long.valueOf(Long.parseLong(str, 10));
    }

    static String ledgerId2Key(Long l) {
        if (null == l) {
            return null;
        }
        return StringUtils.getZKStringId(l.longValue());
    }

    static String rangeToString(Long l, boolean z, Long l2, boolean z2) {
        StringBuilder sb = new StringBuilder();
        sb.append(z ? "[ " : "( ").append(l).append(" ~ ").append(l2).append(z2 ? " ]" : " )");
        return sb.toString();
    }

    static SortedSet<Long> entries2Ledgers(Iterator<MetastoreTableItem> it) {
        TreeSet treeSet = new TreeSet();
        while (it.hasNext()) {
            MetastoreTableItem next = it.next();
            try {
                treeSet.add(key2LedgerId(next.getKey()));
            } catch (NumberFormatException e) {
                LOG.warn("Found invalid ledger key {}", next.getKey());
            }
        }
        return treeSet;
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public LedgerIdGenerator newLedgerIdGenerator() {
        return new ZkLedgerIdGenerator(this.zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(this.conf), "ms-idgen", ZkUtils.getACLs(this.conf));
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public LedgerManager newLedgerManager() {
        return new MsLedgerManager(this.conf, this.zk, this.metastore);
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws ReplicationException.UnavailableException, InterruptedException, ReplicationException.CompatibilityException {
        return new ZkLedgerUnderreplicationManager(this.conf, this.zk);
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory, org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public void format(AbstractConfiguration<?> abstractConfiguration, LayoutManager layoutManager) throws InterruptedException, KeeperException, IOException {
        try {
            try {
                MetastoreUtils.cleanTable(this.metastore.createScannableTable(TABLE_NAME), abstractConfiguration.getMetastoreMaxEntriesPerScan());
                LOG.info("Finished cleaning up table {}.", TABLE_NAME);
                try {
                    Class<? extends LedgerManagerFactory> ledgerManagerFactoryClass = abstractConfiguration.getLedgerManagerFactoryClass();
                    layoutManager.deleteLedgerLayout();
                    createNewLMFactory(abstractConfiguration, layoutManager, ledgerManagerFactoryClass);
                } catch (ConfigurationException e) {
                    throw new IOException("Failed to get ledger manager factory class from configuration : ", e);
                }
            } catch (MSException e2) {
                throw new IOException("Exception when cleanning up table LEDGER", e2);
            }
        } catch (MetastoreException e3) {
            throw new IOException("Failed to instantiate table LEDGER in metastore " + this.metastore.getName());
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory, org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> abstractConfiguration, LayoutManager layoutManager) throws InterruptedException, KeeperException, IOException {
        String resolveZkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(abstractConfiguration);
        String resolveZkServers = ZKMetadataDriverBase.resolveZkServers(abstractConfiguration);
        for (String str : this.zk.getChildren(resolveZkLedgersRootPath, false)) {
            if (!MsLedgerManager.isSpecialZnode(str)) {
                log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation", str, resolveZkLedgersRootPath);
                return false;
            }
        }
        format(abstractConfiguration, layoutManager);
        for (String str2 : this.zk.getChildren(resolveZkLedgersRootPath, false)) {
            if (!MsLedgerManager.isSpecialZnode(str2)) {
                log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation", str2, resolveZkLedgersRootPath);
                return false;
            }
            ZKUtil.deleteRecursive(this.zk, resolveZkLedgersRootPath + "/" + str2);
        }
        this.zk.delete(resolveZkLedgersRootPath, -1);
        log.info("Successfully nuked existing cluster, ZKServers: {} ledger root path: {}", resolveZkServers, resolveZkLedgersRootPath);
        return true;
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory
    public LedgerAuditorManager newLedgerAuditorManager() {
        return new ZkLedgerAuditorManager(this.zk, new ServerConfiguration(this.conf), NullStatsLogger.INSTANCE);
    }
}
