package com.couchbase.client.core.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreKvParamValidators;
import com.couchbase.client.core.config.BucketCapabilities;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.error.AuthenticationFailureException;
import com.couchbase.client.core.error.CollectionNotFoundException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.InternalServerFailureException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.MutationTokenOutdatedException;
import com.couchbase.client.core.error.RangeScanPartitionFailedException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.error.context.KeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.kv.RangeScanCancelRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueRequest;
import com.couchbase.client.core.msg.kv.RangeScanCreateRequest;
import com.couchbase.client.core.util.Validators;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;

/**
 * Orchestrates KV range scans and sampling scans for one collection.
 *
 * <p>For every partition of the bucket a scan is created on the server, its items are streamed
 * through repeated "continue" requests, and the scan is cancelled on the server when the
 * downstream subscriber cancels. The per-partition streams are concatenated, i.e. partitions are
 * consumed one after another in ascending partition order.
 *
 * <p>The bucket configuration is picked up asynchronously from the configuration provider; both
 * fields holding it are {@code volatile} because they are written from the config stream and read
 * from whichever thread subscribes to a scan.
 */
@Stability.Internal
public class RangeScanOrchestrator {
    /** Default batch byte limit for a range scan (not referenced inside this class). */
    public static final int RANGE_SCAN_DEFAULT_BATCH_BYTE_LIMIT = 15000;

    /** Default batch item limit for a range scan (not referenced inside this class). */
    public static final int RANGE_SCAN_DEFAULT_BATCH_ITEM_LIMIT = 50;

    /** How long to wait before re-checking for a bucket config when none has been seen yet. */
    private static final Duration CONFIG_RETRY_DELAY = Duration.ofMillis(100L);

    private final Core core;
    private final CollectionIdentifier collectionIdentifier;

    /** Most recent config seen for the bucket, or {@code null} until the first one arrives. */
    private volatile BucketConfig currentBucketConfig;

    /** Whether the most recent bucket config advertised the RANGE_SCAN capability. */
    private volatile boolean capabilityEnabled = false;

    /**
     * Creates the orchestrator and starts tracking config updates for the collection's bucket.
     *
     * @param core the core used to dispatch requests and to obtain configs; must not be null.
     * @param collectionIdentifier the collection to scan; must not be null.
     */
    public RangeScanOrchestrator(Core core, CollectionIdentifier collectionIdentifier) {
        this.core = Validators.notNull(core, "Core");
        this.collectionIdentifier = Validators.notNull(collectionIdentifier, "CollectionIdentifier");
        // NOTE(review): this subscription is never disposed, so it lives as long as the config stream.
        core.configurationProvider().configs().subscribe(clusterConfig -> {
            BucketConfig bucketConfig = clusterConfig.bucketConfig(collectionIdentifier.bucket());
            if (bucketConfig != null) {
                // Publish the capability flag BEFORE the config: scans treat a non-null config as
                // "ready" and then read the flag, so writing the config last guarantees (volatile
                // happens-before) that they never observe a fresh config with a stale flag.
                this.capabilityEnabled = bucketConfig.bucketCapabilities().contains(BucketCapabilities.RANGE_SCAN);
                this.currentBucketConfig = bucketConfig;
            }
        });
    }

