package org.apache.pulsar.metadata.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
import org.apache.pulsar.metadata.impl.batching.MetadataOp;
import org.apache.pulsar.metadata.impl.batching.OpDelete;
import org.apache.pulsar.metadata.impl.batching.OpGet;
import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
import org.apache.pulsar.metadata.impl.batching.OpPut;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.11.2.8.jar:org/apache/pulsar/metadata/impl/ZKMetadataStore.class */
/**
 * Metadata store implementation backed by a ZooKeeper client.
 *
 * <p>Batched operations are submitted through a single ZooKeeper {@code multi} call; operations that could
 * not be resolved inside the batch are re-issued individually. A persistent recursive watch on the root
 * path is used to turn ZooKeeper watch events into store notifications.
 */
public class ZKMetadataStore extends AbstractBatchedMetadataStore implements MetadataStoreExtended, MetadataStoreLifecycle {
    private static final Logger log = LoggerFactory.getLogger(ZKMetadataStore.class);
    public static final String ZK_SCHEME = "zk";
    public static final String ZK_SCHEME_IDENTIFIER = "zk:";

    /** Separator used for the root watch path and for locating the chroot in the connect string. */
    private static final String PATH_SEPARATOR = NodeBase.PATH_SEPARATOR_STR;

    /** Delay before the operations of a batch are re-submitted after a connection loss. */
    private static final long CONNECTION_LOSS_RETRY_DELAY_MS = 100L;

    private final String zkConnectString;
    private final String rootPath;
    private final MetadataStoreConfig metadataStoreConfig;
    /** True when this store created the ZooKeeper client itself and therefore has to close it. */
    private final boolean isZkManaged;
    private final ZooKeeper zkc;
    private Optional<ZKSessionWatcher> sessionWatcher;

