package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RelationType;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.metadata.stream.MetaRangeImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.class */
/**
 * {@link MetaRangeStore} implementation that answers active-range requests from
 * {@link MetaRangeImpl} instances and caches one instance per stream id once it has been
 * loaded or created.
 *
 * <p>The cache is a plain hash map; every read and write of it happens while holding the
 * map's own monitor.
 */
public class MetaRangeStoreImpl implements MetaRangeStore {
    private static final Logger log = LoggerFactory.getLogger(MetaRangeStoreImpl.class);
    private final MVCCAsyncStore<byte[], byte[]> store;
    private final ScheduledExecutorService executor;
    private final StorageContainerPlacementPolicy rangePlacementPolicy;
    // Stream id -> meta range. Guarded by synchronized (streams).
    private final Map<Long, MetaRangeImpl> streams = Maps.newHashMap();
    private final StorageServerClientManager clientManager;

    /**
     * Creates a store with no cached streams.
     *
     * @param mVCCAsyncStore store handed to every {@link MetaRangeImpl} this class creates
     * @param storageContainerPlacementPolicy placement policy handed to every {@link MetaRangeImpl}
     * @param scheduledExecutorService executor handed to every {@link MetaRangeImpl} and used to
     *     assemble responses
     * @param storageServerClientManager client used to fetch stream properties when a stream has
     *     no meta range yet
     */
    public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> mVCCAsyncStore, StorageContainerPlacementPolicy storageContainerPlacementPolicy, ScheduledExecutorService scheduledExecutorService, StorageServerClientManager storageServerClientManager) {
        this.store = mVCCAsyncStore;
        this.executor = scheduledExecutorService;
        this.rangePlacementPolicy = storageContainerPlacementPolicy;
        this.clientManager = storageServerClientManager;
    }

    /** Builds a response that carries only the given status code. */
    private static GetActiveRangesResponse responseWithCode(StatusCode code) {
        return GetActiveRangesResponse.newBuilder().setCode(code).build();
    }

    /**
     * Creates the meta range for a stream from its properties, caches it on success and then
     * returns its active ranges.
     *
     * @param streamId id under which the created meta range is cached
     * @param metaRangeImpl meta range instance on which {@code create} is invoked
     * @param streamProperties properties of the stream, or {@code null} if none were found
     * @return {@code STREAM_NOT_FOUND} when {@code streamProperties} is {@code null},
     *     {@code INTERNAL_SERVER_ERROR} when creation reports {@code false}, otherwise the
     *     active-ranges response of the created meta range
     */
    private CompletableFuture<GetActiveRangesResponse> createStreamIfMissing(long streamId, MetaRangeImpl metaRangeImpl, StreamProperties streamProperties) {
        if (null == streamProperties) {
            return FutureUtils.value(responseWithCode(StatusCode.STREAM_NOT_FOUND));
        }
        return metaRangeImpl.create(streamProperties).thenCompose(created -> {
            if (!created.booleanValue()) {
                return FutureUtils.value(responseWithCode(StatusCode.INTERNAL_SERVER_ERROR));
            }
            synchronized (this.streams) {
                this.streams.put(streamId, metaRangeImpl);
            }
            return getActiveRanges(metaRangeImpl);
        });
    }

    /**
     * Returns the active ranges of the requested stream.
     *
     * <p>A cached meta range is used when present. Otherwise a new {@link MetaRangeImpl} is asked
     * to load the stream; a {@code null} load result is treated as "no meta range yet", in which
     * case the stream properties are fetched through the client manager and the meta range is
     * created from them.
     *
     * @param getActiveRangesRequest request carrying the stream id
     * @return future completing with the response for that stream
     */
    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore
    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest getActiveRangesRequest) {
        final long streamId = getActiveRangesRequest.getStreamId();
        final MetaRangeImpl cached;
        // Read under the same monitor as the writes: HashMap is not safe for a lock-free read
        // that races with a put.
        synchronized (this.streams) {
            cached = this.streams.get(streamId);
        }
        if (null != cached) {
            return getActiveRanges(cached);
        }
        final MetaRangeImpl fresh = new MetaRangeImpl(this.store, this.executor, this.rangePlacementPolicy);
        return fresh.load(streamId).thenCompose(loaded -> {
            if (null == loaded) {
                return this.clientManager.getStreamProperties(streamId)
                    .thenCompose(streamProperties -> createStreamIfMissing(streamId, fresh, streamProperties));
            }
            synchronized (this.streams) {
                this.streams.put(streamId, (MetaRangeImpl) loaded);
            }
            return getActiveRanges(loaded);
        });
    }

    /**
     * Converts the active ranges of a meta range into a response, one {@code PARENTS} entry per
     * range, assembled on this store's executor.
     *
     * @param metaRange meta range to query
     * @return future completing with a {@code SUCCESS} response, or with a bare
     *     {@code INTERNAL_SERVER_ERROR} response (after logging the cause) if the lookup or the
     *     conversion fails
     */
    private CompletableFuture<GetActiveRangesResponse> getActiveRanges(MetaRange metaRange) {
        return metaRange.getActiveRanges().thenApplyAsync(ranges -> {
            // The builder is local to this stage so a failure cannot leak partially added ranges
            // into the error response.
            GetActiveRangesResponse.Builder respBuilder = GetActiveRangesResponse.newBuilder();
            for (RangeMetadata range : ranges) {
                respBuilder.addRanges(RelatedRanges.newBuilder()
                    .setProps(range.getProps())
                    .setType(RelationType.PARENTS)
                    .addAllRelatedRanges(range.getParentsList()));
            }
            return respBuilder.setCode(StatusCode.SUCCESS).build();
        }, this.executor).exceptionally(cause -> {
            log.error("Failed to get active ranges", cause);
            return responseWithCode(StatusCode.INTERNAL_SERVER_ERROR);
        });
    }
}