    /**
     * Performs a range scan across all partitions.
     *
     * <p>Nothing happens until subscription. Parameters are validated on subscription; if no
     * bucket config has been seen yet, the attempt is repeated after a short delay.
     * NOTE(review): that wait is not bounded by the scan timeout.
     *
     * @param coreRangeScan the range to scan.
     * @param coreScanOptions the scan options.
     * @return the scanned items; fails with {@link IllegalStateException} if the bucket is not a
     *     Couchbase bucket, or with the error from
     *     {@link FeatureNotAvailableException#rangeScan()} if the capability is not advertised.
     */
    public Flux<CoreRangeScanItem> rangeScan(CoreRangeScan coreRangeScan, CoreScanOptions coreScanOptions) {
        return Flux.defer(() -> {
            CoreKvParamValidators.validateScanParams(coreRangeScan, coreScanOptions);

            // Single volatile read so the null check, the type check and the cast agree.
            BucketConfig config = this.currentBucketConfig;
            if (config == null) {
                return Mono.delay(CONFIG_RETRY_DELAY, this.core.context().environment().scheduler())
                    .flatMapMany(ignored -> rangeScan(coreRangeScan, coreScanOptions));
            }
            if (!(config instanceof CouchbaseBucketConfig)) {
                return Flux.error(new IllegalStateException("Only Couchbase buckets are supported with KV Range Scan"));
            }

            Map<Short, MutationToken> consistencyMap = coreScanOptions.consistencyMap();
            return streamForPartitions((CouchbaseBucketConfig) config, (partition, lastKey) -> {
                // On the first attempt start at the range's lower bound; on a retry resume from the
                // key of the last item that was already emitted for this partition.
                byte[] startTerm = lastKey == null
                    ? coreRangeScan.from().id().getBytes(StandardCharsets.UTF_8)
                    : lastKey;
                return RangeScanCreateRequest.forRangeScan(startTerm, coreRangeScan, coreScanOptions,
                    partition.shortValue(), this.core.context(), this.collectionIdentifier, consistencyMap);
            }, coreScanOptions);
        });
    }

    /**
     * Performs a sampling scan across all partitions, emitting at most
     * {@code coreSamplingScan.limit()} items.
     *
     * <p>Nothing happens until subscription. Validation runs first and the item limit is applied
     * only afterwards, so invalid arguments are reported through the returned {@link Flux} rather
     * than thrown while the pipeline is being assembled.
     *
     * @param coreSamplingScan the sampling scan description.
     * @param coreScanOptions the scan options.
     * @return the sampled items; error conditions match {@link #rangeScan}.
     */
    public Flux<CoreRangeScanItem> samplingScan(CoreSamplingScan coreSamplingScan, CoreScanOptions coreScanOptions) {
        return Flux.defer(() -> {
            CoreKvParamValidators.validateScanParams(coreSamplingScan, coreScanOptions);

            BucketConfig config = this.currentBucketConfig;
            if (config == null) {
                // The recursive call applies the item limit itself.
                return Mono.delay(CONFIG_RETRY_DELAY, this.core.context().environment().scheduler())
                    .flatMapMany(ignored -> samplingScan(coreSamplingScan, coreScanOptions));
            }
            if (!(config instanceof CouchbaseBucketConfig)) {
                return Flux.error(new IllegalStateException("Only Couchbase buckets are supported with KV Range Scan"));
            }

            Map<Short, MutationToken> consistencyMap = coreScanOptions.consistencyMap();
            return streamForPartitions((CouchbaseBucketConfig) config, (partition, ignoredLastKey) ->
                RangeScanCreateRequest.forSamplingScan(coreSamplingScan, coreScanOptions, partition.shortValue(),
                    this.core.context(), this.collectionIdentifier, consistencyMap),
                coreScanOptions
            ).take(coreSamplingScan.limit());
        });
    }

