package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.JetStreamReader;
import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetstreamWorkerThread;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageSubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PullSubscribeOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ReaderConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/ReaderMessageSubscribeConnection.class */
/**
 * Subscribe connection that pulls messages from a JetStream subject through a
 * {@link JetStreamReader} and exposes them as a {@link Multi} of reactive messaging
 * {@link Message}s.
 *
 * <p>Messages are polled on a dedicated single worker thread owned by this connection
 * and emitted downstream on the Vert.x {@link Context} supplied at construction time.
 * The worker thread is released when {@link #close()} is called.
 *
 * @param <K> payload type of the messages produced by this connection
 */
public class ReaderMessageSubscribeConnection<K> extends MessageConnection implements MessageSubscribeConnection {
    private static final Logger logger = Logger.getLogger(ReaderMessageSubscribeConnection.class);

    /** Maximum time granted to the subscription to drain while closing. */
    private static final Duration DRAIN_TIMEOUT = Duration.ofMillis(1000L);

    private final ReaderConsumerConfiguration<K> consumerConfiguration;
    private final JetStreamReader reader;
    private final JetStreamSubscription subscription;

    /**
     * Executor running the polling loop. It is owned by the connection (instead of being
     * created on every {@link #subscribe()} call) so that {@link #close()} can stop it.
     */
    private final ExecutorService pullExecutor;

