package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PushSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.PushSubscribeMessageConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.ReaderMessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Objects;
import java.util.Optional;
import org.jboss.logging.Logger;

/**
 * Application-scoped factory that builds the different NATS JetStream connection flavours
 * (reader subscription, push subscription, plain messaging and administration) as lazy
 * {@link Uni}s.
 *
 * <p>Apart from {@link #administration}, every connection is created with a Vert.x
 * {@link Context} obtained from the {@link ExecutionHolder}, and the resulting item is emitted
 * through that context's {@code runOnContext}.
 */
@ApplicationScoped
public class ConnectionFactory {
    private static final Logger logger = Logger.getLogger(ConnectionFactory.class);
    private final ExecutionHolder executionHolder;
    private final MessageFactory messageFactory;
    private final JetStreamInstrumenter instrumenter;

    /**
     * Creates the factory with its injected collaborators.
     *
     * @param executionHolder source of the Vert.x instance used to obtain contexts
     * @param messageFactory factory handed to the message-oriented connections
     * @param jetStreamInstrumenter tracing instrumenter handed to the message-oriented connections
     */
    @Inject
    public ConnectionFactory(ExecutionHolder executionHolder, MessageFactory messageFactory, JetStreamInstrumenter jetStreamInstrumenter) {
        this.executionHolder = executionHolder;
        this.messageFactory = messageFactory;
        this.instrumenter = jetStreamInstrumenter;
    }

    /**
     * Creates a reader-based subscribe connection.
     *
     * @param connectionConfiguration connection settings passed to the connection
     * @param connectionListener listener passed to the connection
     * @param readerConsumerConfiguration reader consumer settings passed to the connection
     * @param <T> payload type of the consumer configuration
     * @return a {@link Uni} producing the connection, or failing if no Vert.x context could be
     *         obtained or the connection constructor throws
     */
    public <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, ReaderConsumerConfiguration<T> readerConsumerConfiguration) {
        return getContextLoggingFailure().onItem().transformToUni(
                context -> subscribe(connectionConfiguration, connectionListener, readerConsumerConfiguration, context));
    }

    /**
     * Creates a push-based subscribe connection.
     *
     * @param connectionConfiguration connection settings passed to the connection
     * @param connectionListener listener passed to the connection
     * @param pushConsumerConfiguration push consumer settings passed to the connection
     * @param pushSubscribeOptionsFactory options factory passed to the connection
     * @param <T> payload type of the consumer configuration
     * @return a {@link Uni} producing the connection, or failing if no Vert.x context could be
     *         obtained or the connection constructor throws
     */
    public <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, PushConsumerConfiguration<T> pushConsumerConfiguration, PushSubscribeOptionsFactory pushSubscribeOptionsFactory) {
        return getContextLoggingFailure().onItem().transformToUni(
                context -> subscribe(connectionConfiguration, connectionListener, pushConsumerConfiguration, pushSubscribeOptionsFactory, context));
    }

    /**
     * Creates an administration connection. Unlike the other factory methods, this one does not
     * look up a Vert.x context.
     *
     * @param connectionConfiguration connection settings passed to the connection
     * @param connectionListener listener passed to the connection
     * @return a {@link Uni} producing the connection, or failing if its constructor throws
     */
    public Uni<? extends AdministrationConnection> administration(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        // Fully qualified: the implementation shares its simple name with the type in this package.
        return Uni.createFrom().item(Unchecked.supplier(
                () -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.AdministrationConnection(connectionConfiguration, connectionListener)));
    }

    /**
     * Creates a message connection.
     *
     * @param connectionConfiguration connection settings passed to the connection
     * @param connectionListener listener passed to the connection
     * @return a {@link Uni} producing the connection, or failing if no Vert.x context could be
     *         obtained or the connection constructor throws
     */
    public Uni<? extends MessageConnection> message(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        return getContextLoggingFailure().onItem().transformToUni(
                context -> message(connectionConfiguration, connectionListener, context));
    }

    /** Returns the Vert.x instance held by the execution holder, or empty when it is {@code null}. */
    private Optional<Vertx> getVertx() {
        return Optional.ofNullable(this.executionHolder.vertx());
    }

    /** Resolves the Vert.x context and logs (at WARN, without recovering) any failure to obtain it. */
    private Uni<Context> getContextLoggingFailure() {
        return getContext().onFailure().invoke(failure -> logger.warn(failure.getMessage(), failure));
    }

    /** Builds the reader subscribe connection lazily and emits it through the given context. */
    private <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, ReaderConsumerConfiguration<T> readerConsumerConfiguration, Context context) {
        // The bound method reference null-checks 'context' when it is evaluated, i.e. on this call.
        return Uni.createFrom()
                .item(Unchecked.supplier(() -> new ReaderMessageSubscribeConnection(connectionConfiguration, connectionListener, context, this.instrumenter, readerConsumerConfiguration, this.messageFactory)))
                .emitOn(context::runOnContext);
    }

    /** Builds the push subscribe connection lazily and emits it through the given context. */
    private <T> Uni<? extends MessageSubscribeConnection> subscribe(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, PushConsumerConfiguration<T> pushConsumerConfiguration, PushSubscribeOptionsFactory pushSubscribeOptionsFactory, Context context) {
        // The bound method reference null-checks 'context' when it is evaluated, i.e. on this call.
        return Uni.createFrom()
                .item(Unchecked.supplier(() -> new PushSubscribeMessageConnection(connectionConfiguration, connectionListener, context, this.instrumenter, pushConsumerConfiguration, this.messageFactory, pushSubscribeOptionsFactory)))
                .emitOn(context::runOnContext);
    }

    /**
     * Obtains a Vert.x context via {@code getOrCreateContext()}.
     *
     * @return a {@link Uni} producing the context, or failing with a {@code ContextException}
     *         when the execution holder has no Vert.x instance
     */
    private Uni<Context> getContext() {
        return Uni.createFrom().item(Unchecked.supplier(() -> getVertx()
                .map(Vertx::getOrCreateContext)
                .orElseThrow(() -> new ContextException("No Vertx available"))));
    }

    /** Builds the message connection lazily and emits it through the given context. */
    private Uni<? extends MessageConnection> message(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context) {
        // Fully qualified: the implementation shares its simple name with the type in this package.
        // The bound method reference null-checks 'context' when it is evaluated, i.e. on this call.
        return Uni.createFrom()
                .item(Unchecked.supplier(() -> new io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx.MessageConnection(connectionConfiguration, connectionListener, this.messageFactory, context, this.instrumenter)))
                .emitOn(context::runOnContext);
    }
}