    /**
     * Builds one stream per partition and concatenates them in ascending partition order.
     *
     * @param bucketConfig the config snapshot that determines the number of partitions.
     * @param requestFactory creates the "create scan" request from the partition id and the key of
     *     the last emitted item ({@code null} if nothing has been emitted for that partition yet).
     * @param coreScanOptions the scan options; an explicit timeout in them overrides the
     *     environment's KV scan timeout.
     * @return the combined stream, guarded by {@code Flux#timeout}; on timeout it fails with an
     *     {@link UnambiguousTimeoutException} whose context carries the number of items streamed.
     */
    private Flux<CoreRangeScanItem> streamForPartitions(CouchbaseBucketConfig bucketConfig,
                                                        BiFunction<Short, byte[], RangeScanCreateRequest> requestFactory,
                                                        CoreScanOptions coreScanOptions) {
        if (!this.capabilityEnabled) {
            return Flux.error(FeatureNotAvailableException.rangeScan());
        }

        AtomicLong itemsStreamed = new AtomicLong();
        int numberOfPartitions = bucketConfig.numberOfPartitions();
        ArrayList<Flux<CoreRangeScanItem>> partitionStreams = new ArrayList<>(numberOfPartitions);
        // int counter: a short counter could never exceed Short.MAX_VALUE and would loop forever
        // if a config ever reported more partitions than that.
        for (int partition = 0; partition < numberOfPartitions; partition++) {
            partitionStreams.add(streamForPartition((short) partition, requestFactory, coreScanOptions));
        }

        Duration timeout = coreScanOptions.commonOptions().timeout()
            .orElse(this.core.context().environment().timeoutConfig().kvScanTimeout());

        return Flux.concat(partitionStreams)
            .doOnNext(item -> itemsStreamed.incrementAndGet())
            .timeout(timeout, Mono.defer(() -> Mono.error(new UnambiguousTimeoutException(
                "RangeScan timed out",
                new CancellationErrorContext(new RangeScanContext(itemsStreamed.get()))
            ))));
    }

    /**
     * Streams all items of a single partition.
     *
     * <p>The create request is shielded from cancellation so that a scan created on the server
     * can still be cancelled explicitly when downstream has already gone away. If a continue
     * request fails with NOT_MY_VBUCKET the whole partition stream is re-subscribed and a new
     * scan is created from the key of the last emitted item.
     * NOTE(review): that retry has no attempt limit or backoff.
     */
    private Flux<CoreRangeScanItem> streamForPartition(short partition,
                                                       BiFunction<Short, byte[], RangeScanCreateRequest> requestFactory,
                                                       CoreScanOptions coreScanOptions) {
        AtomicReference<byte[]> lastEmittedKey = new AtomicReference<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);

