package com.couchbase.quarkus.extension.runtime.health;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.config.ConfigVersion;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.CarrierGlobalConfigRequest;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;
import com.couchbase.quarkus.extension.runtime.CouchbaseConfig;
import io.smallrye.health.api.AsyncHealthCheck;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Objects;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Readiness;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ApplicationScoped
@Readiness
/* loaded from: input_file:com/couchbase/quarkus/extension/runtime/health/CouchbaseReadyCheck.class */
public class CouchbaseReadyCheck implements AsyncHealthCheck {

    @Inject
    Cluster cluster;

    @Inject
    CouchbaseConfig config;

    public Uni<HealthCheckResponse> call() {
        HealthCheckResponseBuilder named = HealthCheckResponse.named("Couchbase Cluster Readiness Check");
        return Uni.createFrom().converter(UniReactorConverters.fromMono(), quarkusWaitUntilReady(this.cluster)).map(r3 -> {
            return named.up().build();
        }).onFailure().recoverWithItem(th -> {
            return named.down().withData("error", th.getMessage()).build();
        });
    }

    public Mono<Void> quarkusWaitUntilReady(Cluster cluster) {
        return cluster.reactive().waitUntilReady(Duration.ofSeconds(this.config.readinessTimeout()), WaitUntilReadyOptions.waitUntilReadyOptions().serviceTypes(new ServiceType[]{ServiceType.KV})).then(Mono.defer(() -> {
            return Flux.fromIterable(((ClusterTopology) Objects.requireNonNull(cluster.core().configurationProvider().config().globalTopology(), "Cluster topology isn't loaded?")).nodes()).filter(hostAndServicePorts -> {
                return hostAndServicePorts.has(ServiceType.KV);
            }).flatMap(hostAndServicePorts2 -> {
                return pingGcccp(cluster.core(), hostAndServicePorts2.id());
            }).then();
        }));
    }

    private Mono<Void> pingGcccp(Core core, NodeIdentifier nodeIdentifier) {
        CoreContext context = core.context();
        return Mono.defer(() -> {
            CarrierGlobalConfigRequest carrierGlobalConfigRequest = new CarrierGlobalConfigRequest(context.environment().timeoutConfig().connectTimeout(), context, BestEffortRetryStrategy.INSTANCE, nodeIdentifier, ConfigVersion.ZERO);
            core.send(carrierGlobalConfigRequest);
            return Reactor.wrap(carrierGlobalConfigRequest, carrierGlobalConfigRequest.response(), true);
        }).flatMap(carrierGlobalConfigResponse -> {
            ResponseStatus status = carrierGlobalConfigResponse.status();
            return (status.success() || status == ResponseStatus.UNSUPPORTED || status == ResponseStatus.NO_BUCKET) ? Mono.empty() : Mono.error(new RuntimeException("Got unexpected response when pinging GCCCP: " + carrierGlobalConfigResponse));
        });
    }
}
