package org.apache.pulsar.reactive.client.internal.api;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.class */
class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveMessagePipeline.class);
    private static final String INFLIGHT_LIMITER_CONTEXT_KEY = DefaultReactiveMessagePipelineBuilder.class.getName() + ".INFLIGHT_LIMITER_CONTEXT_KEY";
    private final Mono<Void> pipeline;
    private final Function<Message<T>, Publisher<Void>> messageHandler;
    private final BiConsumer<Message<T>, Throwable> errorLogger;
    private final Retry pipelineRetrySpec;
    private final Duration handlingTimeout;
    private final Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> streamingMessageHandler;
    private final int concurrency;
    private final int maxInflight;
    private final MessageGroupingFunction groupingFunction;
    private final AtomicReference<Disposable> killSwitch = new AtomicReference<>();
    private final AtomicReference<InternalConsumerListenerImpl> consumerListener = new AtomicReference<>();
    private final AtomicReference<CompletableFuture<Void>> pipelineStoppedFuture = new AtomicReference<>();

    /* loaded from: input_file:org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline$InternalConsumerListenerImpl.class */
    private static final class InternalConsumerListenerImpl implements InternalConsumerListener {
        private final CompletableFuture<Void> createdFuture;

        private InternalConsumerListenerImpl() {
            this.createdFuture = new CompletableFuture<>();
        }

        @Override // org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener
        public void onConsumerCreated(Object obj) {
            if (this.createdFuture.isDone()) {
                return;
            }
            this.createdFuture.complete(null);
        }

        Mono<Void> waitForConsumerCreated() {
            return Mono.fromFuture(this.createdFuture, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> reactiveMessageConsumer, Function<Message<T>, Publisher<Void>> function, BiConsumer<Message<T>, Throwable> biConsumer, Retry retry, Duration duration, Function<Mono<Void>, Publisher<Void>> function2, Function<Flux<Message<T>>, Publisher<MessageResult<Void>>> function3, MessageGroupingFunction messageGroupingFunction, int i, int i2) {
        this.messageHandler = function;
        this.errorLogger = biConsumer;
        this.pipelineRetrySpec = retry;
        this.handlingTimeout = duration;
        this.streamingMessageHandler = function3;
        this.groupingFunction = messageGroupingFunction;
        this.concurrency = i;
        this.maxInflight = i2;
        this.pipeline = reactiveMessageConsumer.consumeMany(this::createMessageConsumer).then().transform(function2).transform(this::decoratePipeline).doFinally(signalType -> {
            CompletableFuture<Void> completableFuture = this.pipelineStoppedFuture.get();
            if (completableFuture != null) {
                completableFuture.complete(null);
            }
        }).doFirst(() -> {
            this.pipelineStoppedFuture.set(new CompletableFuture<>());
        });
    }

    private Mono<Void> decorateMessageHandler(Mono<Void> mono) {
        if (this.handlingTimeout != null) {
            mono = mono.timeout(this.handlingTimeout);
        }
        if (this.maxInflight > 0) {
            mono = mono.transformDeferredContextual((mono2, contextView) -> {
                return ((InflightLimiter) contextView.get(INFLIGHT_LIMITER_CONTEXT_KEY)).transform(mono2);
            });
        }
        return mono;
    }

    private Mono<Void> decoratePipeline(Mono<Void> mono) {
        if (this.maxInflight > 0) {
            mono = Mono.using(() -> {
                return new InflightLimiter(this.maxInflight);
            }, inflightLimiter -> {
                return mono.contextWrite(Context.of(INFLIGHT_LIMITER_CONTEXT_KEY, inflightLimiter));
            }, (v0) -> {
                v0.dispose();
            });
        }
        return this.pipelineRetrySpec != null ? mono.retryWhen(this.pipelineRetrySpec) : mono;
    }

    private Flux<MessageResult<Void>> createMessageConsumer(Flux<Message<T>> flux) {
        if (this.messageHandler == null) {
            return Flux.from((Publisher) ((Function) Objects.requireNonNull(this.streamingMessageHandler, "streamingMessageHandler or messageHandler must be set")).apply(flux));
        }
        if (this.streamingMessageHandler != null) {
            throw new IllegalStateException("messageHandler and streamingMessageHandler cannot be set at the same time.");
        }
        return this.concurrency > 1 ? this.groupingFunction != null ? GroupOrderedMessageProcessors.processGroupsInOrderConcurrently(flux, this.groupingFunction, this::handleMessage, Schedulers.parallel(), this.concurrency) : flux.flatMap(message -> {
            return handleMessage(message).subscribeOn(Schedulers.parallel());
        }, this.concurrency) : flux.concatMap(this::handleMessage);
    }

    private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
        return Mono.defer(() -> {
            return Mono.from(this.messageHandler.apply(message));
        }).transform(this::decorateMessageHandler).thenReturn(MessageResult.acknowledge(message.getMessageId())).onErrorResume(th -> {
            if (this.errorLogger != null) {
                try {
                    this.errorLogger.accept(message, th);
                } catch (Exception e) {
                    LOG.error("Error in calling error logger", e);
                }
            } else {
                LOG.error("Message handling for message id {} failed.", message.getMessageId(), th);
            }
            return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
        });
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline
    public ReactiveMessagePipeline start() {
        if (this.killSwitch.get() != null) {
            throw new IllegalStateException("Message handler is already running.");
        }
        InternalConsumerListenerImpl internalConsumerListenerImpl = new InternalConsumerListenerImpl();
        Disposable subscribe = this.pipeline.contextWrite(Context.of(InternalConsumerListener.class, internalConsumerListenerImpl)).subscribe((Consumer) null, this::logError, this::logUnexpectedCompletion);
        if (this.killSwitch.compareAndSet(null, subscribe)) {
            this.consumerListener.set(internalConsumerListenerImpl);
            return this;
        }
        subscribe.dispose();
        throw new IllegalStateException("Message handler was already running.");
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline
    public Mono<Void> untilStarted() {
        if (isRunning()) {
            return this.consumerListener.get().waitForConsumerCreated();
        }
        throw new IllegalStateException("Pipeline isn't running. Call start first.");
    }

    private void logError(Throwable th) {
        LOG.error("ReactiveMessageHandler was unexpectedly terminated.", th);
    }

    private void logUnexpectedCompletion() {
        if (isRunning()) {
            LOG.error("ReactiveMessageHandler was unexpectedly completed.");
        }
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline
    public ReactiveMessagePipeline stop() {
        Disposable andSet = this.killSwitch.getAndSet(null);
        if (andSet != null) {
            andSet.dispose();
        }
        return this;
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline
    public Mono<Void> untilStopped() {
        if (isRunning()) {
            throw new IllegalStateException("Pipeline is running. Call stop first.");
        }
        CompletableFuture<Void> completableFuture = this.pipelineStoppedFuture.get();
        return completableFuture != null ? Mono.fromFuture(completableFuture, true) : Mono.empty();
    }

    @Override // org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline
    public boolean isRunning() {
        return this.killSwitch.get() != null;
    }
}
