package org.apache.pulsar.metadata.impl;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.CloseableClient;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
import org.apache.pulsar.metadata.impl.batching.MetadataOp;
import org.apache.pulsar.metadata.impl.batching.OpDelete;
import org.apache.pulsar.metadata.impl.batching.OpGet;
import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
import org.apache.pulsar.metadata.impl.batching.OpPut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.0.0-rc3.jar:org/apache/pulsar/metadata/impl/EtcdMetadataStore.class */
public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
    static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
    private final int leaseTTLSeconds;
    private final Client client;
    private final KV kv;
    private volatile long leaseId;
    private volatile CloseableClient leaseClient;
    private final EtcdSessionWatcher sessionWatcher;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EtcdMetadataStore.class);
    private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
    private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1).build();

    public EtcdMetadataStore(String str, MetadataStoreConfig metadataStoreConfig, boolean z) throws MetadataStoreException {
        super(metadataStoreConfig);
        this.leaseTTLSeconds = metadataStoreConfig.getSessionTimeoutMillis() / 1000;
        try {
            this.client = Client.builder().endpoints(str.replaceFirst(ETCD_SCHEME_IDENTIFIER, "")).build();
            this.kv = this.client.getKVClient();
            this.client.getWatchClient().watch(ByteSequence.from("��", StandardCharsets.UTF_8), WatchOption.newBuilder().withPrefix(ByteSequence.from("/", StandardCharsets.UTF_8)).build(), this::handleWatchResponse);
            if (z) {
                this.sessionWatcher = new EtcdSessionWatcher(this.client, metadataStoreConfig.getSessionTimeoutMillis(), this::receivedSessionEvent);
                createLease(false).join();
            } else {
                this.sessionWatcher = null;
            }
        } catch (Exception e) {
            throw new MetadataStoreException(e);
        }
    }

    @Override // org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore, org.apache.pulsar.metadata.impl.AbstractMetadataStore, java.lang.AutoCloseable
    public void close() throws Exception {
        super.close();
        if (this.sessionWatcher != null) {
            this.sessionWatcher.close();
        }
        if (this.leaseClient != null) {
            this.leaseClient.close();
        }
        if (this.leaseId != 0) {
            this.client.getLeaseClient().revoke(this.leaseId);
        }
        this.kv.close();
        this.client.close();
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Boolean> existsFromStore(String str) {
        return this.kv.get(ByteSequence.from(str, StandardCharsets.UTF_8), EXISTS_GET_OPTION).thenApplyAsync(getResponse -> {
            return Boolean.valueOf(getResponse.getCount() == 1);
        }, (Executor) this.executor);
    }

    @Override // org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore, org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Stat> storePut(String str, byte[] bArr, Optional<Long> optional, EnumSet<CreateOption> enumSet) {
        if (!enumSet.contains(CreateOption.Sequential)) {
            return super.storePut(str, bArr, optional, enumSet);
        }
        String parent = parent(str);
        if (parent == null) {
            parent = "/";
        }
        return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class)).thenComposeAsync(stat -> {
            return super.storePut(str + stat.getVersion(), bArr, optional, enumSet);
        }, (Executor) this.executor);
    }

    @Override // org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore
    protected void batchOperation(List<MetadataOp> list) {
        try {
            Txn txn = this.kv.txn();
            list.forEach(metadataOp -> {
                switch (metadataOp.getType()) {
                    case PUT:
                        OpPut asPut = metadataOp.asPut();
                        ByteSequence from = ByteSequence.from(asPut.getPath(), StandardCharsets.UTF_8);
                        if (asPut.getOptExpectedVersion().isPresent()) {
                            long longValue = asPut.getOptExpectedVersion().get().longValue();
                            if (longValue == -1) {
                                txn.If(new Cmp(from, Cmp.Op.EQUAL, CmpTarget.createRevision(0L)));
                                return;
                            } else {
                                txn.If(new Cmp(from, Cmp.Op.EQUAL, CmpTarget.version(longValue + 1)));
                                return;
                            }
                        }
                        return;
                    case DELETE:
                        OpDelete asDelete = metadataOp.asDelete();
                        ByteSequence from2 = ByteSequence.from(asDelete.getPath(), StandardCharsets.UTF_8);
                        if (asDelete.getOptExpectedVersion().isPresent()) {
                            txn.If(new Cmp(from2, Cmp.Op.EQUAL, CmpTarget.version(asDelete.getOptExpectedVersion().get().longValue() + 1)));
                            return;
                        }
                        return;
                    default:
                        return;
                }
            });
            list.forEach(metadataOp2 -> {
                switch (metadataOp2.getType()) {
                    case GET:
                        txn.Then(Op.get(ByteSequence.from(metadataOp2.asGet().getPath(), StandardCharsets.UTF_8), SINGLE_GET_OPTION));
                        return;
                    case PUT:
                        OpPut asPut = metadataOp2.asPut();
                        ByteSequence from = ByteSequence.from(asPut.getPath(), StandardCharsets.UTF_8);
                        if (asPut.getFuture().isDone()) {
                            return;
                        }
                        PutOption.Builder withPrevKV = PutOption.newBuilder().withPrevKV();
                        if (asPut.isEphemeral()) {
                            withPrevKV.withLeaseId(this.leaseId);
                        }
                        txn.Then(Op.put(from, ByteSequence.from(asPut.getData()), withPrevKV.build()));
                        return;
                    case DELETE:
                        txn.Then(Op.delete(ByteSequence.from(metadataOp2.asDelete().getPath(), StandardCharsets.UTF_8), DeleteOption.DEFAULT));
                        return;
                    case GET_CHILDREN:
                        String path = metadataOp2.asGetChildren().getPath();
                        ByteSequence from2 = ByteSequence.from(path.equals("/") ? path : path + "/", StandardCharsets.UTF_8);
                        txn.Then(Op.get(from2, GetOption.newBuilder().withKeysOnly(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).withPrefix(from2).build()));
                        return;
                    default:
                        return;
                }
            });
            txn.commit().thenAccept(txnResponse -> {
                handleBatchOperationResult(txnResponse, list);
            }).exceptionally(th -> {
                Throwable cause = th.getCause();
                if ((cause instanceof ExecutionException) || (cause instanceof CompletionException)) {
                    cause = cause.getCause();
                }
                if (list.size() <= 1 || !(cause instanceof StatusRuntimeException)) {
                    log.warn("Failed to commit: {}", cause.getMessage());
                    this.executor.execute(() -> {
                        list.forEach(metadataOp3 -> {
                            metadataOp3.getFuture().completeExceptionally(th);
                        });
                    });
                    return null;
                }
                Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
                if (code != Status.Code.INVALID_ARGUMENT && code != Status.Code.RESOURCE_EXHAUSTED) {
                    return null;
                }
                list.forEach(metadataOp3 -> {
                    batchOperation(Collections.singletonList(metadataOp3));
                });
                return null;
            });
        } catch (Throwable th2) {
            log.warn("Error in committing batch: {}", th2.getMessage());
        }
    }

    private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> list) {
        this.executor.execute(() -> {
            if (!txnResponse.isSucceeded()) {
                if (list.size() > 1) {
                    list.forEach(metadataOp -> {
                        batchOperation(Collections.singletonList(metadataOp));
                    });
                    return;
                } else {
                    ((MetadataOp) list.get(0)).getFuture().completeExceptionally(new MetadataStoreException.BadVersionException("Bad version"));
                    return;
                }
            }
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            Iterator it = list.iterator();
            while (it.hasNext()) {
                MetadataOp metadataOp2 = (MetadataOp) it.next();
                switch (metadataOp2.getType()) {
                    case GET:
                        OpGet asGet = metadataOp2.asGet();
                        int i4 = i;
                        i++;
                        GetResponse getResponse = txnResponse.getGetResponses().get(i4);
                        if (getResponse.getCount() != 0) {
                            KeyValue keyValue = getResponse.getKvs().get(0);
                            asGet.getFuture().complete(Optional.of(new GetResult(keyValue.getValue().getBytes(), new Stat(asGet.getPath(), keyValue.getVersion() - 1, 0L, 0L, keyValue.getLease() != 0, keyValue.getLease() == this.leaseId))));
                            break;
                        } else {
                            asGet.getFuture().complete(Optional.empty());
                            break;
                        }
                    case PUT:
                        OpPut asPut = metadataOp2.asPut();
                        int i5 = i3;
                        i3++;
                        KeyValue prevKv = txnResponse.getPutResponses().get(i5).getPrevKv();
                        if (prevKv != null) {
                            asPut.getFuture().complete(new Stat(asPut.getPath(), prevKv.getVersion(), 0L, 0L, asPut.isEphemeral(), true));
                            break;
                        } else {
                            asPut.getFuture().complete(new Stat(asPut.getPath(), 0L, 0L, 0L, asPut.isEphemeral(), true));
                            break;
                        }
                    case DELETE:
                        OpDelete asDelete = metadataOp2.asDelete();
                        int i6 = i2;
                        i2++;
                        if (txnResponse.getDeleteResponses().get(i6).getDeleted() != 0) {
                            asDelete.getFuture().complete(null);
                            break;
                        } else {
                            asDelete.getFuture().completeExceptionally(new MetadataStoreException.NotFoundException());
                            break;
                        }
                    case GET_CHILDREN:
                        OpGetChildren asGetChildren = metadataOp2.asGetChildren();
                        int i7 = i;
                        i++;
                        GetResponse getResponse2 = txnResponse.getGetResponses().get(i7);
                        String str = asGetChildren.getPath() + "/";
                        asGetChildren.getFuture().complete(new ArrayList((Set) getResponse2.getKvs().stream().map(keyValue2 -> {
                            return keyValue2.getKey().toString(StandardCharsets.UTF_8);
                        }).map(str2 -> {
                            return str2.replace(str, "");
                        }).map(str3 -> {
                            return str3.split("/", 2)[0];
                        }).collect(Collectors.toCollection(TreeSet::new))));
                        break;
                }
            }
        });
    }

    private synchronized CompletableFuture<Void> createLease(boolean z) {
        CompletableFuture<Void> thenAccept = this.client.getLeaseClient().grant(this.leaseTTLSeconds).thenAccept(leaseGrantResponse -> {
            synchronized (this) {
                this.leaseId = leaseGrantResponse.getID();
                if (this.leaseClient != null) {
                    this.leaseClient.close();
                }
                this.leaseClient = this.client.getLeaseClient().keepAlive(this.leaseId, new StreamObserver<LeaseKeepAliveResponse>() { // from class: org.apache.pulsar.metadata.impl.EtcdMetadataStore.1
                    @Override // io.grpc.stub.StreamObserver
                    public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                        if (EtcdMetadataStore.log.isDebugEnabled()) {
                            EtcdMetadataStore.log.debug("On next: {}", leaseKeepAliveResponse);
                        }
                    }

                    @Override // io.grpc.stub.StreamObserver
                    public void onError(Throwable th) {
                        EtcdMetadataStore.log.warn("Lease client error :", th);
                        EtcdMetadataStore.this.receivedSessionEvent(SessionEvent.SessionLost);
                    }

                    @Override // io.grpc.stub.StreamObserver
                    public void onCompleted() {
                        EtcdMetadataStore.log.info("Etcd lease has expired");
                        EtcdMetadataStore.this.receivedSessionEvent(SessionEvent.SessionLost);
                    }
                });
            }
        });
        if (z) {
            thenAccept.exceptionally(th -> {
                log.warn("Failed to create Etcd lease. Retrying later", th);
                this.executor.schedule(() -> {
                    createLease(true);
                }, 1L, TimeUnit.SECONDS);
                return null;
            });
        }
        return thenAccept;
    }

    private void handleWatchResponse(WatchResponse watchResponse) {
        watchResponse.getEvents().forEach(watchEvent -> {
            String byteSequence = watchEvent.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
            if (watchEvent.getEventType() != WatchEvent.EventType.PUT) {
                if (watchEvent.getEventType() == WatchEvent.EventType.DELETE) {
                    receivedNotification(new Notification(NotificationType.Deleted, byteSequence));
                    notifyParentChildrenChanged(byteSequence);
                    return;
                }
                return;
            }
            if (watchEvent.getKeyValue().getVersion() != 1) {
                receivedNotification(new Notification(NotificationType.Modified, byteSequence));
            } else {
                receivedNotification(new Notification(NotificationType.Created, byteSequence));
                notifyParentChildrenChanged(byteSequence);
            }
        });
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected void receivedSessionEvent(SessionEvent sessionEvent) {
        if (sessionEvent == SessionEvent.SessionReestablished) {
            createLease(true).thenRun(() -> {
                super.receivedSessionEvent(sessionEvent);
            });
        } else {
            super.receivedSessionEvent(sessionEvent);
        }
    }
}
