package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.CompareOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.RangeOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.op.TxnOpBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.Bytes;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamName;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.class */
public class RootRangeStoreImpl implements RootRangeStore {
    private static final byte SYSTEM_TAG = -1;
    private static final byte NS_NAME_TAG = 1;
    private static final byte NS_ID_TAG = 2;
    private static final byte NS_STREAM_NAME_SEP = 3;
    private static final byte NS_STREAM_ID_SEP = 4;
    private static final byte STREAM_ID_TAG = 5;
    private static final byte NS_END_SEP = -1;
    private final MVCCAsyncStore<byte[], byte[]> store;
    private final StorageContainerPlacementPolicy placementPolicy;
    private final ScheduledExecutorService executor;
    private static final Logger log = LoggerFactory.getLogger(RootRangeStoreImpl.class);
    static final byte[] NS_ID_KEY = {-1, 110, 115, 105, 100};
    static final byte[] STREAM_ID_KEY = {-1, 115, 116, 114, 101, 97, 109, 105, 100};

    static final byte[] getNamespaceNameKey(String str) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        byte[] bArr = new byte[bytes.length + 1];
        System.arraycopy(bytes, 0, bArr, 1, bytes.length);
        bArr[0] = 1;
        return bArr;
    }

    static final byte[] getNamespaceIdKey(long j) {
        byte[] bArr = new byte[9];
        bArr[0] = 2;
        Bytes.toBytes(j, bArr, 1);
        return bArr;
    }

    static final byte[] getNamespaceIdEndKey(long j) {
        byte[] bArr = new byte[10];
        bArr[0] = 2;
        Bytes.toBytes(j, bArr, 1);
        bArr[9] = -1;
        return bArr;
    }

    static final byte[] getStreamNameKey(long j, String str) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        byte[] bArr = new byte[bytes.length + 8 + 2];
        bArr[0] = 2;
        Bytes.toBytes(j, bArr, 1);
        bArr[9] = 3;
        System.arraycopy(bytes, 0, bArr, 10, bytes.length);
        return bArr;
    }

    static final byte[] getStreamIdKey(long j, long j2) {
        byte[] bArr = new byte[18];
        bArr[0] = 2;
        Bytes.toBytes(j, bArr, 1);
        bArr[9] = 4;
        Bytes.toBytes(j2, bArr, 10);
        return bArr;
    }

    static final byte[] getStreamIdKey(long j) {
        byte[] bArr = new byte[9];
        bArr[0] = 5;
        Bytes.toBytes(j, bArr, 1);
        return bArr;
    }

    public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> mVCCAsyncStore, StorageContainerPlacementPolicy storageContainerPlacementPolicy, ScheduledExecutorService scheduledExecutorService) {
        this.store = mVCCAsyncStore;
        this.placementPolicy = storageContainerPlacementPolicy;
        this.executor = scheduledExecutorService;
    }

    CompletableFuture<KeyValue<byte[], byte[]>> getValue(byte[] bArr) {
        RangeOp<byte[], byte[]> newRange = this.store.getOpFactory().newRange(bArr, Options.get());
        return this.store.range(newRange).thenApplyAsync(rangeResult -> {
            try {
                if (rangeResult.count() <= 0) {
                    return null;
                }
                return (KeyValue) rangeResult.getKvsAndClear().get(0);
            } finally {
                rangeResult.close();
            }
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (keyValue, th) -> {
            newRange.close();
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest createNamespaceRequest) {
        if (log.isTraceEnabled()) {
            log.trace("Received CreateNamespace request : {}", createNamespaceRequest);
        }
        return CreateNamespaceProcessor.of().process(this, createNamespaceRequest, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatusCode verifyCreateNamespaceRequest(CreateNamespaceRequest createNamespaceRequest) {
        String name = createNamespaceRequest.getName();
        StatusCode statusCode = StatusCode.SUCCESS;
        if (!ProtoUtils.validateNamespaceName(name)) {
            log.error("Failed to create namespace due to invalid namespace name {}", name);
            statusCode = StatusCode.INVALID_NAMESPACE_NAME;
        }
        return statusCode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<CreateNamespaceResponse> doProcessCreateNamespaceRequest(CreateNamespaceRequest createNamespaceRequest) {
        return getValue(NS_ID_KEY).thenCompose(keyValue -> {
            long j;
            long modifiedRevision;
            if (null == keyValue) {
                j = -1;
                modifiedRevision = -1;
            } else {
                j = Bytes.toLong((byte[]) keyValue.value(), 0);
                modifiedRevision = keyValue.modifiedRevision();
                keyValue.close();
            }
            return executeCreateNamespaceTxn(j, modifiedRevision, createNamespaceRequest);
        });
    }

    private CompletableFuture<CreateNamespaceResponse> executeCreateNamespaceTxn(long j, long j2, CreateNamespaceRequest createNamespaceRequest) {
        long j3 = j + 1;
        String name = createNamespaceRequest.getName();
        NamespaceMetadata build = NamespaceMetadata.newBuilder().setProps(NamespaceProperties.newBuilder().setNamespaceId(j3).setNamespaceName(name).setDefaultStreamConf(createNamespaceRequest.getNsConf().getDefaultStreamConf())).build();
        byte[] namespaceNameKey = getNamespaceNameKey(name);
        byte[] bytes = Bytes.toBytes(j3);
        byte[] namespaceIdKey = getNamespaceIdKey(j3);
        byte[] byteArray = build.toByteArray();
        TxnOpBuilder<byte[], byte[]> newTxn = this.store.newTxn();
        CompareOp[] compareOpArr = new CompareOp[2];
        compareOpArr[0] = this.store.newCompareValue(CompareResult.EQUAL, namespaceNameKey, null);
        compareOpArr[1] = j2 < 0 ? this.store.newCompareValue(CompareResult.EQUAL, NS_ID_KEY, null) : this.store.newCompareModRevision(CompareResult.EQUAL, NS_ID_KEY, j2);
        TxnOp<byte[], byte[]> build2 = newTxn.If(compareOpArr).Then(this.store.newPut(namespaceNameKey, bytes), this.store.newPut(namespaceIdKey, byteArray), this.store.newPut(NS_ID_KEY, Bytes.toBytes(j3))).build();
        return this.store.txn(build2).thenApply(txnResult -> {
            try {
                CreateNamespaceResponse.Builder newBuilder = CreateNamespaceResponse.newBuilder();
                if (txnResult.isSuccess()) {
                    newBuilder.setCode(StatusCode.SUCCESS);
                    newBuilder.setNsProps(build.getProps());
                } else {
                    newBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR);
                }
                CreateNamespaceResponse build3 = newBuilder.build();
                txnResult.close();
                return build3;
            } catch (Throwable th) {
                txnResult.close();
                throw th;
            }
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (createNamespaceResponse, th) -> {
            build2.close();
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest deleteNamespaceRequest) {
        return DeleteNamespaceProcessor.of().process(this, deleteNamespaceRequest, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatusCode verifyDeleteNamespaceRequest(DeleteNamespaceRequest deleteNamespaceRequest) {
        String name = deleteNamespaceRequest.getName();
        StatusCode statusCode = StatusCode.SUCCESS;
        if (!ProtoUtils.validateNamespaceName(name)) {
            log.error("Failed to delete namespace due to invalid namespace name {}", name);
            statusCode = StatusCode.INVALID_NAMESPACE_NAME;
        }
        return statusCode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<DeleteNamespaceResponse> doProcessDeleteNamespaceRequest(DeleteNamespaceRequest deleteNamespaceRequest) {
        String name = deleteNamespaceRequest.getName();
        return getNamespace(name).thenCompose(namespaceMetadata -> {
            return deleteNamespace(name, namespaceMetadata);
        });
    }

    CompletableFuture<DeleteNamespaceResponse> deleteNamespace(String str, NamespaceMetadata namespaceMetadata) {
        if (null == namespaceMetadata) {
            return FutureUtils.value(DeleteNamespaceResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
        }
        byte[] namespaceNameKey = getNamespaceNameKey(str);
        byte[] namespaceIdKey = getNamespaceIdKey(namespaceMetadata.getProps().getNamespaceId());
        TxnOp<byte[], byte[]> build = this.store.newTxn().If(this.store.newCompareValue(CompareResult.NOT_EQUAL, namespaceNameKey, null), this.store.newCompareValue(CompareResult.NOT_EQUAL, namespaceIdKey, null)).Then(this.store.newDelete(namespaceNameKey), this.store.newDeleteRange(namespaceIdKey, getNamespaceIdEndKey(namespaceMetadata.getProps().getNamespaceId()))).build();
        return this.store.txn(build).thenApply(txnResult -> {
            try {
                DeleteNamespaceResponse.Builder newBuilder = DeleteNamespaceResponse.newBuilder();
                if (txnResult.isSuccess()) {
                    newBuilder.setCode(StatusCode.SUCCESS);
                } else {
                    newBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR);
                }
                return newBuilder.build();
            } finally {
                txnResult.close();
            }
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (deleteNamespaceResponse, th) -> {
            build.close();
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest getNamespaceRequest) {
        String name = getNamespaceRequest.getName();
        if (ProtoUtils.validateNamespaceName(name)) {
            return getNamespace(getNamespaceRequest.getName()).thenApply(namespaceMetadata -> {
                GetNamespaceResponse.Builder newBuilder = GetNamespaceResponse.newBuilder();
                if (null == namespaceMetadata) {
                    newBuilder.setCode(StatusCode.NAMESPACE_NOT_FOUND);
                } else {
                    newBuilder.setCode(StatusCode.SUCCESS);
                    newBuilder.setNsProps(namespaceMetadata.getProps());
                }
                return newBuilder.build();
            });
        }
        log.error("Failed to get namespace due to invalid namespace name {}", name);
        return FutureUtils.value(GetNamespaceResponse.newBuilder().setCode(StatusCode.INVALID_NAMESPACE_NAME).build());
    }

    private CompletableFuture<NamespaceMetadata> getNamespace(long j) {
        return this.store.get(getNamespaceIdKey(j)).thenCompose(bArr -> {
            NamespaceMetadata parseFrom;
            if (null != bArr) {
                try {
                    parseFrom = NamespaceMetadata.parseFrom(bArr);
                } catch (InvalidProtocolBufferException e) {
                    return FutureUtils.exception(e);
                }
            } else {
                parseFrom = null;
            }
            return FutureUtils.value(parseFrom);
        });
    }

    private CompletableFuture<NamespaceMetadata> getNamespace(String str) {
        return this.store.get(getNamespaceNameKey(str)).thenCompose(bArr -> {
            return null == bArr ? FutureUtils.value(null) : getNamespace(Bytes.toLong(bArr, 0));
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest createStreamRequest) {
        String name = createStreamRequest.getName();
        String nsName = createStreamRequest.getNsName();
        StatusCode verifyStreamRequest = verifyStreamRequest(nsName, name);
        return StatusCode.SUCCESS != verifyStreamRequest ? FutureUtils.value(CreateStreamResponse.newBuilder().setCode(verifyStreamRequest).build()) : createStream(nsName, name, createStreamRequest.getStreamConf());
    }

    StatusCode verifyStreamRequest(String str, String str2) {
        StatusCode statusCode = StatusCode.SUCCESS;
        if (!ProtoUtils.validateNamespaceName(str)) {
            log.error("Invalid namespace name {}", str);
            statusCode = StatusCode.INVALID_NAMESPACE_NAME;
        } else if (!ProtoUtils.validateStreamName(str2)) {
            log.error("Invalid stream name {}", str2);
            statusCode = StatusCode.INVALID_STREAM_NAME;
        }
        return statusCode;
    }

    private CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration) {
        return getNamespace(str).thenCompose(namespaceMetadata -> {
            return createStream(namespaceMetadata, str2, streamConfiguration);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return CreateStreamResponse.newBuilder().setCode(StatusCode.INTERNAL_SERVER_ERROR).build();
        });
    }

    private CompletableFuture<CreateStreamResponse> createStream(NamespaceMetadata namespaceMetadata, String str, StreamConfiguration streamConfiguration) {
        return null == namespaceMetadata ? FutureUtils.value(CreateStreamResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build()) : getValue(STREAM_ID_KEY).thenCompose(keyValue -> {
            long j = -1;
            long j2 = -1;
            if (null != keyValue) {
                j = Bytes.toLong((byte[]) keyValue.value(), 0);
                j2 = keyValue.modifiedRevision();
                keyValue.close();
            }
            return executeCreateStreamTxn(namespaceMetadata.getProps().getNamespaceId(), str, streamConfiguration, j, j2);
        });
    }

    private CompletableFuture<CreateStreamResponse> executeCreateStreamTxn(long j, String str, StreamConfiguration streamConfiguration, long j2, long j3) {
        long j4 = j2 < 0 ? 1024L : j2 + 1;
        StreamProperties build = StreamProperties.newBuilder().setStreamId(j4).setStreamName(str).setStorageContainerId(this.placementPolicy.placeStreamRange(j4, 0L)).setStreamConf(streamConfiguration).build();
        byte[] namespaceIdKey = getNamespaceIdKey(j);
        byte[] streamNameKey = getStreamNameKey(j, str);
        byte[] bytes = Bytes.toBytes(j4);
        byte[] streamIdKey = getStreamIdKey(j, j4);
        byte[] byteArray = build.toByteArray();
        byte[] streamIdKey2 = getStreamIdKey(j4);
        byte[] bytes2 = Bytes.toBytes(j);
        TxnOpBuilder<byte[], byte[]> newTxn = this.store.newTxn();
        CompareOp[] compareOpArr = new CompareOp[3];
        compareOpArr[0] = this.store.newCompareValue(CompareResult.NOT_EQUAL, namespaceIdKey, null);
        compareOpArr[1] = j3 < 0 ? this.store.newCompareValue(CompareResult.EQUAL, STREAM_ID_KEY, null) : this.store.newCompareModRevision(CompareResult.EQUAL, STREAM_ID_KEY, j3);
        compareOpArr[2] = this.store.newCompareValue(CompareResult.EQUAL, streamNameKey, null);
        TxnOp<byte[], byte[]> build2 = newTxn.If(compareOpArr).Then(this.store.newPut(streamNameKey, bytes), this.store.newPut(streamIdKey, byteArray), this.store.newPut(streamIdKey2, bytes2), this.store.newPut(STREAM_ID_KEY, Bytes.toBytes(j4))).build();
        return this.store.txn(build2).thenApply(txnResult -> {
            try {
                CreateStreamResponse.Builder newBuilder = CreateStreamResponse.newBuilder();
                if (txnResult.isSuccess()) {
                    newBuilder.setCode(StatusCode.SUCCESS);
                    newBuilder.setStreamProps(build);
                } else {
                    newBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR);
                }
                CreateStreamResponse build3 = newBuilder.build();
                txnResult.close();
                build2.close();
                return build3;
            } catch (Throwable th) {
                txnResult.close();
                build2.close();
                throw th;
            }
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            build2.close();
            return CreateStreamResponse.newBuilder().setCode(StatusCode.INTERNAL_SERVER_ERROR).build();
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest deleteStreamRequest) {
        String name = deleteStreamRequest.getName();
        String nsName = deleteStreamRequest.getNsName();
        StatusCode verifyStreamRequest = verifyStreamRequest(nsName, name);
        return StatusCode.SUCCESS != verifyStreamRequest ? FutureUtils.value(DeleteStreamResponse.newBuilder().setCode(verifyStreamRequest).build()) : deleteStream(nsName, name);
    }

    private CompletableFuture<DeleteStreamResponse> deleteStream(String str, String str2) {
        return getNamespace(str).thenCompose(namespaceMetadata -> {
            return deleteStream(namespaceMetadata, str2);
        });
    }

    private CompletableFuture<DeleteStreamResponse> deleteStream(NamespaceMetadata namespaceMetadata, String str) {
        if (null == namespaceMetadata) {
            return FutureUtils.value(DeleteStreamResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
        }
        long namespaceId = namespaceMetadata.getProps().getNamespaceId();
        return this.store.get(getStreamNameKey(namespaceId, str)).thenCompose(bArr -> {
            return null == bArr ? FutureUtils.value(DeleteStreamResponse.newBuilder().setCode(StatusCode.STREAM_NOT_FOUND).build()) : deleteStream(namespaceId, Bytes.toLong(bArr, 0), str);
        });
    }

    private CompletableFuture<DeleteStreamResponse> deleteStream(long j, long j2, String str) {
        byte[] namespaceIdKey = getNamespaceIdKey(j);
        byte[] streamNameKey = getStreamNameKey(j, str);
        byte[] streamIdKey = getStreamIdKey(j, j2);
        byte[] streamIdKey2 = getStreamIdKey(j2);
        TxnOp<byte[], byte[]> build = this.store.newTxn().If(this.store.newCompareValue(CompareResult.NOT_EQUAL, namespaceIdKey, null), this.store.newCompareValue(CompareResult.NOT_EQUAL, streamNameKey, null), this.store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey, null), this.store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey2, null)).Then(this.store.newDelete(streamIdKey), this.store.newDelete(streamNameKey), this.store.newDelete(streamIdKey2)).build();
        return this.store.txn(build).thenApply(txnResult -> {
            try {
                DeleteStreamResponse.Builder newBuilder = DeleteStreamResponse.newBuilder();
                if (txnResult.isSuccess()) {
                    newBuilder.setCode(StatusCode.SUCCESS);
                } else {
                    newBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR);
                }
                return newBuilder.build();
            } finally {
                txnResult.close();
            }
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (deleteStreamResponse, th) -> {
            build.close();
        });
    }

    private CompletableFuture<GetStreamResponse> streamPropertiesToResponse(CompletableFuture<StreamProperties> completableFuture) {
        GetStreamResponse.Builder newBuilder = GetStreamResponse.newBuilder();
        return completableFuture.thenCompose(streamProperties -> {
            return null == streamProperties ? FutureUtils.value(newBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build()) : FutureUtils.value(newBuilder.setCode(StatusCode.SUCCESS).setStreamProps(streamProperties).build());
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return newBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build();
        });
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore
    public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest getStreamRequest) {
        return GetStreamRequest.IdCase.STREAM_ID == getStreamRequest.getIdCase() ? streamPropertiesToResponse(getStreamProps(getStreamRequest.getStreamId())) : GetStreamRequest.IdCase.STREAM_NAME == getStreamRequest.getIdCase() ? getStreamProps(getStreamRequest.getStreamName()) : FutureUtils.value(GetStreamResponse.newBuilder().setCode(StatusCode.ILLEGAL_OP).build());
    }

    CompletableFuture<StreamProperties> getStreamProps(long j) {
        return this.store.get(getStreamIdKey(j)).thenCompose(bArr -> {
            return null == bArr ? FutureUtils.value(null) : getStreamProps(Bytes.toLong(bArr, 0), j);
        });
    }

    CompletableFuture<GetStreamResponse> getStreamProps(StreamName streamName) {
        StatusCode verifyStreamRequest = verifyStreamRequest(streamName.getNamespaceName(), streamName.getStreamName());
        if (StatusCode.SUCCESS != verifyStreamRequest) {
            return FutureUtils.value(GetStreamResponse.newBuilder().setCode(verifyStreamRequest).build());
        }
        return this.store.get(getNamespaceNameKey(streamName.getNamespaceName())).thenCompose(bArr -> {
            return null == bArr ? FutureUtils.value(GetStreamResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build()) : streamPropertiesToResponse(getStreamProps(Bytes.toLong(bArr, 0), streamName.getStreamName()));
        });
    }

    CompletableFuture<StreamProperties> getStreamProps(long j, String str) {
        return this.store.get(getStreamNameKey(j, str)).thenCompose(bArr -> {
            return null == bArr ? FutureUtils.value(null) : getStreamProps(j, Bytes.toLong(bArr, 0));
        });
    }

    CompletableFuture<StreamProperties> getStreamProps(long j, long j2) {
        return this.store.get(getStreamIdKey(j, j2)).thenCompose(bArr -> {
            if (null == bArr) {
                return FutureUtils.value(null);
            }
            try {
                return FutureUtils.value(StreamProperties.parseFrom(bArr));
            } catch (InvalidProtocolBufferException e) {
                return FutureUtils.exception(e);
            }
        });
    }
}
