package org.apache.pulsar.metadata.impl.oxia;

import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.api.Version;
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-3.3.0.9.jar:org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.class */
public class OxiaMetadataStore extends AbstractMetadataStore {
    private final AsyncOxiaClient client;
    private final String identity;
    private Optional<MetadataEventSynchronizer> synchronizer;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) OxiaMetadataStore.class);
    private static final byte[] EMPTY_VALUE = new byte[0];
    private static final Set<PutOption> IF_RECORD_DOES_NOT_EXIST = Collections.singleton(PutOption.IfRecordDoesNotExist);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-3.3.0.9.jar:org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult.class */
    public static final class PathWithPutResult extends Record {
        private final String path;
        private final PutResult result;

        private PathWithPutResult(String str, PutResult putResult) {
            this.path = str;
            this.result = putResult;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, PathWithPutResult.class), PathWithPutResult.class, "path;result", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->path:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->result:Lio/streamnative/oxia/client/api/PutResult;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, PathWithPutResult.class), PathWithPutResult.class, "path;result", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->path:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->result:Lio/streamnative/oxia/client/api/PutResult;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, PathWithPutResult.class, Object.class), PathWithPutResult.class, "path;result", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->path:Ljava/lang/String;", "FIELD:Lorg/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore$PathWithPutResult;->result:Lio/streamnative/oxia/client/api/PutResult;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String path() {
            return this.path;
        }

        public PutResult result() {
            return this.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OxiaMetadataStore(@NonNull String str, @NonNull String str2, @NonNull MetadataStoreConfig metadataStoreConfig, boolean z) throws Exception {
        super("oxia-metadata");
        if (str == null) {
            throw new NullPointerException("serviceAddress is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("namespace is marked non-null but is null");
        }
        if (metadataStoreConfig == null) {
            throw new NullPointerException("metadataStoreConfig is marked non-null but is null");
        }
        int batchingMaxDelayMillis = metadataStoreConfig.isBatchingEnabled() ? metadataStoreConfig.getBatchingMaxDelayMillis() : 0;
        updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
        this.identity = UUID.randomUUID().toString();
        this.client = OxiaClientBuilder.create(str).clientIdentifier(this.identity).namespace(str2).sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis())).batchLinger(Duration.ofMillis(batchingMaxDelayMillis)).maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations()).asyncClient().get();
        this.client.notifications(this::notificationCallback);
        super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer()));
    }

    private void notificationCallback(Notification notification) {
        if (notification instanceof Notification.KeyCreated) {
            Notification.KeyCreated keyCreated = (Notification.KeyCreated) notification;
            receivedNotification(new org.apache.pulsar.metadata.api.Notification(NotificationType.Created, keyCreated.key()));
            notifyParentChildrenChanged(keyCreated.key());
        } else if (notification instanceof Notification.KeyModified) {
            receivedNotification(new org.apache.pulsar.metadata.api.Notification(NotificationType.Modified, ((Notification.KeyModified) notification).key()));
        } else {
            if (!(notification instanceof Notification.KeyDeleted)) {
                log.error("Unknown notification type {}", notification);
                return;
            }
            Notification.KeyDeleted keyDeleted = (Notification.KeyDeleted) notification;
            receivedNotification(new org.apache.pulsar.metadata.api.Notification(NotificationType.Deleted, keyDeleted.key()));
            notifyParentChildrenChanged(keyDeleted.key());
        }
    }

    Optional<GetResult> convertGetResult(String str, io.streamnative.oxia.client.api.GetResult getResult) {
        return getResult == null ? Optional.empty() : Optional.of(getResult).map(getResult2 -> {
            return new GetResult(getResult2.getValue(), convertStat(str, getResult2.getVersion()));
        });
    }

    Stat convertStat(String str, Version version) {
        long versionId = version.versionId();
        long createdTimestamp = version.createdTimestamp();
        long modifiedTimestamp = version.modifiedTimestamp();
        boolean isPresent = version.sessionId().isPresent();
        Stream<String> stream = version.clientIdentifier().stream();
        String str2 = this.identity;
        Objects.requireNonNull(str2);
        return new Stat(str, versionId, createdTimestamp, modifiedTimestamp, isPresent, stream.anyMatch((v1) -> {
            return r8.equals(v1);
        }), version.modificationsCount() == 0);
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<List<String>> getChildrenFromStore(String str) {
        String str2 = str + "/";
        return this.client.list(str2, str2 + "/").thenApply(list -> {
            return list.stream().map(str3 -> {
                return str3.substring(str2.length());
            }).toList();
        }).exceptionallyCompose((Function<Throwable, ? extends CompletionStage<U>>) this::convertException);
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Boolean> existsFromStore(String str) {
        return this.client.get(str).thenApply((v0) -> {
            return Objects.nonNull(v0);
        }).exceptionallyCompose((Function<Throwable, ? extends CompletionStage<U>>) this::convertException);
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Optional<GetResult>> storeGet(String str) {
        return this.client.get(str).thenApply(getResult -> {
            return convertGetResult(str, getResult);
        }).exceptionallyCompose((Function<Throwable, ? extends CompletionStage<U>>) this::convertException);
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Void> storeDelete(String str, Optional<Long> optional) {
        return getChildrenFromStore(str).thenCompose(list -> {
            if (!list.isEmpty()) {
                return CompletableFuture.failedFuture(new MetadataStoreException("Key '" + str + "' has children"));
            }
            return this.client.delete(str, (Set) optional.map(l -> {
                return Collections.singleton(DeleteOption.IfVersionIdEquals(l.longValue()));
            }).orElse(Collections.emptySet())).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.failedFuture(new MetadataStoreException.NotFoundException("Key '" + str + "' does not exist")) : CompletableFuture.completedFuture((Void) null);
            }).exceptionallyCompose((Function<Throwable, ? extends CompletionStage<U>>) this::convertException);
        });
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore
    protected CompletableFuture<Stat> storePut(String str, byte[] bArr, Optional<Long> optional, EnumSet<CreateOption> enumSet) {
        return createParents(str).thenCompose(r10 -> {
            CompletableFuture completedFuture;
            Optional optional2 = optional;
            if (optional2.isPresent() && ((Long) optional2.get()).longValue() != -1 && enumSet.contains(CreateOption.Sequential)) {
                return CompletableFuture.failedFuture(new MetadataStoreException("Can't have expectedVersion and Sequential at the same time"));
            }
            if (enumSet.contains(CreateOption.Sequential)) {
                String parent = parent(str);
                completedFuture = this.client.put(parent == null ? "/" : parent, new byte[0]).thenApply(putResult -> {
                    return String.format("%s%010d", str, Long.valueOf(putResult.version().modificationsCount()));
                });
                optional2 = Optional.of(-1L);
            } else {
                completedFuture = CompletableFuture.completedFuture(str);
            }
            HashSet hashSet = new HashSet();
            Optional map = optional2.map(l -> {
                return l.longValue() == -1 ? PutOption.IfRecordDoesNotExist : PutOption.IfVersionIdEquals(l.longValue());
            });
            Objects.requireNonNull(hashSet);
            map.ifPresent((v1) -> {
                r1.add(v1);
            });
            if (enumSet.contains(CreateOption.Ephemeral)) {
                hashSet.add(PutOption.AsEphemeralRecord);
            }
            return completedFuture.thenCompose(str2 -> {
                return this.client.put(str2, bArr, hashSet).thenApply(putResult2 -> {
                    return new PathWithPutResult(str2, putResult2);
                });
            }).thenApply(pathWithPutResult -> {
                return convertStat(pathWithPutResult.path(), pathWithPutResult.result().version());
            }).exceptionallyCompose(this::convertException);
        });
    }

    private <T> CompletionStage<T> convertException(Throwable th) {
        return ((th.getCause() instanceof UnexpectedVersionIdException) || (th.getCause() instanceof KeyAlreadyExistsException)) ? CompletableFuture.failedFuture(new MetadataStoreException.BadVersionException(th.getCause())) : th.getCause() instanceof IllegalStateException ? CompletableFuture.failedFuture(new MetadataStoreException.AlreadyClosedException(th.getCause())) : CompletableFuture.failedFuture(th.getCause());
    }

    private CompletableFuture<Void> createParents(String str) {
        String parent = parent(str);
        return (parent == null || parent.isEmpty()) ? CompletableFuture.completedFuture(null) : exists(parent).thenCompose(bool -> {
            return bool.booleanValue() ? CompletableFuture.completedFuture(null) : this.client.put(parent, EMPTY_VALUE, IF_RECORD_DOES_NOT_EXIST).thenCompose(putResult -> {
                return createParents(parent);
            });
        }).exceptionallyCompose((Function<Throwable, ? extends CompletionStage<U>>) th -> {
            return th.getCause() instanceof KeyAlreadyExistsException ? CompletableFuture.completedFuture(null) : CompletableFuture.failedFuture(th.getCause());
        });
    }

    @Override // org.apache.pulsar.metadata.impl.AbstractMetadataStore, java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.client != null) {
            this.client.close();
        }
        super.close();
    }

    @Override // org.apache.pulsar.metadata.api.extended.MetadataStoreExtended
    public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
        return this.synchronizer;
    }

    @Override // org.apache.pulsar.metadata.api.extended.MetadataStoreExtended
    public void updateMetadataEventSynchronizer(MetadataEventSynchronizer metadataEventSynchronizer) {
        this.synchronizer = Optional.ofNullable(metadataEventSynchronizer);
        registerSyncListener(this.synchronizer);
    }
}
