package io.streamnative.oxia.client;

import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.DeleteRangeOption;
import io.streamnative.oxia.client.api.GetOption;
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.ListOption;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.api.RangeScanConsumer;
import io.streamnative.oxia.client.api.RangeScanOption;
import io.streamnative.oxia.client.batch.BatchManager;
import io.streamnative.oxia.client.batch.Operation;
import io.streamnative.oxia.client.grpc.OxiaBackoffProvider;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.grpc.OxiaStubProvider;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.LatencyHistogram;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.metrics.UpDownCounter;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.ShardManager;
import io.streamnative.oxia.proto.KeyComparisonType;
import io.streamnative.oxia.proto.ListRequest;
import io.streamnative.oxia.proto.ListResponse;
import io.streamnative.oxia.proto.RangeScanRequest;
import io.streamnative.oxia.proto.RangeScanResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.zookeeper.audit.AuditConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.5.jar:io/streamnative/oxia/client/AsyncOxiaClientImpl.class */
class AsyncOxiaClientImpl implements AsyncOxiaClient {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AsyncOxiaClientImpl.class);

    @NonNull
    private final String clientIdentifier;

    @NonNull
    private final InstrumentProvider instrumentProvider;

    @NonNull
    private final OxiaStubManager stubManager;

    @NonNull
    private final ShardManager shardManager;

    @NonNull
    private final NotificationManager notificationManager;

    @NonNull
    private final BatchManager readBatchManager;

    @NonNull
    private final BatchManager writeBatchManager;

    @NonNull
    private final SessionManager sessionManager;
    private volatile boolean closed;
    private final Counter counterPutBytes;
    private final Counter counterGetBytes;
    private final Counter counterListBytes;
    private final Counter counterRangeScanBytes;
    private final UpDownCounter gaugePendingPutRequests;
    private final UpDownCounter gaugePendingGetRequests;
    private final UpDownCounter gaugePendingListRequests;
    private final UpDownCounter gaugePendingRangeScanRequests;
    private final UpDownCounter gaugePendingDeleteRequests;
    private final UpDownCounter gaugePendingDeleteRangeRequests;
    private final UpDownCounter gaugePendingPutBytes;
    private final LatencyHistogram histogramPutLatency;
    private final LatencyHistogram histogramGetLatency;
    private final LatencyHistogram histogramDeleteLatency;
    private final LatencyHistogram histogramDeleteRangeLatency;
    private final LatencyHistogram histogramListLatency;
    private final LatencyHistogram histogramRangeScanLatency;
    private final ScheduledExecutorService scheduledExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.5.jar:io/streamnative/oxia/client/AsyncOxiaClientImpl$RangeScanConsumerWithShard.class */
    public interface RangeScanConsumerWithShard {
        void onNext(long j, GetResult getResult);

        void onError(Throwable th);

        void onCompleted(long j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @NonNull
    public static CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig clientConfig) {
        if (clientConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-client"));
        OxiaStubManager oxiaStubManager = new OxiaStubManager(clientConfig.namespace(), clientConfig.authentication(), clientConfig.enableTls(), OxiaBackoffProvider.create(clientConfig.connectionBackoffMinDelay(), clientConfig.connectionBackoffMaxDelay()));
        InstrumentProvider instrumentProvider = new InstrumentProvider(clientConfig.openTelemetry(), clientConfig.namespace());
        ShardManager shardManager = new ShardManager(newSingleThreadScheduledExecutor, oxiaStubManager.getStub(clientConfig.serviceAddress()), instrumentProvider, clientConfig.namespace());
        NotificationManager notificationManager = new NotificationManager(newSingleThreadScheduledExecutor, oxiaStubManager, shardManager, instrumentProvider);
        OxiaStubProvider oxiaStubProvider = new OxiaStubProvider(oxiaStubManager, shardManager);
        shardManager.addCallback(notificationManager);
        BatchManager newReadBatchManager = BatchManager.newReadBatchManager(clientConfig, oxiaStubProvider, instrumentProvider);
        SessionManager sessionManager = new SessionManager(newSingleThreadScheduledExecutor, clientConfig, oxiaStubProvider, instrumentProvider);
        shardManager.addCallback(sessionManager);
        AsyncOxiaClientImpl asyncOxiaClientImpl = new AsyncOxiaClientImpl(clientConfig.clientIdentifier(), newSingleThreadScheduledExecutor, instrumentProvider, oxiaStubManager, shardManager, notificationManager, newReadBatchManager, BatchManager.newWriteBatchManager(clientConfig, oxiaStubProvider, sessionManager, instrumentProvider), sessionManager);
        return shardManager.start().thenApply(r3 -> {
            return asyncOxiaClientImpl;
        });
    }

    AsyncOxiaClientImpl(@NonNull String str, @NonNull ScheduledExecutorService scheduledExecutorService, @NonNull InstrumentProvider instrumentProvider, @NonNull OxiaStubManager oxiaStubManager, @NonNull ShardManager shardManager, @NonNull NotificationManager notificationManager, @NonNull BatchManager batchManager, @NonNull BatchManager batchManager2, @NonNull SessionManager sessionManager) {
        if (str == null) {
            throw new NullPointerException("clientIdentifier is marked non-null but is null");
        }
        if (scheduledExecutorService == null) {
            throw new NullPointerException("scheduledExecutor is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
        if (oxiaStubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (notificationManager == null) {
            throw new NullPointerException("notificationManager is marked non-null but is null");
        }
        if (batchManager == null) {
            throw new NullPointerException("readBatchManager is marked non-null but is null");
        }
        if (batchManager2 == null) {
            throw new NullPointerException("writeBatchManager is marked non-null but is null");
        }
        if (sessionManager == null) {
            throw new NullPointerException("sessionManager is marked non-null but is null");
        }
        this.clientIdentifier = str;
        this.instrumentProvider = instrumentProvider;
        this.stubManager = oxiaStubManager;
        this.shardManager = shardManager;
        this.notificationManager = notificationManager;
        this.readBatchManager = batchManager;
        this.writeBatchManager = batchManager2;
        this.sessionManager = sessionManager;
        this.scheduledExecutor = scheduledExecutorService;
        this.counterPutBytes = instrumentProvider.newCounter("oxia.client.ops.size", Unit.Bytes, "Total number of bytes in operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "put"));
        this.counterGetBytes = instrumentProvider.newCounter("oxia.client.ops.size", Unit.Bytes, "Total number of bytes in operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "get"));
        this.counterListBytes = instrumentProvider.newCounter("oxia.client.ops.size", Unit.Bytes, "Total number of bytes in operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "list"));
        this.counterRangeScanBytes = instrumentProvider.newCounter("oxia.client.ops.size", Unit.Bytes, "Total number of bytes in operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "range-scan"));
        this.gaugePendingPutRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), "put"));
        this.gaugePendingGetRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), "get"));
        this.gaugePendingListRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), "list"));
        this.gaugePendingRangeScanRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), "range-scan"));
        this.gaugePendingDeleteRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), AuditConstants.OP_DELETE));
        this.gaugePendingDeleteRangeRequests = instrumentProvider.newUpDownCounter("oxia.client.ops.pending", Unit.Events, "Current number of outstanding requests", Attributes.of(AttributeKey.stringKey("oxia.op"), "delete-range"));
        this.gaugePendingPutBytes = instrumentProvider.newUpDownCounter("oxia.client.ops.outstanding", Unit.Bytes, "Current number of outstanding bytes in put operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "put"));
        this.histogramPutLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "put"));
        this.histogramGetLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "get"));
        this.histogramDeleteLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), AuditConstants.OP_DELETE));
        this.histogramDeleteRangeLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "delete-range"));
        this.histogramListLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "list"));
        this.histogramRangeScanLatency = instrumentProvider.newLatencyHistogram("oxia.client.ops", "Duration of operations", Attributes.of(AttributeKey.stringKey("oxia.op"), "range-scan"));
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<PutResult> put(String str, byte[] bArr) {
        return put(str, bArr, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<PutResult> put(String str, byte[] bArr, Set<PutOption> set) {
        CompletableFuture<PutResult> failedFuture;
        long nanoTime = System.nanoTime();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(bArr);
            failedFuture = internalPut(str, bArr, set);
        } catch (RuntimeException e) {
            failedFuture = CompletableFuture.failedFuture(e);
        }
        return failedFuture.whenComplete((putResult, th) -> {
            this.gaugePendingPutRequests.decrement();
            this.gaugePendingPutBytes.add(-bArr.length);
            if (th != null) {
                this.histogramPutLatency.recordFailure(System.nanoTime() - nanoTime);
            } else {
                this.counterPutBytes.add(bArr.length);
                this.histogramPutLatency.recordSuccess(System.nanoTime() - nanoTime);
            }
        });
    }

    private CompletableFuture<PutResult> internalPut(String str, byte[] bArr, Set<PutOption> set) {
        this.gaugePendingPutRequests.increment();
        this.gaugePendingPutBytes.add(bArr.length);
        Optional<String> partitionKey = OptionsUtils.getPartitionKey(set);
        long shardForKey = this.shardManager.getShardForKey(partitionKey.orElse(str));
        OptionalLong versionId = OptionsUtils.getVersionId(set);
        Optional<List<Long>> sequenceKeysDeltas = OptionsUtils.getSequenceKeysDeltas(set);
        CompletableFuture<PutResult> completableFuture = new CompletableFuture<>();
        if (OptionsUtils.isEphemeral(set)) {
            this.sessionManager.getSession(shardForKey).thenAccept(session -> {
                this.writeBatchManager.getBatcher(shardForKey).add(new Operation.WriteOperation.PutOperation(completableFuture, str, partitionKey, sequenceKeysDeltas, bArr, versionId, OptionalLong.of(session.getSessionId()), Optional.of(this.clientIdentifier)));
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(th);
                return null;
            });
        } else {
            this.writeBatchManager.getBatcher(shardForKey).add(new Operation.WriteOperation.PutOperation(completableFuture, str, partitionKey, sequenceKeysDeltas, bArr, versionId, OptionalLong.empty(), Optional.empty()));
        }
        return completableFuture;
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Boolean> delete(String str) {
        return delete(str, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Boolean> delete(String str, Set<DeleteOption> set) {
        long nanoTime = System.nanoTime();
        this.gaugePendingDeleteRequests.increment();
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            OptionalLong versionId = OptionsUtils.getVersionId(set);
            this.writeBatchManager.getBatcher(this.shardManager.getShardForKey(OptionsUtils.getPartitionKey(set).orElse(str))).add(new Operation.WriteOperation.DeleteOperation(completableFuture, str, versionId));
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
        return completableFuture.whenComplete((bool, th) -> {
            this.gaugePendingDeleteRequests.decrement();
            if (th == null) {
                this.histogramDeleteLatency.recordSuccess(System.nanoTime() - nanoTime);
            } else {
                this.histogramDeleteLatency.recordFailure(System.nanoTime() - nanoTime);
            }
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Void> deleteRange(String str, String str2) {
        return deleteRange(str, str2, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Void> deleteRange(String str, String str2, Set<DeleteRangeOption> set) {
        CompletableFuture<Void> failedFuture;
        long nanoTime = System.nanoTime();
        this.gaugePendingDeleteRangeRequests.increment();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(str2);
            Optional<String> partitionKey = OptionsUtils.getPartitionKey(set);
            if (partitionKey.isPresent()) {
                long shardForKey = this.shardManager.getShardForKey(partitionKey.get());
                failedFuture = new CompletableFuture<>();
                this.writeBatchManager.getBatcher(shardForKey).add(new Operation.WriteOperation.DeleteRangeOperation(failedFuture, str, str2));
            } else {
                Stream<Long> stream = this.shardManager.allShardIds().stream();
                BatchManager batchManager = this.writeBatchManager;
                Objects.requireNonNull(batchManager);
                failedFuture = CompletableFuture.allOf((CompletableFuture[]) stream.map((v1) -> {
                    return r1.getBatcher(v1);
                }).map(batcher -> {
                    CompletableFuture completableFuture = new CompletableFuture();
                    batcher.add(new Operation.WriteOperation.DeleteRangeOperation(completableFuture, str, str2));
                    return completableFuture;
                }).toArray(i -> {
                    return new CompletableFuture[i];
                }));
            }
        } catch (RuntimeException e) {
            failedFuture = CompletableFuture.failedFuture(e);
        }
        return failedFuture.whenComplete((r9, th) -> {
            this.gaugePendingDeleteRangeRequests.decrement();
            if (th == null) {
                this.histogramDeleteRangeLatency.recordSuccess(System.nanoTime() - nanoTime);
            } else {
                this.histogramDeleteRangeLatency.recordFailure(System.nanoTime() - nanoTime);
            }
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<GetResult> get(String str) {
        return get(str, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<GetResult> get(String str, Set<GetOption> set) {
        long nanoTime = System.nanoTime();
        this.gaugePendingGetRequests.increment();
        CompletableFuture<GetResult> completableFuture = new CompletableFuture<>();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            internalGet(str, set, completableFuture);
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
        return completableFuture.whenComplete((getResult, th) -> {
            this.gaugePendingGetRequests.decrement();
            if (th != null) {
                this.histogramGetLatency.recordFailure(System.nanoTime() - nanoTime);
                return;
            }
            if (getResult != null) {
                this.counterGetBytes.add(getResult.getValue().length);
            }
            this.histogramGetLatency.recordSuccess(System.nanoTime() - nanoTime);
        });
    }

    private void internalGet(String str, Set<GetOption> set, CompletableFuture<GetResult> completableFuture) {
        KeyComparisonType comparisonType = OptionsUtils.getComparisonType(set);
        Optional<String> partitionKey = OptionsUtils.getPartitionKey(set);
        if (comparisonType != KeyComparisonType.EQUAL && !partitionKey.isPresent()) {
            internalGetFloorCeiling(str, comparisonType, completableFuture);
        } else {
            this.readBatchManager.getBatcher(this.shardManager.getShardForKey(partitionKey.orElse(str))).add(new Operation.ReadOperation.GetOperation(completableFuture, str, comparisonType));
        }
    }

    private void internalGetFloorCeiling(String str, KeyComparisonType keyComparisonType, CompletableFuture<GetResult> completableFuture) {
        ArrayList arrayList = new ArrayList();
        Iterator<Long> it = this.shardManager.allShardIds().iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            CompletableFuture completableFuture2 = new CompletableFuture();
            this.readBatchManager.getBatcher(longValue).add(new Operation.ReadOperation.GetOperation(completableFuture2, str, keyComparisonType));
            arrayList.add(completableFuture2);
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r7, th) -> {
            GetResult getResult;
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            try {
                List list = arrayList.stream().map((v0) -> {
                    return v0.join();
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).sorted((getResult2, getResult3) -> {
                    return CompareWithSlash.INSTANCE.compare(getResult2.getKey(), getResult3.getKey());
                }).toList();
                if (list.isEmpty()) {
                    completableFuture.complete(null);
                    return;
                }
                switch (keyComparisonType) {
                    case EQUAL:
                    case UNRECOGNIZED:
                        getResult = null;
                        break;
                    case FLOOR:
                    case LOWER:
                        getResult = (GetResult) list.get(list.size() - 1);
                        break;
                    case CEILING:
                    case HIGHER:
                        getResult = (GetResult) list.get(0);
                        break;
                    default:
                        throw new IncompatibleClassChangeError();
                }
                completableFuture.complete(getResult);
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<List<String>> list(String str, String str2) {
        return list(str, str2, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<List<String>> list(String str, String str2, Set<ListOption> set) {
        CompletableFuture<List<String>> failedFuture;
        long nanoTime = System.nanoTime();
        this.gaugePendingListRequests.increment();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(str2);
            Optional<String> partitionKey = OptionsUtils.getPartitionKey(set);
            failedFuture = partitionKey.isPresent() ? internalShardlist(this.shardManager.getShardForKey(partitionKey.get()), str, str2) : internalListMultiShards(str, str2);
        } catch (Exception e) {
            failedFuture = CompletableFuture.failedFuture(e);
        }
        return failedFuture.whenComplete((list, th) -> {
            this.gaugePendingListRequests.decrement();
            if (th != null) {
                this.histogramListLatency.recordFailure(System.nanoTime() - nanoTime);
            } else {
                this.counterListBytes.add(list.stream().mapToInt((v0) -> {
                    return v0.length();
                }).sum());
                this.histogramListLatency.recordSuccess(System.nanoTime() - nanoTime);
            }
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    public void notifications(@NonNull Consumer<Notification> consumer) {
        if (consumer == null) {
            throw new NullPointerException("notificationCallback is marked non-null but is null");
        }
        checkIfClosed();
        this.notificationManager.registerCallback(consumer);
    }

    private CompletableFuture<List<String>> internalListMultiShards(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        Iterator<Long> it = this.shardManager.allShardIds().iterator();
        while (it.hasNext()) {
            arrayList.add(internalShardlist(it.next().longValue(), str, str2));
        }
        CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
        ArrayList arrayList2 = new ArrayList();
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).thenRun(() -> {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                arrayList2.addAll((Collection) ((CompletableFuture) it2.next()).join());
            }
            arrayList2.sort(CompareWithSlash.INSTANCE);
            completableFuture.complete(arrayList2);
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    private CompletableFuture<List<String>> internalShardlist(long j, String str, String str2) {
        OxiaStub stub = this.stubManager.getStub(this.shardManager.leader(j));
        ListRequest build = ListRequest.newBuilder().setShardId(j).setStartInclusive(str).setEndExclusive(str2).build();
        final CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
        final ArrayList arrayList = new ArrayList();
        stub.async().list(build, new StreamObserver<ListResponse>() { // from class: io.streamnative.oxia.client.AsyncOxiaClientImpl.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(ListResponse listResponse) {
                for (int i = 0; i < listResponse.getKeysCount(); i++) {
                    arrayList.add(listResponse.getKeys(i));
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                completableFuture.complete(arrayList);
            }
        });
        return completableFuture;
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    public void rangeScan(@NonNull String str, @NonNull String str2, @NonNull RangeScanConsumer rangeScanConsumer) {
        if (str == null) {
            throw new NullPointerException("startKeyInclusive is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("endKeyExclusive is marked non-null but is null");
        }
        if (rangeScanConsumer == null) {
            throw new NullPointerException("consumer is marked non-null but is null");
        }
        rangeScan(str, str2, rangeScanConsumer, Collections.emptySet());
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    public void rangeScan(@NonNull String str, @NonNull String str2, @NonNull final RangeScanConsumer rangeScanConsumer, @NonNull Set<RangeScanOption> set) {
        if (str == null) {
            throw new NullPointerException("startKeyInclusive is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("endKeyExclusive is marked non-null but is null");
        }
        if (rangeScanConsumer == null) {
            throw new NullPointerException("consumer is marked non-null but is null");
        }
        if (set == null) {
            throw new NullPointerException("options is marked non-null but is null");
        }
        this.gaugePendingRangeScanRequests.increment();
        RangeScanConsumerWithShard rangeScanConsumerWithShard = new RangeScanConsumerWithShard() { // from class: io.streamnative.oxia.client.AsyncOxiaClientImpl.2
            final long startTime = System.nanoTime();
            final AtomicLong totalSize = new AtomicLong();

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public void onNext(long j, GetResult getResult) {
                this.totalSize.addAndGet(getResult.getValue().length);
                rangeScanConsumer.onNext(getResult);
            }

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public void onError(Throwable th) {
                AsyncOxiaClientImpl.this.gaugePendingListRequests.decrement();
                AsyncOxiaClientImpl.this.histogramRangeScanLatency.recordFailure(System.nanoTime() - this.startTime);
                rangeScanConsumer.onError(th);
            }

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public void onCompleted(long j) {
                AsyncOxiaClientImpl.this.gaugePendingRangeScanRequests.decrement();
                AsyncOxiaClientImpl.this.counterRangeScanBytes.add(this.totalSize.longValue());
                AsyncOxiaClientImpl.this.histogramRangeScanLatency.recordSuccess(System.nanoTime() - this.startTime);
                rangeScanConsumer.onCompleted();
            }
        };
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(str2);
            Optional<String> partitionKey = OptionsUtils.getPartitionKey(set);
            if (partitionKey.isPresent()) {
                internalShardRangeScan(this.shardManager.getShardForKey(partitionKey.get()), str, str2, rangeScanConsumerWithShard);
            } else {
                internalRangeScanMultiShards(str, str2, rangeScanConsumerWithShard);
            }
        } catch (Exception e) {
            rangeScanConsumer.onError(e);
        }
    }

    private void internalShardRangeScan(final long j, String str, String str2, final RangeScanConsumerWithShard rangeScanConsumerWithShard) {
        OxiaStub stub = this.stubManager.getStub(this.shardManager.leader(j));
        stub.async().rangeScan(RangeScanRequest.newBuilder().setShardId(j).setStartInclusive(str).setEndExclusive(str2).build(), new StreamObserver<RangeScanResponse>() { // from class: io.streamnative.oxia.client.AsyncOxiaClientImpl.3
            @Override // io.grpc.stub.StreamObserver
            public void onNext(RangeScanResponse rangeScanResponse) {
                for (int i = 0; i < rangeScanResponse.getRecordsCount(); i++) {
                    rangeScanConsumerWithShard.onNext(j, ProtoUtil.getResultFromProto("", rangeScanResponse.getRecords(i)));
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                rangeScanConsumerWithShard.onError(th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                rangeScanConsumerWithShard.onCompleted(j);
            }
        });
    }

    private void internalRangeScanMultiShards(String str, String str2, final RangeScanConsumerWithShard rangeScanConsumerWithShard) {
        final Set<Long> allShardIds = this.shardManager.allShardIds();
        RangeScanConsumerWithShard rangeScanConsumerWithShard2 = new RangeScanConsumerWithShard() { // from class: io.streamnative.oxia.client.AsyncOxiaClientImpl.4
            private final Set<Long> pendingShards;
            private boolean failed = false;

            {
                this.pendingShards = new HashSet(allShardIds);
            }

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public synchronized void onNext(long j, GetResult getResult) {
                if (this.failed) {
                    return;
                }
                rangeScanConsumerWithShard.onNext(j, getResult);
            }

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public synchronized void onError(Throwable th) {
                this.failed = true;
                rangeScanConsumerWithShard.onError(th);
            }

            @Override // io.streamnative.oxia.client.AsyncOxiaClientImpl.RangeScanConsumerWithShard
            public synchronized void onCompleted(long j) {
                if (this.failed) {
                    return;
                }
                this.pendingShards.remove(Long.valueOf(j));
                if (this.pendingShards.isEmpty()) {
                    rangeScanConsumerWithShard.onCompleted(j);
                }
            }
        };
        Iterator<Long> it = allShardIds.iterator();
        while (it.hasNext()) {
            internalShardRangeScan(it.next().longValue(), str, str2, rangeScanConsumerWithShard2);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.readBatchManager.close();
        this.writeBatchManager.close();
        this.sessionManager.close();
        this.notificationManager.close();
        this.shardManager.close();
        this.stubManager.close();
        this.scheduledExecutor.shutdownNow();
    }

    private void checkIfClosed() {
        if (this.closed) {
            throw new IllegalStateException("Client has been closed");
        }
    }
}