        return Flux.defer(() -> {
            RangeScanCreateRequest request = requestFactory.apply(partition, lastEmittedKey.get());
            this.core.send(request);
            return Reactor.shieldFromCancellation(Reactor.wrap(request, request.response(), true).flatMapMany(response -> {
                if (response.status().success()) {
                    if (cancelled.get()) {
                        // Downstream cancelled while the create was in flight: release the scan.
                        return cancel(response.rangeScanId(), partition, coreScanOptions)
                            .thenMany(Flux.<CoreRangeScanItem>empty());
                    }
                    return continueScan(partition, response.rangeScanId(), coreScanOptions, cancelled);
                }

                KeyValueErrorContext errorContext = KeyValueErrorContext.completedRequest(request, response);
                switch (response.status()) {
                    case NOT_FOUND:
                        // Treated as "this partition contributes no items".
                        return Flux.empty();
                    case INTERNAL_SERVER_ERROR:
                        return Flux.error(new InternalServerFailureException(errorContext));
                    case VBUUID_NOT_EQUAL:
                        return Flux.error(new MutationTokenOutdatedException(errorContext));
                    default:
                        return Flux.error(new CouchbaseException(response.toString(), errorContext));
                }
            }));
        }).doOnNext(item -> lastEmittedKey.set(item.keyBytes())
        ).retryWhen(Retry.from(signals -> signals.map(signal -> {
            Throwable failure = signal.failure();
            if (failure instanceof RangeScanPartitionFailedException
                && ((RangeScanPartitionFailedException) failure).status() == ResponseStatus.NOT_MY_VBUCKET) {
                return true;
            }
            throw Exceptions.propagate(failure);
        }))).doOnCancel(() -> cancelled.set(true));
    }

    /**
     * Repeatedly issues "continue" requests for an existing scan until its last item is seen or
     * the scan has been cancelled.
     *
     * @param partition the partition the scan runs on.
     * @param coreRangeScanId the id of the scan returned by the create request.
     * @param coreScanOptions the scan options.
     * @param cancelled set to {@code true} by the caller once downstream has cancelled.
     * @return the items of the scan, without the {@link LastCoreRangeScanItem} marker.
     */
    private Flux<CoreRangeScanItem> continueScan(short partition, CoreRangeScanId coreRangeScanId,
                                                 CoreScanOptions coreScanOptions, AtomicBoolean cancelled) {
        AtomicBoolean finished = new AtomicBoolean(false);

        return Flux.defer(() -> {
            RangeScanContinueRequest request = new RangeScanContinueRequest(coreRangeScanId,
                Sinks.many().unicast().onBackpressureBuffer(), null, coreScanOptions, partition,
                this.core.context(), this.collectionIdentifier);
            this.core.send(request);
            return Reactor.wrap(request, request.response(), true).flatMapMany(response -> {
                ResponseStatus status = response.status();
                if (status == ResponseStatus.SUCCESS || status == ResponseStatus.COMPLETE || status == ResponseStatus.CONTINUE) {
                    if (!cancelled.get()) {
                        return response.items();
                    }
                    // Stop repeating and release the scan on the server.
                    finished.set(true);
                    return cancel(coreRangeScanId, partition, coreScanOptions)
                        .thenMany(Flux.<CoreRangeScanItem>empty());
                }

                KeyValueErrorContext errorContext = KeyValueErrorContext.completedRequest(request, response);
                switch (status) {
                    case NOT_FOUND:
                        return Flux.error(new CouchbaseException("The range scan internal partition UUID could not be found on the server", errorContext));
                    case INVALID_REQUEST:
                        return Flux.error(new InvalidArgumentException("The request failed the server-side input validation check.", null, errorContext));
                    case NO_ACCESS:
                        return Flux.error(new AuthenticationFailureException("The user is no longer authorized to perform this operation", errorContext, null));
                    case CANCELED:
                        return Flux.error(new RequestCanceledException("The range scan was cancelled.", CancellationReason.OTHER, new CancellationErrorContext(errorContext)));
                    case NOT_MY_VBUCKET:
                        // Picked up by the retry logic in streamForPartition.
                        return Flux.error(new RangeScanPartitionFailedException("Received \"Not My VBucket\" for the continue response", status));
                    case UNKNOWN_COLLECTION:
                        return Flux.error(new CollectionNotFoundException(request.collectionIdentifier().collection().orElse("_default"), errorContext));
                    case SERVER_BUSY:
                        return Flux.error(new CouchbaseException("The range scan for this partition is already streaming on another connection - this is a SDK bug please report.", errorContext));
                    default:
                        return Flux.error(new CouchbaseException(response.toString(), errorContext));
                }
            });
        }).doOnNext(item -> {
            // The marker item signals that the server has no more data for this scan.
            if (item instanceof LastCoreRangeScanItem) {
                finished.set(true);
            }
        }).repeat(() -> !finished.get()
        ).filter(item -> !(item instanceof LastCoreRangeScanItem));
    }

    /**
     * Sends a cancel request for the given scan.
     *
     * <p>Cancellation is best effort: any error is deliberately swallowed because the caller is
     * already tearing the stream down and has nothing useful to do with a failed cancel.
     *
     * @return a {@link Mono} that completes once the cancel attempt has finished either way.
     */
    private Mono<Void> cancel(CoreRangeScanId coreRangeScanId, short partition, CoreScanOptions coreScanOptions) {
        return Mono.defer(() -> {
            RangeScanCancelRequest request = new RangeScanCancelRequest(coreRangeScanId, coreScanOptions,
                partition, this.core.context(), this.collectionIdentifier);
            this.core.send(request);
            return Reactor.wrap(request, request.response(), true);
        }).onErrorResume(ignored -> Mono.empty()).then();
    }
}
