package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.ConsumerContext;
import io.nats.client.JetStreamStatusException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.Tracer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.tracing.TracerType;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.MessageMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import lombok.Generated;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/PullSubscription.class */
public class PullSubscription<T> implements Subscription<T> {

    @Generated
    private static final Logger log = Logger.getLogger(PullSubscription.class);
    private final PullConsumerConfiguration<T> consumerConfiguration;
    private final ConsumerContext consumerContext;
    private final MessageMapper messageMapper;
    private final TracerFactory tracerFactory;
    private final Context context;

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription
    public Multi<Message<T>> subscribe() {
        Class<T> orElse = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        Tracer<T> create = this.tracerFactory.create(TracerType.Subscribe);
        Multi runSubscriptionOn = Multi.createBy().repeating().uni(this::readNextMessage).whilst(optional -> {
            return true;
        }).runSubscriptionOn(Executors.newSingleThreadExecutor(JetstreamWorkerThread::new));
        Context context = this.context;
        Objects.requireNonNull(context);
        return runSubscriptionOn.emitOn(context::runOnContext).flatMap(optional2 -> {
            return createMulti((io.nats.client.Message) optional2.orElse(null), orElse, this.context);
        }).onItem().transformToUniAndMerge(message -> {
            return create.withTrace(message, message -> {
                return message;
            });
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.Subscription
    public void close() {
    }

    private Uni<Optional<io.nats.client.Message>> readNextMessage() {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                Duration maxExpires = this.consumerConfiguration.maxExpires();
                if (maxExpires != null) {
                    uniEmitter.complete(Optional.ofNullable(this.consumerContext.next(maxExpires)));
                } else {
                    uniEmitter.complete(Optional.ofNullable(this.consumerContext.next()));
                }
            } catch (JetStreamStatusException e) {
                uniEmitter.fail(new PullException(e));
            } catch (IllegalStateException e2) {
                uniEmitter.complete(Optional.empty());
            } catch (InterruptedException e3) {
                uniEmitter.fail(new PullException(String.format("The reader was interrupted for stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e3));
            } catch (Exception e4) {
                uniEmitter.fail(new PullException(String.format("Error reading next message from stream: %s", this.consumerConfiguration.consumerConfiguration().stream()), e4));
            }
        });
    }

    private Multi<Message<T>> createMulti(io.nats.client.Message message, Class<T> cls, Context context) {
        return (message == null || message.getData() == null) ? Multi.createFrom().empty() : Multi.createFrom().item(() -> {
            return this.messageMapper.of(message, cls, context);
        });
    }

    @Generated
    public PullSubscription(PullConsumerConfiguration<T> pullConsumerConfiguration, ConsumerContext consumerContext, MessageMapper messageMapper, TracerFactory tracerFactory, Context context) {
        this.consumerConfiguration = pullConsumerConfiguration;
        this.consumerContext = consumerContext;
        this.messageMapper = messageMapper;
        this.tracerFactory = tracerFactory;
        this.context = context;
    }
}
