package org.apache.pulsar.metadata.bookkeeper;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import com.google.protobuf.TextFormat;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import okhttp3.internal.ws.WebSocketProtocol;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0-rc-202203082205.jar:org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.class */
public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicationManager {
    // Layout type identifier; checkLayout() writes and validates this same value.
    static final String LAYOUT = "BASIC";
    // Layout version number; checkLayout() writes and validates this same value.
    static final int LAYOUT_VERSION = 1;
    // Locks recorded by getLedgerToRereplicateFromHierarchy(), keyed by ledger id;
    // entries are removed again in releaseUnderreplicatedLedger().
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap();
    // Configuration; consulted to decide whether a ctime is stored when marking a ledger.
    private final AbstractConfiguration conf;
    // Root of all under-replication nodes (see getBasePath); every other path below hangs off it.
    private final String basePath;
    // Root of the hierarchy that holds the per-ledger "urL..." nodes.
    private final String urLedgerPath;
    // Parent node of the per-ledger lock nodes.
    private final String urLockPath;
    // Node that stores the serialized layout (type + version).
    private final String layoutPath;
    // Node that stores the lost-bookie recovery delay as a decimal string.
    private final String lostBookieRecoveryDelayPath;
    // The three paths below are assigned in the constructor; their readers are not in this part of the file.
    private final String checkAllLedgersCtimePath;
    private final String placementPolicyCheckCtimePath;
    private final String replicasCheckCtimePath;
    // Backing metadata store for all reads, writes and notifications.
    private final MetadataStoreExtended store;
    // Set by notifyLedgerReplicationEnabled(); invoked from handleNotification() when the disable node is deleted.
    private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
    // Invoked from handleNotification() when the recovery-delay node is deleted;
    // NOTE(review): its assignment is not visible in this part of the file.
    private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarLedgerUnderreplicationManager.class);
    // Payload written into every lock node; computed once by getLockData().
    private static final byte[] LOCK_DATA = getLockData();
    // Captures the decimal ledger id from a name or path ending in "urL<digits>".
    private static final Pattern ID_EXTRACTION_PATTERN = Pattern.compile("urL(\\d+)$");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0-rc-202203082205.jar:org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager$Lock.class */
    /**
     * Immutable pair describing a held lock: the path of the lock node and the
     * version of the under-replicated ledger node observed when the lock was taken.
     */
    public static class Lock {
        private final long ledgerNodeVersion;
        private final String lockPath;

        /**
         * @param str path of the lock node
         * @param j   version of the ledger node associated with this lock
         */
        Lock(String str, long j) {
            ledgerNodeVersion = j;
            lockPath = str;
        }

        /** @return the path of the lock node */
        String getLockPath() {
            return lockPath;
        }

        /** @return the ledger node version captured at construction */
        long getLedgerNodeVersion() {
            return ledgerNodeVersion;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0-rc-202203082205.jar:org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager$PulsarUnderreplicatedLedger.class */
    /**
     * {@link UnderreplicatedLedger} subclass whose only purpose is to re-declare
     * {@code setCtime} and {@code setReplicaList} so the enclosing manager can call them.
     */
    public static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger {
        /**
         * @param j ledger id handed to the superclass constructor
         */
        PulsarUnderreplicatedLedger(long j) {
            super(j);
        }

        /**
         * Delegates to the superclass setter.
         *
         * @param j creation time to record
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.bookkeeper.meta.UnderreplicatedLedger
        public void setCtime(long j) {
            super.setCtime(j);
        }

        /**
         * Delegates to the superclass setter.
         *
         * @param list replicas to record
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.bookkeeper.meta.UnderreplicatedLedger
        public void setReplicaList(List<String> list) {
            super.setReplicaList(list);
        }
    }

    public PulsarLedgerUnderreplicationManager(AbstractConfiguration<?> abstractConfiguration, MetadataStoreExtended metadataStoreExtended, String str) throws ReplicationException.CompatibilityException {
        this.conf = abstractConfiguration;
        this.basePath = getBasePath(str);
        this.layoutPath = this.basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
        this.urLedgerPath = this.basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
        this.urLockPath = this.basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
        this.lostBookieRecoveryDelayPath = this.basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
        this.checkAllLedgersCtimePath = this.basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
        this.placementPolicyCheckCtimePath = this.basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
        this.replicasCheckCtimePath = this.basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
        this.store = metadataStoreExtended;
        metadataStoreExtended.registerListener(this::handleNotification);
        checkLayout();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the root path of the under-replication tree.
     *
     * @param str ledger root path
     * @return {@code str + "/" + BookKeeperConstants.UNDER_REPLICATION_NODE}
     */
    public static String getBasePath(String str) {
        return str + "/" + BookKeeperConstants.UNDER_REPLICATION_NODE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the path of the node under which per-ledger locks are created.
     *
     * @param str ledger root path
     * @return the base path for {@code str} followed by "/" and the lock node name
     */
    public static String getUrLockPath(String str) {
        final String base = getBasePath(str);
        return base + "/" + BookKeeperConstants.UNDER_REPLICATION_LOCK;
    }

    /**
     * Produces the payload stored in lock nodes: the text form of a
     * {@code LockDataFormat} message, encoded as UTF-8.
     *
     * @return the encoded lock data; the bookie id is omitted when the default host cannot be resolved
     */
    public static byte[] getLockData() {
        final DataFormats.LockDataFormat.Builder lockData = DataFormats.LockDataFormat.newBuilder();
        try {
            final String host = DNS.getDefaultHost("default");
            lockData.setBookieId(host);
        } catch (UnknownHostException ignored) {
            // Best effort: when the host lookup fails the message is simply built without a bookie id.
        }
        final String text = lockData.build().toString();
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Ensures the layout node exists and describes a layout this class understands.
     *
     * <p>When the node is absent it is created with a create-only write (expected
     * version {@code -1}). If that write fails with a {@code BadVersionException}
     * another process created the node concurrently; this is not an error, so the
     * node is simply read again. Any other write failure is rethrown unchanged.
     *
     * @throws ReplicationException.CompatibilityException if the stored layout cannot be
     *         parsed, or its type/version differ from {@code LAYOUT}/{@code LAYOUT_VERSION}
     */
    private void checkLayout() throws ReplicationException.CompatibilityException {
        Optional<GetResult> layoutNode = this.store.get(this.layoutPath).join();
        while (!layoutNode.isPresent()) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder created = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            created.setType(LAYOUT).setVersion(LAYOUT_VERSION);
            try {
                this.store.put(this.layoutPath, created.build().toString().getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
            } catch (CompletionException e) {
                if (!(e.getCause() instanceof MetadataStoreException.BadVersionException)) {
                    throw e;
                }
                // Lost the creation race: someone else wrote the layout, validate theirs below.
            }
            // Re-read instead of assuming presence, so a vanished node is retried rather than dereferenced.
            layoutNode = this.store.get(this.layoutPath).join();
        }

        byte[] value = layoutNode.get().getValue();
        DataFormats.LedgerRereplicationLayoutFormat.Builder parsed = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge(new String(value, StandardCharsets.UTF_8), parsed);
        } catch (TextFormat.ParseException e) {
            throw new ReplicationException.CompatibilityException("Invalid data found", e);
        }

        DataFormats.LedgerRereplicationLayoutFormat layout = parsed.build();
        if (!layout.getType().equals(LAYOUT) || layout.getVersion() != LAYOUT_VERSION) {
            throw new ReplicationException.CompatibilityException("Incompatible layout found (" + LAYOUT + ":" + LAYOUT_VERSION + ")");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the ledger id from a node name or path that ends in {@code urL<digits>}.
     *
     * @param str node name or path to inspect
     * @return the parsed ledger id
     * @throws NumberFormatException if {@code str} does not end in {@code urL<digits>}
     *         or the digits do not fit in a {@code long}
     */
    public long getLedgerId(String str) throws NumberFormatException {
        final Matcher idMatcher = ID_EXTRACTION_PATTERN.matcher(str);
        if (!idMatcher.find()) {
            throw new NumberFormatException("Couldn't find ledgerid in path");
        }
        final String digits = idMatcher.group(1);
        return Long.parseLong(digits);
    }

    /**
     * Builds the four-level directory path for a ledger id by splitting the 64-bit
     * id into four 16-bit groups (most significant first), each rendered as four
     * lower-case hexadecimal digits.
     *
     * @param str root path the groups are appended to
     * @param j   ledger id
     * @return {@code str/hhhh/hhhh/hhhh/hhhh}
     */
    private static String getParentPath(String str, long j) {
        // Plain 16-bit mask; the value is unrelated to any WebSocket constant.
        final long mask = 0xffffL;
        return String.format("%s/%04x/%04x/%04x/%04x", str,
                (j >> 48) & mask,
                (j >> 32) & mask,
                (j >> 16) & mask,
                j & mask);
    }

    /**
     * Builds the full path of the under-replicated node for a ledger.
     *
     * @param str root of the under-replicated ledger hierarchy
     * @param j   ledger id
     * @return the parent path for {@code j} followed by {@code /urL} and the id
     *         zero-padded to at least ten decimal digits
     */
    public static String getUrLedgerPath(String str, long j) {
        final String parent = getParentPath(str, j);
        return parent + String.format("/urL%010d", j);
    }

    /**
     * Builds the path of the lock node for a ledger.
     *
     * @param str parent path of the lock nodes
     * @param j   ledger id
     * @return {@code str} followed by {@code /urL} and the id zero-padded to at
     *         least ten decimal digits
     */
    public static String getUrLedgerLockPath(String str, long j) {
        final String nodeName = String.format("/urL%010d", j);
        return str + nodeName;
    }

    /**
     * Instance shortcut for {@link #getUrLedgerPath(String, long)} rooted at this
     * manager's under-replicated ledger path.
     *
     * @param j ledger id
     * @return the under-replicated node path for {@code j}
     */
    private String getUrLedgerPath(long j) {
        final String root = this.urLedgerPath;
        return getUrLedgerPath(root, j);
    }

    /**
     * Store listener. Ignores paths outside the under-replication tree; for every
     * other notification it wakes up threads waiting on this object, and on
     * deletions of the disable node or the recovery-delay node it invokes the
     * corresponding listener (if one is set).
     *
     * @param notification change reported by the metadata store
     */
    private void handleNotification(Notification notification) {
        if (!notification.getPath().startsWith(this.basePath)) {
            return;
        }
        synchronized (this) {
            // Wake getLedgerToRereplicate(), which waits on this monitor between scans.
            notifyAll();
            if (notification.getType() != NotificationType.Deleted) {
                return;
            }
            final String disableNodePath = this.basePath + '/' + BookKeeperConstants.DISABLE_NODE;
            if (notification.getPath().equals(disableNodePath)) {
                log.info("LedgerReplication is enabled externally through MetadataStore, since DISABLE_NODE ZNode is deleted");
                if (this.replicationEnabledListener != null) {
                    this.replicationEnabledListener.operationComplete(0, null);
                }
                return;
            }
            if (notification.getPath().equals(this.lostBookieRecoveryDelayPath) && this.lostBookieRecoveryDelayListener != null) {
                this.lostBookieRecoveryDelayListener.operationComplete(0, null);
            }
        }
    }

    /**
     * Reads and parses the under-replication record of a ledger.
     *
     * @param j ledger id
     * @return the parsed record (ctime is {@code -1} when the record has none),
     *         or {@code null} when the ledger has no under-replicated node
     * @throws ReplicationException.UnavailableException if the store read fails, the
     *         thread is interrupted, or the stored data cannot be parsed
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public UnderreplicatedLedger getLedgerUnreplicationInfo(long j) throws ReplicationException.UnavailableException {
        try {
            final Optional<GetResult> stored = this.store.get(getUrLedgerPath(j)).get();
            if (!stored.isPresent()) {
                if (log.isDebugEnabled()) {
                    log.debug("Ledger: {} is not marked underreplicated", j);
                }
                return null;
            }

            final String text = new String(stored.get().getValue(), StandardCharsets.UTF_8);
            final DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            TextFormat.merge(text, builder);
            final DataFormats.UnderreplicatedLedgerFormat parsed = builder.build();

            final PulsarUnderreplicatedLedger result = new PulsarUnderreplicatedLedger(j);
            final ProtocolStringList replicas = parsed.getReplicaList();
            long ctime = -1L;
            if (parsed.hasCtime()) {
                ctime = parsed.getCtime();
            }
            result.setCtime(ctime);
            result.setReplicaList(replicas);
            return result;
        } catch (TextFormat.ParseException parseFailure) {
            throw new ReplicationException.UnavailableException("Error parsing proto message", parseFailure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", interrupted);
        } catch (ExecutionException storeFailure) {
            throw new ReplicationException.UnavailableException("Error contacting with metadata store", storeFailure);
        }
    }

    /**
     * Starts marking a ledger as under-replicated for the given missing replicas.
     *
     * @param j          ledger id
     * @param collection replicas that are missing
     * @return a future completed by {@code tryMarkLedgerUnderreplicatedAsync} once the
     *         write succeeds or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long j, Collection<String> collection) {
        if (log.isDebugEnabled()) {
            log.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", j, collection);
        }
        final CompletableFuture<Void> result = new CompletableFuture<>();
        tryMarkLedgerUnderreplicatedAsync(getUrLedgerPath(j), collection, result);
        return result;
    }

    /**
     * Attempts a create-only write (expected version {@code -1}) of a fresh
     * under-replication record. When the write fails with a
     * {@code BadVersionException} the merge path
     * {@code handleLedgerUnderreplicatedAlreadyMarked} takes over; any other
     * failure completes the future exceptionally.
     *
     * @param str               path of the under-replicated node
     * @param collection        replicas to record
     * @param completableFuture future to complete with the outcome
     */
    private void tryMarkLedgerUnderreplicatedAsync(String str, Collection<String> collection, CompletableFuture<Void> completableFuture) {
        final DataFormats.UnderreplicatedLedgerFormat.Builder record = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
        if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
            record.setCtime(System.currentTimeMillis());
        }
        collection.forEach(record::addReplica);

        final byte[] payload = record.build().toString().getBytes(StandardCharsets.UTF_8);
        this.store.put(str, payload, Optional.of(-1L))
                .thenRun(() -> FutureUtils.complete(completableFuture, null))
                .exceptionally(failure -> {
                    if (failure.getCause() instanceof MetadataStoreException.BadVersionException) {
                        handleLedgerUnderreplicatedAlreadyMarked(str, collection, completableFuture);
                    } else {
                        FutureUtils.completeExceptionally(completableFuture, failure);
                    }
                    return null;
                });
    }

    /**
     * Merges missing replicas into an under-replication record that already exists.
     *
     * <p>If the node has disappeared in the meantime, the create path is retried.
     * Otherwise replicas not yet in the stored record are appended and the record is
     * written back conditioned on the version that was read; when nothing new has to
     * be added the future completes without writing.
     *
     * @param str               path of the under-replicated node
     * @param collection        replicas to record
     * @param completableFuture future to complete with the outcome
     */
    private void handleLedgerUnderreplicatedAlreadyMarked(String str, Collection<String> collection, CompletableFuture<Void> completableFuture) {
        this.store.get(str).thenAccept(existing -> {
            if (!existing.isPresent()) {
                tryMarkLedgerUnderreplicatedAsync(str, collection, completableFuture);
                return;
            }
            final GetResult current = existing.get();
            final DataFormats.UnderreplicatedLedgerFormat.Builder merged = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            try {
                TextFormat.merge(new String(current.getValue(), StandardCharsets.UTF_8), merged);
            } catch (TextFormat.ParseException parseFailure) {
                FutureUtils.completeExceptionally(completableFuture, new ReplicationException.UnavailableException("Invalid underreplicated ledger data for ledger " + str, parseFailure));
                return;
            }

            // Membership is tested against the record as it was stored, not the growing builder.
            final DataFormats.UnderreplicatedLedgerFormat recorded = merged.build();
            boolean changed = false;
            for (String replica : collection) {
                if (recorded.getReplicaList().contains(replica)) {
                    continue;
                }
                merged.addReplica(replica);
                changed = true;
            }
            if (!changed) {
                FutureUtils.complete(completableFuture, null);
                return;
            }

            if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
                merged.setCtime(System.currentTimeMillis());
            }
            final byte[] payload = merged.build().toString().getBytes(StandardCharsets.UTF_8);
            this.store.put(str, payload, Optional.of(current.getStat().getVersion()))
                    .thenRun(() -> FutureUtils.complete(completableFuture, null))
                    .exceptionally(writeFailure -> {
                        FutureUtils.completeExceptionally(completableFuture, writeFailure);
                        return null;
                    });
        }).exceptionally(readFailure -> {
            FutureUtils.completeExceptionally(completableFuture, readFailure);
            return null;
        });
    }

    /**
     * Takes the lock node for a ledger.
     *
     * @param j ledger id
     * @throws ReplicationException wrapping any failure of the store write; when the
     *         thread was interrupted its interrupt status is restored before throwing
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void acquireUnderreplicatedLedger(long j) throws ReplicationException {
        try {
            internalAcquireUnderreplicatedLedger(j);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers, as the other blocking methods of this class do.
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Failed to acuire under-replicated ledger", e);
        } catch (ExecutionException e) {
            throw new ReplicationException.UnavailableException("Failed to acuire under-replicated ledger", e);
        }
    }

    /**
     * Creates the ephemeral lock node for a ledger with a create-only write
     * (expected version {@code -1}) and waits for the write to finish.
     *
     * @param j ledger id
     * @throws ExecutionException   if the store write fails
     * @throws InterruptedException if interrupted while waiting
     */
    private void internalAcquireUnderreplicatedLedger(long j) throws ExecutionException, InterruptedException {
        final String lockNode = getUrLedgerLockPath(this.urLockPath, j);
        this.store.put(lockNode, LOCK_DATA, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).get();
    }

    /**
     * Deletes the under-replicated node of a ledger whose lock this manager holds,
     * then releases the lock.
     *
     * <p>The delete is conditioned on the node version captured when the lock was
     * taken. A missing node or a version mismatch is tolerated. The lock is released
     * exactly once, whatever the outcome of the delete.
     *
     * @param j ledger id
     * @throws ReplicationException.UnavailableException if the delete fails for another
     *         reason, the thread is interrupted, or releasing the lock fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void markLedgerReplicated(long j) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("markLedgerReplicated(ledgerId={})", j);
        }
        try {
            Lock held = this.heldLocks.get(j);
            if (held != null) {
                this.store.delete(getUrLedgerPath(j), Optional.of(held.getLedgerNodeVersion())).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting metadata store", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            boolean tolerated = cause instanceof MetadataStoreException.NotFoundException
                    || cause instanceof MetadataStoreException.BadVersionException;
            if (!tolerated) {
                log.error("Error deleting underreplicated ledger node", e);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", e);
            }
        } finally {
            // A real finally block: a failing release is not attempted a second time.
            releaseUnderreplicatedLedger(j);
        }
    }

    /**
     * Returns a lazy iterator over the under-replicated ledgers found below the
     * under-replicated ledger root.
     *
     * <p>The tree is walked breadth-first, one directory per refill. Children whose
     * name starts with {@code urL} are read as ledger records; all other children are
     * queued as directories. {@code hasNext()} performs the store reads and must be
     * called before {@code next()}.
     *
     * @param predicate filter applied to each ledger's replica list; {@code null} accepts all
     * @return an iterator whose {@code hasNext()} returns {@code false} when interrupted
     *         (interrupt status restored) and throws {@code RuntimeException} on any other
     *         read failure; {@code remove()} is unsupported
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
        final Queue<String> pendingPaths = new LinkedList<>();
        pendingPaths.add(this.urLedgerPath);
        return new Iterator<UnderreplicatedLedger>() {
            final Queue<UnderreplicatedLedger> curBatch = new LinkedList<>();

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (!this.curBatch.isEmpty()) {
                    return true;
                }
                // Expand one directory at a time until a ledger is found or the tree is exhausted.
                while (!pendingPaths.isEmpty() && this.curBatch.isEmpty()) {
                    String parent = pendingPaths.remove();
                    try {
                        for (String child : PulsarLedgerUnderreplicationManager.this.store.getChildren(parent).get()) {
                            String childPath = parent + "/" + child;
                            if (!child.startsWith("urL")) {
                                pendingPaths.add(childPath);
                                continue;
                            }
                            UnderreplicatedLedger ledger = PulsarLedgerUnderreplicationManager.this.getLedgerUnreplicationInfo(
                                    PulsarLedgerUnderreplicationManager.this.getLedgerId(childPath));
                            if (ledger == null) {
                                continue;
                            }
                            List<String> replicaList = ledger.getReplicaList();
                            if (predicate == null || predicate.test(replicaList)) {
                                this.curBatch.add(ledger);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading list", e);
                    }
                }
                return !this.curBatch.isEmpty();
            }

            @Override // java.util.Iterator
            public UnderreplicatedLedger next() {
                // Plain assert replaces the decompiled static flag, which anonymous classes may not declare before Java 16.
                assert this.curBatch.size() > 0;
                return this.curBatch.remove();
            }
        };
    }

    /**
     * Descends the under-replicated ledger hierarchy in random order and locks the
     * first ledger that is not already locked.
     *
     * <p>Levels 0 to 3 are directories; at level 4 the children are the ledger nodes.
     * A ledger is skipped when its name appears under the lock path, when its node no
     * longer exists, when its name does not contain a ledger id, or when taking the lock
     * fails with {@code BadVersionException}. On success the lock and the ledger node
     * version that was read are recorded in {@code heldLocks}.
     *
     * <p>Every store read uses {@code get()}, so failures surface as the declared
     * {@code ExecutionException} rather than an unchecked {@code CompletionException}.
     *
     * @param str path of the node to scan
     * @param j   depth of {@code str} below the hierarchy root (the root is 0)
     * @return the id of the ledger that was locked, or {@code -1} if none could be locked
     * @throws ExecutionException   if a store operation fails for an unexpected reason
     * @throws InterruptedException if interrupted while waiting on the store
     */
    private long getLedgerToRereplicateFromHierarchy(String str, long j) throws ExecutionException, InterruptedException {
        if (j != 4) {
            List<String> directories = new ArrayList<>(this.store.getChildren(str).get());
            Collections.shuffle(directories);
            while (!directories.isEmpty()) {
                String directory = directories.get(0);
                long found = getLedgerToRereplicateFromHierarchy(str + "/" + directory, j + 1);
                if (found != -1) {
                    return found;
                }
                directories.remove(directory);
            }
            return -1L;
        }

        List<String> candidates = new ArrayList<>(this.store.getChildren(str).get());
        Collections.shuffle(candidates);
        while (!candidates.isEmpty()) {
            String candidate = candidates.get(0);
            try {
                // The lock list is re-read for every candidate so recently taken locks are seen.
                if (this.store.getChildren(this.urLockPath).get().contains(candidate)) {
                    candidates.remove(candidate);
                    continue;
                }
                Optional<GetResult> node = this.store.get(str + "/" + candidate).get();
                if (!node.isPresent()) {
                    if (log.isDebugEnabled()) {
                        log.debug("{}/{} doesn't exist", str, candidate);
                    }
                    candidates.remove(candidate);
                    continue;
                }
                long ledgerId = getLedgerId(candidate);
                internalAcquireUnderreplicatedLedger(ledgerId);
                this.heldLocks.put(ledgerId, new Lock(getUrLedgerLockPath(this.urLockPath, ledgerId), node.get().getStat().getVersion()));
                return ledgerId;
            } catch (NumberFormatException e) {
                // Not a ledger node name; skip it.
                candidates.remove(candidate);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof MetadataStoreException.BadVersionException)) {
                    throw e;
                }
                // The create-only lock write was rejected; skip this ledger.
                candidates.remove(candidate);
            }
        }
        return -1L;
    }

    /**
     * Makes a single, non-waiting attempt to lock an under-replicated ledger.
     *
     * @return the id of the locked ledger, or {@code -1} if none is available
     * @throws ReplicationException.UnavailableException if the store fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("pollLedgerToRereplicate()");
        }
        final long ledgerId;
        try {
            ledgerId = getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", interrupted);
        } catch (ExecutionException storeFailure) {
            throw new ReplicationException.UnavailableException("Error contacting metadata store", storeFailure);
        }
        return ledgerId;
    }

    /**
     * Blocks until an under-replicated ledger could be locked.
     *
     * <p>Each round first waits while replication is disabled, then scans the
     * hierarchy; when nothing is found it waits on this object's monitor for up to
     * one second (or until {@code handleNotification} calls {@code notifyAll}) and retries.
     *
     * @return the id of the locked ledger
     * @throws ReplicationException.UnavailableException if the store fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("getLedgerToRereplicate()");
        }
        try {
            for (;;) {
                waitIfLedgerReplicationDisabled();
                final long ledgerId = getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
                if (ledgerId != -1) {
                    return ledgerId;
                }
                synchronized (this) {
                    wait(1000L);
                }
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", interrupted);
        } catch (ExecutionException storeFailure) {
            throw new ReplicationException.UnavailableException("Error contacting metadata store", storeFailure);
        }
    }

    /**
     * Returns immediately when replication is enabled; otherwise registers a
     * {@code ReplicationEnableCb} as the enabled-listener and awaits it.
     *
     * @throws ReplicationException.UnavailableException if the enabled state cannot be read
     * @throws InterruptedException if interrupted while awaiting the callback
     */
    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        final ReplicationEnableCb enabledCallback = new ReplicationEnableCb();
        if (!isLedgerReplicationEnabled()) {
            notifyLedgerReplicationEnabled(enabledCallback);
            enabledCallback.await();
        }
    }

    /**
     * Deletes the lock node recorded for a ledger (if any) and forgets the lock.
     * A lock node that is already gone is not treated as an error.
     *
     * @param j ledger id
     * @throws ReplicationException.UnavailableException if the delete fails for another
     *         reason or the thread is interrupted; in both cases the lock stays recorded
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void releaseUnderreplicatedLedger(long j) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("releaseLedger(ledgerId={})", j);
        }
        try {
            final Lock held = this.heldLocks.get(j);
            if (held != null) {
                this.store.delete(held.getLockPath(), Optional.empty()).get();
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", interrupted);
        } catch (ExecutionException storeFailure) {
            final boolean alreadyGone = storeFailure.getCause() instanceof MetadataStoreException.NotFoundException;
            if (!alreadyGone) {
                log.error("Error deleting underreplicated ledger lock", storeFailure);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", storeFailure);
            }
        }
        this.heldLocks.remove(j);
    }

    /**
     * Deletes the lock node of every lock this manager still holds.
     *
     * <p>Each lock is handled on its own: a lock node that is already gone is
     * skipped and the remaining locks are still deleted. Any other failure stops
     * the loop and is reported to the caller.
     *
     * @throws ReplicationException.UnavailableException if a delete fails for a reason
     *         other than the node being absent, or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager, java.lang.AutoCloseable
    public void close() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("close()");
        }
        for (Lock held : this.heldLocks.values()) {
            try {
                this.store.delete(held.getLockPath(), Optional.empty()).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", e);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof MetadataStoreException.NotFoundException) {
                    // Already released; carry on with the remaining locks instead of returning.
                    continue;
                }
                log.error("Error deleting underreplicated ledger lock", e);
                throw new ReplicationException.UnavailableException("Error contacting metadata store", e);
            }
        }
    }

    /**
     * Disables automatic re-replication by creating the disable node with a
     * create-only write (expected version {@code -1}) and an empty payload.
     *
     * @throws ReplicationException.UnavailableException if the write fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("disableLedegerReplication()");
        }
        final String disableNodePath = this.basePath + '/' + BookKeeperConstants.DISABLE_NODE;
        try {
            this.store.put(disableNodePath, new byte[0], Optional.of(-1L)).get();
            log.info("Auto ledger re-replication is disabled!");
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", interrupted);
        } catch (ExecutionException storeFailure) {
            log.error("Exception while stopping auto ledger re-replication", storeFailure);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", storeFailure);
        }
    }

    /**
     * Re-enables automatic re-replication by deleting the disable node, whatever its version.
     *
     * @throws ReplicationException.UnavailableException if the delete fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("enableLedegerReplication()");
        }
        final String disableNodePath = this.basePath + '/' + BookKeeperConstants.DISABLE_NODE;
        try {
            this.store.delete(disableNodePath, Optional.empty()).get();
            log.info("Resuming automatic ledger re-replication");
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", interrupted);
        } catch (ExecutionException storeFailure) {
            log.error("Exception while resuming ledger replication", storeFailure);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", storeFailure);
        }
    }

    /**
     * Reports whether automatic re-replication is enabled, i.e. whether the disable node is absent.
     *
     * @return {@code true} when the disable node does not exist
     * @throws ReplicationException.UnavailableException if the existence check fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("isLedgerReplicationEnabled()");
        }
        final String disableNodePath = this.basePath + '/' + BookKeeperConstants.DISABLE_NODE;
        try {
            final boolean disabled = this.store.exists(disableNodePath).get();
            return !disabled;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        } catch (ExecutionException storeFailure) {
            log.error("Error while checking the state of ledger re-replication", storeFailure);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", storeFailure);
        }
    }

    /**
     * Registers the callback to invoke once replication is enabled. If the disable
     * node is already absent the callback is invoked right away.
     *
     * @param genericCallback callback stored as the replication-enabled listener
     * @throws ReplicationException.UnavailableException if the existence check fails or the thread is interrupted
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void notifyLedgerReplicationEnabled(BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("notifyLedgerReplicationEnabled()");
        }
        synchronized (this) {
            this.replicationEnabledListener = genericCallback;
        }
        final String disableNodePath = this.basePath + '/' + BookKeeperConstants.DISABLE_NODE;
        try {
            final boolean disabled = this.store.exists(disableNodePath).get();
            if (!disabled) {
                log.info("LedgerReplication is enabled externally through metadata store, since DISABLE_NODE node is deleted");
                genericCallback.operationComplete(0, null);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        } catch (ExecutionException storeFailure) {
            log.error("Error while checking the state of ledger re-replication", storeFailure);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", storeFailure);
        }
    }

    /**
     * Reports whether a lock node currently exists for the ledger.
     *
     * @param j ledger id
     * @return {@code true} when the ledger's lock node exists
     * @throws ReplicationException wrapping any failure of the existence check; when the
     *         thread was interrupted its interrupt status is restored before throwing
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public boolean isLedgerBeingReplicated(long j) throws ReplicationException {
        try {
            return this.store.exists(getUrLedgerLockPath(this.urLockPath, j)).get().booleanValue();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of silently consuming it.
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Failed to check if ledger is beinge replicated", e);
        } catch (Exception e) {
            throw new ReplicationException.UnavailableException("Failed to check if ledger is beinge replicated", e);
        }
    }

    /**
     * Writes the initial lostBookieRecoveryDelay value to {@code lostBookieRecoveryDelayPath},
     * passing an expected version of {@code -1L} to the store.
     *
     * @param i delay value, stored as its decimal string encoded in UTF-8
     * @return {@code true} if the put succeeded; {@code false} if it failed with a
     *         {@code BadVersionException} cause, which is treated as the node already being present
     * @throws ReplicationException.UnavailableException if the put is interrupted or fails for any other reason
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public boolean initializeLostBookieRecoveryDelay(int i) throws ReplicationException.UnavailableException {
        log.debug("initializeLostBookieRecoveryDelay()");
        try {
            final byte[] encodedDelay = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            this.store.put(this.lostBookieRecoveryDelayPath, encodedDelay, Optional.of(-1L)).get();
        } catch (ExecutionException failure) {
            final boolean nodeAlreadyPresent = failure.getCause() instanceof MetadataStoreException.BadVersionException;
            if (!nodeAlreadyPresent) {
                log.error("Error while initializing LostBookieRecoveryDelay", failure);
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
            }
            log.info("lostBookieRecoveryDelay node is already present, so using existing lostBookieRecoveryDelay node value");
            return false;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
        return true;
    }

    /**
     * Writes the lostBookieRecoveryDelay value to {@code lostBookieRecoveryDelayPath} with no
     * expected version (an empty {@code Optional}).
     *
     * @param i delay value, stored as its decimal string encoded in UTF-8
     * @throws ReplicationException.UnavailableException if the put is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setLostBookieRecoveryDelay(int i) throws ReplicationException.UnavailableException {
        log.debug("setLostBookieRecoveryDelay()");
        try {
            final byte[] encodedDelay = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            this.store.put(this.lostBookieRecoveryDelayPath, encodedDelay, Optional.empty()).get();
        } catch (ExecutionException failure) {
            log.error("Error while setting LostBookieRecoveryDelay ", failure);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
    }

    /**
     * Reads the lostBookieRecoveryDelay value stored at {@code lostBookieRecoveryDelayPath}.
     *
     * <p>The stored bytes are decoded as UTF-8 and parsed as a decimal integer. A missing node
     * or an unparsable value is reported as an {@code UnavailableException} rather than leaking
     * an unchecked {@code NoSuchElementException} / {@code NumberFormatException} to the caller.
     *
     * @return the stored delay value
     * @throws ReplicationException.UnavailableException if the node is absent, its content is not
     *         a valid integer, or the read is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException {
        log.debug("getLostBookieRecoveryDelay()");
        try {
            Optional<GetResult> optional = this.store.get(this.lostBookieRecoveryDelayPath).get();
            if (!optional.isPresent()) {
                // The node has not been created yet; surface this through the declared exception.
                throw new ReplicationException.UnavailableException("lostBookieRecoveryDelay node is not present: " + this.lostBookieRecoveryDelayPath);
            }
            return Integer.parseInt(new String(optional.get().getValue(), StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            log.error("Error while parsing LostBookieRecoveryDelay ", e);
            throw new ReplicationException.UnavailableException("Error while parsing LostBookieRecoveryDelay value", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        } catch (ExecutionException e2) {
            log.error("Error while getting LostBookieRecoveryDelay ", e2);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e2);
        }
    }

    /**
     * Registers {@code genericCallback} as the lostBookieRecoveryDelay listener and, when
     * {@code lostBookieRecoveryDelayPath} does not exist in the store, invokes the callback
     * right away with return code 0 and a {@code null} result.
     *
     * @param genericCallback callback stored as the listener and possibly invoked immediately
     * @throws ReplicationException.UnavailableException if the existence check is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) throws ReplicationException.UnavailableException {
        log.debug("notifyLostBookieRecoveryDelayChanged()");
        synchronized (this) {
            this.lostBookieRecoveryDelayListener = genericCallback;
        }
        try {
            final boolean delayNodePresent = this.store.exists(this.lostBookieRecoveryDelayPath).get();
            if (!delayNodePresent) {
                genericCallback.operationComplete(0, null);
            }
        } catch (ExecutionException failure) {
            log.error("Error while checking the state of lostBookieRecoveryDelay", failure);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
    }

    /**
     * Looks up the lock node of the given ledger and returns the bookie id recorded in it.
     *
     * <p>The node content is decoded as UTF-8 text and merged into a
     * {@code DataFormats.LockDataFormat} builder via protobuf {@code TextFormat}.
     *
     * @param j id of the ledger whose lock node is read
     * @return the bookie id from the lock data, or {@code null} if no lock node is present
     * @throws ReplicationException.UnavailableException if the lock data cannot be parsed, or the
     *         read is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public String getReplicationWorkerIdRereplicatingLedger(long j) throws ReplicationException.UnavailableException {
        try {
            final Optional<GetResult> lockNode = this.store.get(getUrLedgerLockPath(this.urLockPath, j)).get();
            if (lockNode.isPresent()) {
                final byte[] lockBytes = lockNode.get().getValue();
                final DataFormats.LockDataFormat.Builder lockData = DataFormats.LockDataFormat.newBuilder();
                TextFormat.merge(new String(lockBytes, StandardCharsets.UTF_8), lockData);
                return lockData.build().getBookieId();
            }
            return null;
        } catch (ExecutionException failure) {
            log.error("Error while getting ReplicationWorkerId rereplicating Ledger", failure);
            throw new ReplicationException.UnavailableException("Error while getting ReplicationWorkerId rereplicating Ledger", failure);
        } catch (InterruptedException interrupted) {
            log.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", interrupted);
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        } catch (TextFormat.ParseException malformed) {
            log.error("Error while parsing ZK data of lock", malformed);
            throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", malformed);
        }
    }

    /**
     * Stores the given check-all-ledgers completion time at {@code checkAllLedgersCtimePath},
     * serialized as a {@code DataFormats.CheckAllLedgersFormat} protobuf message.
     *
     * @param j the time value to record
     * @throws ReplicationException.UnavailableException if the put is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setCheckAllLedgersCTime(long j) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("setCheckAllLedgersCTime");
        }
        try {
            final byte[] serialized = DataFormats.CheckAllLedgersFormat.newBuilder()
                    .setCheckAllLedgersCTime(j)
                    .build()
                    .toByteArray();
            this.store.put(this.checkAllLedgersCtimePath, serialized, Optional.empty()).get();
        } catch (ExecutionException failure) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
    }

    /**
     * Reads the check-all-ledgers completion time stored at {@code checkAllLedgersCtimePath}.
     *
     * @return the recorded time, or {@code -1L} if the node is absent or the parsed
     *         {@code DataFormats.CheckAllLedgersFormat} message has no time field set
     * @throws ReplicationException.UnavailableException if the stored bytes cannot be parsed, or
     *         the read is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            // Previously logged "setCheckAllLedgersCTime", which misidentified this getter in traces.
            log.debug("getCheckAllLedgersCTime");
        }
        try {
            Optional<GetResult> optional = this.store.get(this.checkAllLedgersCtimePath).get();
            if (!optional.isPresent()) {
                log.warn("checkAllLedgersCtimeZnode is not yet available");
                return -1L;
            }
            DataFormats.CheckAllLedgersFormat parseFrom = DataFormats.CheckAllLedgersFormat.parseFrom(optional.get().getValue());
            if (parseFrom.hasCheckAllLedgersCTime()) {
                return parseFrom.getCheckAllLedgersCTime();
            }
            return -1L;
        } catch (InvalidProtocolBufferException e) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e2);
        } catch (ExecutionException e3) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e3);
        }
    }

    /**
     * Stores the given placement-policy-check completion time at
     * {@code placementPolicyCheckCtimePath}, serialized as a
     * {@code DataFormats.PlacementPolicyCheckFormat} protobuf message.
     *
     * @param j the time value to record
     * @throws ReplicationException.UnavailableException if the put is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setPlacementPolicyCheckCTime(long j) throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("setPlacementPolicyCheckCTime");
        }
        try {
            final byte[] serialized = DataFormats.PlacementPolicyCheckFormat.newBuilder()
                    .setPlacementPolicyCheckCTime(j)
                    .build()
                    .toByteArray();
            this.store.put(this.placementPolicyCheckCtimePath, serialized, Optional.empty()).get();
        } catch (ExecutionException failure) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
    }

    /**
     * Reads the placement-policy-check completion time stored at
     * {@code placementPolicyCheckCtimePath}.
     *
     * @return the recorded time, or {@code -1L} if the node is absent or the parsed
     *         {@code DataFormats.PlacementPolicyCheckFormat} message has no time field set
     * @throws ReplicationException.UnavailableException if the stored bytes cannot be parsed, or
     *         the read is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException {
        if (log.isDebugEnabled()) {
            log.debug("getPlacementPolicyCheckCTime");
        }
        try {
            final Optional<GetResult> stored = this.store.get(this.placementPolicyCheckCtimePath).get();
            if (stored.isPresent()) {
                final DataFormats.PlacementPolicyCheckFormat checkInfo =
                        DataFormats.PlacementPolicyCheckFormat.parseFrom(stored.get().getValue());
                return checkInfo.hasPlacementPolicyCheckCTime() ? checkInfo.getPlacementPolicyCheckCTime() : -1L;
            }
            log.warn("placementPolicyCheckCtimeZnode is not yet available");
            return -1L;
        } catch (ExecutionException failure) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        } catch (InvalidProtocolBufferException malformed) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", malformed);
        }
    }

    /**
     * Stores the given replicas-check completion time at {@code replicasCheckCtimePath},
     * serialized as a {@code DataFormats.ReplicasCheckFormat} protobuf message, and emits a
     * debug log once the put has completed.
     *
     * @param j the time value to record
     * @throws ReplicationException.UnavailableException if the put is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public void setReplicasCheckCTime(long j) throws ReplicationException.UnavailableException {
        try {
            final byte[] serialized = DataFormats.ReplicasCheckFormat.newBuilder()
                    .setReplicasCheckCTime(j)
                    .build()
                    .toByteArray();
            this.store.put(this.replicasCheckCtimePath, serialized, Optional.empty()).get();
            if (log.isDebugEnabled()) {
                log.debug("setReplicasCheckCTime completed successfully");
            }
        } catch (ExecutionException failure) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", interrupted);
        }
    }

    /**
     * Reads the replicas-check completion time stored at {@code replicasCheckCtimePath}.
     *
     * @return the recorded time, or {@code -1L} if the node is absent or the parsed
     *         {@code DataFormats.ReplicasCheckFormat} message has no time field set
     * @throws ReplicationException.UnavailableException if the stored bytes cannot be parsed, or
     *         the read is interrupted or fails
     */
    @Override // org.apache.bookkeeper.meta.LedgerUnderreplicationManager
    public long getReplicasCheckCTime() throws ReplicationException.UnavailableException {
        try {
            Optional<GetResult> optional = this.store.get(this.replicasCheckCtimePath).get();
            if (!optional.isPresent()) {
                // Previously named the placement-policy node, which pointed operators at the wrong path.
                log.warn("replicasCheckCtimeZnode is not yet available");
                return -1L;
            }
            DataFormats.ReplicasCheckFormat parseFrom = DataFormats.ReplicasCheckFormat.parseFrom(optional.get().getValue());
            if (log.isDebugEnabled()) {
                log.debug("getReplicasCheckCTime completed successfully");
            }
            if (parseFrom.hasReplicasCheckCTime()) {
                return parseFrom.getReplicasCheckCTime();
            }
            return -1L;
        } catch (InvalidProtocolBufferException e) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e2);
        } catch (ExecutionException e3) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", e3);
        }
    }
}