    /**
     * Opens the connection, creates the pull subscription for the configured subject and
     * derives the reader from it.
     *
     * @param connectionConfiguration connection settings handed to the parent connection
     * @param connectionListener listener handed to the parent connection
     * @param context Vert.x context on which messages and flush results are emitted
     * @param jetStreamInstrumenter tracing instrumenter handed to the parent connection
     * @param readerConsumerConfiguration subject, batch size, re-pull threshold and consumer settings
     * @param messageFactory factory used to convert NATS messages to reactive messaging messages
     * @throws ConnectionException declared by the parent constructor
     * @throws ReaderException if the subscription or the reader cannot be created; the
     *         connection opened by the parent constructor is closed before this is thrown
     */
    public ReaderMessageSubscribeConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, Context context, JetStreamInstrumenter jetStreamInstrumenter, ReaderConsumerConfiguration<K> readerConsumerConfiguration, MessageFactory messageFactory) throws ConnectionException {
        super(connectionConfiguration, connectionListener, messageFactory, context, jetStreamInstrumenter);
        this.consumerConfiguration = readerConsumerConfiguration;
        try {
            this.subscription = this.connection.jetStream().subscribe(readerConsumerConfiguration.subject(), new PullSubscribeOptionsFactory().create(readerConsumerConfiguration));
            this.reader = this.subscription.reader(readerConsumerConfiguration.maxRequestBatch().intValue(), readerConsumerConfiguration.rePullAt().intValue());
        } catch (Throwable failure) {
            final ReaderException readerException = new ReaderException(failure);
            // The parent constructor has already opened the connection; without this the
            // caller never receives an instance and therefore can never close it.
            try {
                super.close();
            } catch (Throwable closeFailure) {
                readerException.addSuppressed(closeFailure);
            }
            throw readerException;
        }
        // Created last so that a failed construction does not leave an executor behind.
        this.pullExecutor = Executors.newSingleThreadExecutor(JetstreamWorkerThread::new);
    }

    /**
     * Builds the message stream: the reader is polled repeatedly on the worker thread until
     * the subscription is no longer active, and every message carrying data is converted and
     * emitted on the Vert.x context. Polls that yield no message, or a message without data,
     * emit nothing.
     *
     * <p>All streams returned by this method share the connection's single worker thread.
     *
     * @return the stream of converted messages
     */
    @Override
    public Multi<Message<?>> subscribe() {
        final boolean traceEnabled = this.consumerConfiguration.consumerConfiguration().traceEnabled();
        final Class<K> payloadType = this.consumerConfiguration.consumerConfiguration().payloadType().orElse(null);
        return Multi.createBy().repeating()
                .supplier(this::nextMessage)
                .until(polled -> !this.subscription.isActive())
                .runSubscriptionOn(this.pullExecutor)
                .emitOn(this.context::runOnContext)
                .flatMap(polled -> createMulti(polled.orElse(null), traceEnabled, payloadType, this.context));
    }

    /**
     * Flushes through the parent connection and emits the result on the Vert.x context.
     *
     * @param duration timeout passed to the parent flush
     * @return a {@link Uni} completing when the parent flush completes
     */
    @Override
    public Uni<Void> flush(Duration duration) {
        return super.flush(duration).emitOn(this.context::runOnContext);
    }

    /**
     * Stops the reader, drains and unsubscribes the subscription if it is still active,
     * stops the polling worker and finally closes the parent connection. Each step is
     * best-effort: a failure is logged and the remaining steps still run.
     *
     * @throws Exception if closing the parent connection fails
     */
    @Override
    public void close() throws Exception {
        // Remembered rather than restored immediately so the remaining clean-up steps
        // are not themselves cut short by a pending interrupt.
        boolean interrupted = false;
        try {
            this.reader.stop();
        } catch (Throwable failure) {
            logger.warnf("Failed to stop reader with message %s", failure.getMessage());
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.drain(DRAIN_TIMEOUT);
            }
        } catch (Throwable failure) {
            if (failure instanceof InterruptedException) {
                interrupted = true;
                logger.warnf("Interrupted while draining subscription");
            } else {
                logger.warnf("Failed to drain subscription with message %s", failure.getMessage());
            }
        }
        try {
            if (this.subscription.isActive()) {
                this.subscription.unsubscribe();
            }
        } catch (Throwable failure) {
            logger.warnf("Failed to unsubscribe subscription with message %s", failure.getMessage());
        }
        // Interrupts a poll that may still be blocked in the reader and lets the worker thread end.
        this.pullExecutor.shutdownNow();
        try {
            super.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Polls the reader once, waiting at most the configured request expiry
     * ({@link Duration#ZERO} when none is configured).
     *
     * @return the polled message, or empty when nothing was received or the poll failed;
     *         failures are logged and never propagated so the polling loop keeps running
     */
    private Optional<io.nats.client.Message> nextMessage() {
        try {
            return Optional.ofNullable(this.reader.nextMessage(this.consumerConfiguration.maxRequestExpires().orElse(Duration.ZERO)));
        } catch (JetStreamStatusException e) {
            // The exception text is data, not a format string: it may contain '%'.
            logger.debugf(e, "%s", e.getMessage());
            return Optional.empty();
        } catch (IllegalStateException e) {
            logger.debugf(e, "The subscription became inactive for stream: %s", this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        } catch (InterruptedException e) {
            // The interrupt flag is deliberately not restored here: the repeat loop polls again
            // while the subscription is active, and a still-set flag would make every following
            // blocking poll fail immediately, turning the loop into a busy spin.
            logger.debugf(e, "The reader was interrupted for stream: %s", this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        } catch (Throwable failure) {
            logger.warnf(failure, "Error reading next message from stream: %s", this.consumerConfiguration.consumerConfiguration().stream());
            return Optional.empty();
        }
    }

    /**
     * Converts one polled NATS message to a stream of at most one reactive messaging message.
     *
     * @param message polled message, may be {@code null}
     * @param traceEnabled tracing flag forwarded to the message factory
     * @param payloadType payload class forwarded to the message factory, may be {@code null}
     * @param context Vert.x context forwarded to the message factory
     * @return an empty stream when the message or its data is {@code null}, otherwise a
     *         stream whose single item is created lazily by the message factory
     */
    private Multi<Message<K>> createMulti(io.nats.client.Message message, boolean traceEnabled, Class<K> payloadType, Context context) {
        if (message == null || message.getData() == null) {
            return Multi.createFrom().empty();
        }
        return Multi.createFrom().item(() -> this.messageFactory.create(
                message,
                traceEnabled,
                payloadType,
                context,
                new ExponentialBackoff(
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoff(),
                        this.consumerConfiguration.consumerConfiguration().exponentialBackoffMaxDuration()),
                this.consumerConfiguration.consumerConfiguration().ackTimeout()));
    }
}
