package io.scalecube.gateway.benchmarks;

import io.scalecube.benchmarks.BenchmarkSettings;
import io.scalecube.benchmarks.metrics.BenchmarkMeter;
import io.scalecube.gateway.clientsdk.ClientMessage;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/scalecube/gateway/benchmarks/InfiniteStreamScenario.class */
public final class InfiniteStreamScenario {
    private static final Logger LOGGER = LoggerFactory.getLogger(InfiniteStreamScenario.class);
    public static final String QUALIFIER = "/benchmarks/infiniteStream";
    private static final String RATE_LIMIT = "rateLimit";

    private InfiniteStreamScenario() {
    }

    public static void runWith(String[] strArr, Function<BenchmarkSettings, AbstractBenchmarkState<?>> function) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        BenchmarkSettings build = BenchmarkSettings.from(strArr).injectors(availableProcessors).messageRate(1).rampUpDuration(Duration.ofSeconds(availableProcessors)).durationUnit(TimeUnit.MILLISECONDS).build();
        function.apply(build).runWithRampUp((l, abstractBenchmarkState) -> {
            return abstractBenchmarkState.createClient();
        }, abstractBenchmarkState2 -> {
            LatencyHelper latencyHelper = new LatencyHelper(abstractBenchmarkState2);
            BenchmarkMeter meter = abstractBenchmarkState2.meter("meter.client-to-service");
            BenchmarkMeter meter2 = abstractBenchmarkState2.meter("meter.service-to-client");
            Integer rateLimit = rateLimit(build);
            ClientMessage.Builder qualifier = ClientMessage.builder().qualifier(QUALIFIER);
            Optional ofNullable = Optional.ofNullable(rateLimit);
            qualifier.getClass();
            ofNullable.ifPresent(qualifier::rateLimit);
            ClientMessage build2 = qualifier.build();
            return client -> {
                return (l2, benchmarkTask) -> {
                    return Flux.defer(() -> {
                        meter.mark();
                        return client.requestStream(build2, benchmarkTask.scheduler()).doOnNext(clientMessage -> {
                            meter2.mark();
                            latencyHelper.calculate(clientMessage);
                        }).doOnError(th -> {
                            LOGGER.warn("Exception occured on requestStream: " + th);
                        });
                    });
                };
            };
        }, (abstractBenchmarkState3, client) -> {
            return client.close();
        });
    }

    private static Integer rateLimit(BenchmarkSettings benchmarkSettings) {
        return (Integer) Optional.ofNullable(benchmarkSettings.find(RATE_LIMIT, (String) null)).map(Integer::parseInt).orElse(null);
    }
}