    /**
     * Creates a store that builds and owns its ZooKeeper client.
     *
     * @param metadataURL ZooKeeper connect string, optionally prefixed with {@value #ZK_SCHEME_IDENTIFIER}
     * @param metadataStoreConfig store configuration (session timeout, read-only mode, ...)
     * @param enableSessionWatcher whether a {@link ZKSessionWatcher} is created to track session events
     * @throws MetadataStoreException wrapping any failure raised while setting up the client or the watch
     */
    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
            throws MetadataStoreException {
        super(metadataStoreConfig);
        ZooKeeper client = null;
        try {
            if (metadataURL.startsWith(ZK_SCHEME_IDENTIFIER)) {
                this.zkConnectString = metadataURL.substring(ZK_SCHEME_IDENTIFIER.length());
            } else {
                this.zkConnectString = metadataURL;
            }
            this.metadataStoreConfig = metadataStoreConfig;
            this.rootPath = new ConnectStringParser(this.zkConnectString).getChrootPath();
            this.isZkManaged = true;
            client = PulsarZooKeeperClient.newBuilder()
                    .connectString(this.zkConnectString)
                    .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100L, 60000L, Integer.MAX_VALUE))
                    .allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
                    .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
                    .watchers(Collections.singleton(watchedEvent -> {
                        // The client may deliver events before the constructor has assigned the field,
                        // hence the explicit null check on the Optional itself.
                        Optional<ZKSessionWatcher> watcher = this.sessionWatcher;
                        if (watcher != null) {
                            watcher.ifPresent(w -> this.executor.execute(() -> w.process(watchedEvent)));
                        }
                    }))
                    .build();
            this.zkc = client;
            this.zkc.addWatch(PATH_SEPARATOR, this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
            if (enableSessionWatcher) {
                this.sessionWatcher = Optional.of(new ZKSessionWatcher(this.zkc, this::receivedSessionEvent));
            } else {
                this.sessionWatcher = Optional.empty();
            }
        } catch (Throwable th) {
            // The caller never receives a store instance it could close, so release the client here.
            if (client != null) {
                try {
                    client.close();
                } catch (Throwable closeFailure) {
                    th.addSuppressed(closeFailure);
                }
            }
            throw new MetadataStoreException(th);
        }
    }

    /**
     * Creates a store around an existing client using a default configuration.
     *
     * @param zooKeeper client to use; it is not closed by {@link #close()}
     */
    @VisibleForTesting
    public ZKMetadataStore(ZooKeeper zooKeeper) {
        this(zooKeeper, MetadataStoreConfig.builder().build());
    }

    /**
     * Creates a store around an existing client.
     *
     * @param zooKeeper client to use; it is not closed by {@link #close()}
     * @param metadataStoreConfig configuration handed to the base class
     */
    @VisibleForTesting
    public ZKMetadataStore(ZooKeeper zooKeeper, MetadataStoreConfig metadataStoreConfig) {
        super(metadataStoreConfig);
        this.zkConnectString = null;
        this.rootPath = null;
        this.metadataStoreConfig = null;
        this.isZkManaged = false;
        this.zkc = zooKeeper;
        this.sessionWatcher = Optional.of(new ZKSessionWatcher(zooKeeper, this::receivedSessionEvent));
        try {
            zooKeeper.addWatch(PATH_SEPARATOR, this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
        } catch (KeeperException | InterruptedException e) {
            // This constructor declares no checked exceptions; propagate the original exception unchanged
            // instead of widening the signature or wrapping it.
            throw ZKMetadataStore.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Rethrows {@code t} as-is, without requiring the caller to declare it.
     *
     * @return never returns normally; the return type only lets callers write {@code throw sneakyThrow(e)}
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    /**
     * Handles a session event. When the session is re-established, the persistent recursive watch is
     * registered again first; the event is only forwarded to the base class once that succeeded. If the
     * watch cannot be recreated, the session is marked invalid and an {@code Expired} event is injected
     * into the client (when it is a {@link PulsarZooKeeperClient}).
     */
    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    public void receivedSessionEvent(SessionEvent sessionEvent) {
        if (sessionEvent != SessionEvent.SessionReestablished) {
            super.receivedSessionEvent(sessionEvent);
            return;
        }
        this.zkc.addWatch(PATH_SEPARATOR, this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE,
                (rc, path, ctx) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        super.receivedSessionEvent(sessionEvent);
                        return;
                    }
                    log.error("Failed to recreate persistent watch on ZooKeeper: {}", KeeperException.Code.get(rc));
                    this.sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
                    if (this.zkc instanceof PulsarZooKeeperClient) {
                        ((PulsarZooKeeperClient) this.zkc).process(new WatchedEvent(
                                Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
                    }
                }, null);
    }

    /**
     * Issues a ZooKeeper {@code sync} for the given path.
     *
     * @param path path to synchronize
     * @return future completed when the sync succeeded, or failed with the mapped ZooKeeper error
     */
    @Override // org.apache.pulsar.metadata.api.MetadataStore
    public CompletableFuture<Void> sync(final String path) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        this.zkc.sync(path, (rc, callbackPath, ctx) -> {
            KeeperException.Code code = KeeperException.Code.get(rc);
            if (code == KeeperException.Code.OK) {
                result.complete(null);
            } else {
                result.completeExceptionally(getException(code, path));
            }
        }, null);
        return result;
    }

    /**
     * Submits all operations in one ZooKeeper {@code multi} call.
     *
     * <p>When per-operation results are returned they are dispatched to the matching handlers. On
     * connection loss every operation is retried on its own after a short delay; any other failure
     * fails all futures of the batch.
     */
    @Override // org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore
    protected void batchOperation(List<MetadataOp> ops) {
        try {
            List<Op> zkOps = ops.stream().map(this::convertOp).collect(Collectors.toList());
            this.zkc.multi(zkOps, (rc, path, ctx, results) -> {
                if (results != null) {
                    execute(() -> dispatchResults(ops, results),
                            () -> ops.stream().<CompletableFuture<?>>map(MetadataOp::getFuture)
                                    .collect(Collectors.toList()));
                    return;
                }
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.CONNECTIONLOSS) {
                    // Retry each operation individually rather than as one batch.
                    this.executor.schedule(() -> {
                        ops.forEach(op -> batchOperation(Collections.singletonList(op)));
                    }, CONNECTION_LOSS_RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
                } else {
                    MetadataStoreException exception = getException(code, path);
                    ops.forEach(op -> op.getFuture().completeExceptionally(exception));
                }
            }, null);
        } catch (Throwable th) {
            ops.forEach(op -> op.getFuture().completeExceptionally(new MetadataStoreException(th)));
        }
    }

    /** Routes the i-th multi result to the handler for the i-th operation's type. */
    private void dispatchResults(List<MetadataOp> ops, List<OpResult> results) {
        for (int idx = 0; idx < ops.size(); idx++) {
            OpResult opResult = results.get(idx);
            MetadataOp op = ops.get(idx);
            switch (op.getType()) {
                case GET:
                    handleGetResult(op.asGet(), opResult);
                    break;
                case PUT:
                    handlePutResult(op.asPut(), opResult);
                    break;
                case DELETE:
                    handleDeleteResult(op.asDelete(), opResult);
                    break;
                case GET_CHILDREN:
                    handleGetChildrenResult(op.asGetChildren(), opResult);
                    break;
                default:
                    op.getFuture().completeExceptionally(
                            new MetadataStoreException("Operation type not supported in multi: " + op.getType()));
                    break;
            }
        }
    }

    /**
     * Completes a put from its multi result. Error results of type NONODE, RUNTIMEINCONSISTENCY or OK
     * cause the put to be re-issued on its own (presumably because the op was not applied as part of
     * the batch — confirm against ZooKeeper multi semantics); NODEEXISTS is reported as a bad version.
     */
    private void handlePutResult(OpPut opPut, OpResult opResult) {
        if (!(opResult instanceof OpResult.ErrorResult)) {
            if (opResult instanceof OpResult.CreateResult) {
                String createdPath = ((OpResult.CreateResult) opResult).getPath();
                opPut.getFuture().complete(new Stat(createdPath, 0L, 0L, 0L, opPut.isEphemeral(), true));
            } else {
                opPut.getFuture().complete(getStat(opPut.getPath(), ((OpResult.SetDataResult) opResult).getStat()));
            }
            return;
        }
        KeeperException.Code code = KeeperException.Code.get(((OpResult.ErrorResult) opResult).getErr());
        if (code == KeeperException.Code.NONODE
                || code == KeeperException.Code.RUNTIMEINCONSISTENCY
                || code == KeeperException.Code.OK) {
            internalStorePut(opPut);
        } else if (code == KeeperException.Code.NODEEXISTS) {
            opPut.getFuture().completeExceptionally(getException(KeeperException.Code.BADVERSION, opPut.getPath()));
        } else {
            opPut.getFuture().completeExceptionally(getException(code, opPut.getPath()));
        }
    }

    /** Completes a get from its multi result; a missing node yields an empty {@link Optional}. */
    private void handleGetResult(OpGet opGet, OpResult opResult) {
        if (!(opResult instanceof OpResult.ErrorResult)) {
            OpResult.GetDataResult dataResult = (OpResult.GetDataResult) opResult;
            Stat stat = getStat(opGet.getPath(), dataResult.getStat());
            opGet.getFuture().complete(Optional.of(new GetResult(dataResult.getData(), stat)));
            return;
        }
        KeeperException.Code code = KeeperException.Code.get(((OpResult.ErrorResult) opResult).getErr());
        if (code == KeeperException.Code.NONODE) {
            opGet.getFuture().complete(Optional.empty());
        } else {
            opGet.getFuture().completeExceptionally(getException(code, opGet.getPath()));
        }
    }

    /** Completes a get-children from its multi result; children are sorted, a missing node yields an empty list. */
    private void handleGetChildrenResult(OpGetChildren opGetChildren, OpResult opResult) {
        if (!(opResult instanceof OpResult.ErrorResult)) {
            OpResult.GetChildrenResult childrenResult = (OpResult.GetChildrenResult) opResult;
            Collections.sort(childrenResult.getChildren());
            opGetChildren.getFuture().complete(childrenResult.getChildren());
            return;
        }
        KeeperException.Code code = KeeperException.Code.get(((OpResult.ErrorResult) opResult).getErr());
        if (code == KeeperException.Code.NONODE) {
            opGetChildren.asGetChildren().getFuture().complete(Collections.emptyList());
        } else {
            opGetChildren.getFuture().completeExceptionally(getException(code, opGetChildren.getPath()));
        }
    }

    /**
     * Completes a delete from its multi result. RUNTIMEINCONSISTENCY or OK error results cause the
     * delete to be re-issued on its own; every other error fails the future.
     */
    private void handleDeleteResult(OpDelete opDelete, OpResult opResult) {
        if (!(opResult instanceof OpResult.ErrorResult)) {
            opDelete.getFuture().complete(null);
            return;
        }
        KeeperException.Code code = KeeperException.Code.get(((OpResult.ErrorResult) opResult).getErr());
        if (code == KeeperException.Code.RUNTIMEINCONSISTENCY || code == KeeperException.Code.OK) {
            internalStoreDelete(opDelete);
        } else {
            opDelete.getFuture().completeExceptionally(getException(code, opDelete.getPath()));
        }
    }

    /**
     * Translates a store operation into the equivalent ZooKeeper multi operation. A put with an expected
     * version of -1 becomes a create; any other put becomes a setData.
     *
     * @throws IllegalArgumentException if the operation type cannot be expressed in a multi
     */
    private Op convertOp(MetadataOp op) {
        switch (op.getType()) {
            case GET:
                return Op.getData(op.asGet().getPath());
            case PUT: {
                OpPut put = op.asPut();
                Optional<Long> expectedVersion = put.getOptExpectedVersion();
                if (expectedVersion.isPresent() && expectedVersion.get().longValue() == -1L) {
                    return Op.create(put.getPath(), put.getData(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            getCreateMode(put.getOptions()));
                }
                return Op.setData(put.getPath(), put.getData(), expectedVersion.orElse(-1L).intValue());
            }
            case DELETE: {
                OpDelete delete = op.asDelete();
                return Op.delete(delete.getPath(), delete.getOptExpectedVersion().orElse(-1L).intValue());
            }
            case GET_CHILDREN:
                return Op.getChildren(op.asGetChildren().getPath());
            default:
                // Fail with a descriptive error instead of handing a null Op to ZooKeeper; the caller's
                // catch block turns this into failed futures.
                throw new IllegalArgumentException("Operation type not supported in multi: " + op.getType());
        }
    }

    /**
     * Checks whether a node exists.
     *
     * @param path path to check
     * @return future holding {@code true} if the node exists, {@code false} on NONODE
     */
    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    public CompletableFuture<Boolean> existsFromStore(String path) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            this.zkc.exists(path, (Watcher) null, (rc, callbackPath, ctx, zkStat) -> {
                execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        result.complete(true);
                    } else if (code == KeeperException.Code.NONODE) {
                        result.complete(false);
                    } else {
                        result.completeExceptionally(getException(code, path));
                    }
                }, result);
            }, result);
        } catch (Throwable th) {
            result.completeExceptionally(new MetadataStoreException(th));
        }
        return result;
    }

    /** Issues a stand-alone delete for an operation that could not be resolved inside a batch. */
    private void internalStoreDelete(OpDelete opDelete) {
        int expectedVersion = opDelete.getOptExpectedVersion().orElse(-1L).intValue();
        CompletableFuture<Void> future = opDelete.getFuture();
        try {
            this.zkc.delete(opDelete.getPath(), expectedVersion, (rc, path, ctx) -> {
                execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(null);
                    } else {
                        future.completeExceptionally(getException(code, opDelete.getPath()));
                    }
                }, future);
            }, null);
        } catch (Throwable th) {
            future.completeExceptionally(new MetadataStoreException(th));
        }
    }

    /**
     * Issues a stand-alone put for an operation that could not be resolved inside a batch.
     *
     * <p>An expected version of -1 means "create": missing parents are created and an existing node is
     * reported as a bad version. Otherwise the data is set; if the node is missing and no version was
     * requested, the node is created through {@code put(..., -1)}, retrying when that create loses a race.
     */
    private void internalStorePut(OpPut opPut) {
        boolean hasExpectedVersion = opPut.getOptExpectedVersion().isPresent();
        int expectedVersion = opPut.getOptExpectedVersion().orElse(-1L).intValue();
        CompletableFuture<Stat> future = opPut.getFuture();
        try {
            if (hasExpectedVersion && expectedVersion == -1) {
                CreateMode createMode = getCreateMode(opPut.getOptions());
                asyncCreateFullPathOptimistic(this.zkc, opPut.getPath(), opPut.getData(), createMode,
                        (rc, path, ctx, name) -> {
                            execute(() -> {
                                KeeperException.Code code = KeeperException.Code.get(rc);
                                if (code == KeeperException.Code.OK) {
                                    future.complete(new Stat(name, 0L, 0L, 0L, createMode.isEphemeral(), true));
                                } else if (code == KeeperException.Code.NODEEXISTS) {
                                    future.completeExceptionally(
                                            getException(KeeperException.Code.BADVERSION, opPut.getPath()));
                                } else {
                                    future.completeExceptionally(getException(code, opPut.getPath()));
                                }
                            }, future);
                        });
            } else {
                this.zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path, ctx, zkStat) -> {
                    execute(() -> {
                        KeeperException.Code code = KeeperException.Code.get(rc);
                        if (code == KeeperException.Code.OK) {
                            future.complete(getStat(path, zkStat));
                            return;
                        }
                        if (code != KeeperException.Code.NONODE) {
                            future.completeExceptionally(getException(code, opPut.getPath()));
                        } else if (hasExpectedVersion) {
                            // A specific version was requested but the node is gone.
                            future.completeExceptionally(
                                    getException(KeeperException.Code.BADVERSION, opPut.getPath()));
                        } else {
                            // Unconditional put on a missing node: create it instead.
                            put(opPut.getPath(), opPut.getData(), Optional.of(-1L))
                                    .thenAccept(createdStat -> future.complete(createdStat))
                                    .exceptionally(ex -> {
                                        if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                                            // Someone else created the node in the meantime; start over.
                                            internalStorePut(opPut);
                                            return null;
                                        }
                                        future.completeExceptionally(MetadataStoreException.wrap(ex.getCause()));
                                        return null;
                                    });
                        }
                    }, future);
                }, null);
            }
        } catch (Throwable th) {
            future.completeExceptionally(new MetadataStoreException(th));
        }
    }

    /**
     * Closes the ZooKeeper client (only when this store created it), the session watcher and the base
     * store. Each step runs even if an earlier one throws.
     */
    @Override // org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore, org.apache.pulsar.metadata.impl.AbstractMetadataStore, java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            if (this.isZkManaged) {
                this.zkc.close();
            }
        } finally {
            try {
                if (this.sessionWatcher.isPresent()) {
                    this.sessionWatcher.get().close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Converts a ZooKeeper stat into a store {@link Stat}.
     *
     * <p>ZooKeeper reports an ephemeral owner of 0 for nodes that are not ephemeral. -1 is also treated
     * as "not ephemeral" to stay compatible with the previous check (presumably relied upon by test
     * doubles — confirm).
     * NOTE(review): container/TTL nodes use special owner values and are still reported as ephemeral here.
     */
    private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
        long ephemeralOwner = zkStat.getEphemeralOwner();
        boolean ephemeral = ephemeralOwner != 0L && ephemeralOwner != -1L;
        boolean createdBySelf = ephemeralOwner == this.zkc.getSessionId();
        return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), ephemeral, createdBySelf);
    }

    /** Maps a ZooKeeper result code to the matching {@link MetadataStoreException} subtype. */
    private static MetadataStoreException getException(KeeperException.Code code, String path) {
        KeeperException cause = KeeperException.create(code, path);
        switch (code) {
            case BADVERSION:
                return new MetadataStoreException.BadVersionException(cause);
            case NONODE:
                return new MetadataStoreException.NotFoundException(cause);
            case NODEEXISTS:
                return new MetadataStoreException.AlreadyExistsException(cause);
            default:
                return new MetadataStoreException(cause);
        }
    }

    /**
     * Translates a ZooKeeper watch event into store notifications. Node creation and deletion also
     * produce a {@code ChildrenChanged} notification for the parent path, when there is one. Events
     * without a path and event types not listed below are ignored.
     */
    private void handleWatchEvent(WatchedEvent watchedEvent) {
        if (log.isDebugEnabled()) {
            log.debug("Received ZK watch : {}", watchedEvent);
        }
        String path = watchedEvent.getPath();
        if (path == null) {
            return;
        }
        String parentPath = parent(path);
        NotificationType type;
        boolean notifyParent = false;
        switch (watchedEvent.getType()) {
            case NodeCreated:
                type = NotificationType.Created;
                notifyParent = parentPath != null;
                break;
            case NodeDataChanged:
                type = NotificationType.Modified;
                break;
            case NodeChildrenChanged:
                type = NotificationType.ChildrenChanged;
                break;
            case NodeDeleted:
                type = NotificationType.Deleted;
                notifyParent = parentPath != null;
                break;
            default:
                return;
        }
        receivedNotification(new Notification(type, path));
        if (notifyParent) {
            receivedNotification(new Notification(NotificationType.ChildrenChanged, parentPath));
        }
    }

    /** Derives the ZooKeeper create mode from the Ephemeral/Sequential create options. */
    private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
        boolean sequential = options.contains(CreateOption.Sequential);
        if (options.contains(CreateOption.Ephemeral)) {
            return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
        }
        return sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
    }

    /** @return the session id reported by the underlying ZooKeeper client */
    public long getZkSessionId() {
        return this.zkc.getSessionId();
    }

    /** @return the underlying ZooKeeper client */
    public ZooKeeper getZkClient() {
        return this.zkc;
    }

    /**
     * Ensures the chroot path of the connect string exists, creating it (and missing parents) through a
     * temporary client connected without the chroot. Does nothing when the connect string has no chroot.
     *
     * @return a completed future on success, or a failed future carrying the error
     */
    @Override // org.apache.pulsar.metadata.api.MetadataStoreLifecycle
    public CompletableFuture<Void> initializeCluster() {
        if (this.zkConnectString == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
        }
        if (this.metadataStoreConfig == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
        }
        int chrootIndex = this.zkConnectString.indexOf(PATH_SEPARATOR);
        if (chrootIndex > 0) {
            String chrootPath = this.zkConnectString.substring(chrootIndex);
            String serversOnly = this.zkConnectString.substring(0, chrootIndex);
            // try-with-resources guarantees the temporary client is closed on failure as well.
            try (PulsarZooKeeperClient chrootZk = PulsarZooKeeperClient.newBuilder()
                    .connectString(serversOnly)
                    .sessionTimeoutMs(this.metadataStoreConfig.getSessionTimeoutMillis())
                    .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(
                            this.metadataStoreConfig.getSessionTimeoutMillis(),
                            this.metadataStoreConfig.getSessionTimeoutMillis(), 0))
                    .build()) {
                if (chrootZk.exists(chrootPath, false) == null) {
                    createFullPathOptimistic(chrootZk, chrootPath, new byte[0], CreateMode.PERSISTENT);
                    log.info("Created zookeeper chroot path {} successfully", chrootPath);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller's thread.
                    Thread.currentThread().interrupt();
                }
                return FutureUtil.failedFuture(e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Creates a node, creating missing parents on demand. The create is attempted first; on NONODE the
     * parent is created recursively (as a container node) and the original create is retried.
     *
     * @param zk client to use
     * @param originalPath path of the node to create
     * @param data node payload
     * @param createMode create mode of the requested node
     * @param callback receives the final result of creating {@code originalPath}, or the first error
     *        that prevented it
     */
    private static void asyncCreateFullPathOptimistic(ZooKeeper zk, String originalPath, byte[] data,
            CreateMode createMode, AsyncCallback.StringCallback callback) {
        zk.create(originalPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode, (rc, path, ctx, name) -> {
            if (rc != KeeperException.Code.NONODE.intValue()) {
                callback.processResult(rc, path, ctx, name);
                return;
            }
            String parent = new File(originalPath).getParent();
            if (parent == null) {
                // There is no parent left to create (e.g. the root itself is missing): report NONODE
                // rather than failing inside the ZooKeeper callback and never notifying the caller.
                callback.processResult(rc, path, ctx, name);
                return;
            }
            // File may yield platform separators; ZooKeeper paths always use '/'.
            String parentPath = parent.replace("\\", PATH_SEPARATOR);
            asyncCreateFullPathOptimistic(zk, parentPath, new byte[0], CreateMode.CONTAINER,
                    (parentRc, parentCallbackPath, parentCtx, parentName) -> {
                        if (parentRc == KeeperException.Code.OK.intValue()
                                || parentRc == KeeperException.Code.NODEEXISTS.intValue()) {
                            // Parent is in place now; retry the original create.
                            asyncCreateFullPathOptimistic(zk, originalPath, data, createMode, callback);
                        } else {
                            callback.processResult(parentRc, parentCallbackPath, parentCtx, parentName);
                        }
                    });
        }, null);
    }

    /**
     * Blocking variant of {@link #asyncCreateFullPathOptimistic}.
     *
     * @throws KeeperException if the create finished with a code other than OK
     * @throws InterruptedException if interrupted while waiting for the result
     */
    private static void createFullPathOptimistic(ZooKeeper zk, String path, byte[] data, CreateMode createMode)
            throws KeeperException, InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger resultCode = new AtomicInteger(KeeperException.Code.OK.intValue());
        asyncCreateFullPathOptimistic(zk, path, data, createMode, (rc, callbackPath, ctx, name) -> {
            resultCode.set(rc);
            done.countDown();
        });
        done.await();
        int rc = resultCode.get();
        if (rc != KeeperException.Code.OK.intValue()) {
            throw KeeperException.create(KeeperException.Code.get(rc), path);
        }
    }

    /** @return the chroot path parsed from the connect string, or {@code null} for test-constructed stores */
    public String getRootPath() {
        return this.rootPath;
    }
}
