package io.quarkiverse.reactive.messaging.nats.jetstream.processors.publisher;

import io.nats.client.JetStreamApiException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionEvent;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SubscribeException;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messaging.nats.jetstream.processors.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/processors/publisher/MessagePublisherProcessor.class */
public abstract class MessagePublisherProcessor<T> implements MessageProcessor, ConnectionListener {
    private static final Logger logger = Logger.getLogger(MessagePublisherProcessor.class);
    private static final int CONSUMER_ALREADY_IN_USE = 10013;
    private final AtomicReference<Status> readiness = new AtomicReference<>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<Status> liveness = new AtomicReference<>(Status.builder().event(ConnectionEvent.Closed).message("Publish processor inactive").healthy(false).build());
    private final AtomicReference<SubscribeConnection<T>> connection = new AtomicReference<>();

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public String channel() {
        return configuration().channel();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status readiness() {
        return this.readiness.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public Status liveness() {
        return this.liveness.get();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.processors.MessageProcessor
    public void close() {
        try {
            SubscribeConnection<T> andSet = this.connection.getAndSet(null);
            if (andSet != null) {
                andSet.close();
            }
        } catch (Throwable th) {
            logger.warnf(th, "Failed to close connection", th);
        }
    }

    public Multi<Message<T>> publisher() {
        return subscribe().onFailure().transform(this::transformFailure).onFailure().retry().withBackOff(configuration().retryBackoff()).indefinitely();
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener
    public void onEvent(ConnectionEvent connectionEvent, String str) {
        switch (connectionEvent) {
            case Connected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                this.liveness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                return;
            case Closed:
            case CommunicationFailed:
            case Disconnected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(false).build());
                return;
            case Reconnected:
                this.readiness.set(Status.builder().event(connectionEvent).message(str).healthy(true).build());
                return;
            default:
                return;
        }
    }

    protected abstract MessagePublisherConfiguration configuration();

    protected abstract Uni<? extends SubscribeConnection<T>> connect();

    private Multi<Message<T>> subscribe() {
        return getOrEstablishConnection().onItem().transformToMulti((v0) -> {
            return v0.subscribe();
        });
    }

    private Uni<SubscribeConnection<T>> getOrEstablishConnection() {
        UniOnItem onItem = Uni.createFrom().item(() -> {
            return (SubscribeConnection) Optional.ofNullable(this.connection.get()).filter((v0) -> {
                return v0.isConnected();
            }).orElse(null);
        }).onItem().ifNull().switchTo(this::connect).onItem();
        AtomicReference<SubscribeConnection<T>> atomicReference = this.connection;
        Objects.requireNonNull(atomicReference);
        return onItem.invoke((v1) -> {
            r1.set(v1);
        });
    }

    private SubscribeException transformFailure(Throwable th) {
        if (isCommunicationFailure(th) && !isConsumerAlreadyInUse(th)) {
            logger.errorf(th, "Failed to publish messages: %s", th.getMessage());
            Optional.ofNullable(this.connection.get()).ifPresent(subscribeConnection -> {
                subscribeConnection.fireEvent(ConnectionEvent.CommunicationFailed, th.getMessage());
            });
            close();
        }
        return new SubscribeException(th);
    }

    private boolean isConsumerAlreadyInUse(Throwable th) {
        return (th instanceof JetStreamApiException) && ((JetStreamApiException) th).getApiErrorCode() == CONSUMER_ALREADY_IN_USE;
    }

    private boolean isCommunicationFailure(Throwable th) {
        return (th instanceof JetStreamApiException) || (th instanceof IOException);
    }
}
