package org.apache.pulsar.reactive.client.internal.adapter;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.class */
class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
    private final ReactiveConsumerAdapterFactory reactiveConsumerAdapterFactory;
    private final Schema<T> schema;
    private final ReactiveMessageConsumerSpec consumerSpec;
    private final boolean acknowledgeAsynchronously;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AdaptedReactiveMessageConsumer(ReactiveConsumerAdapterFactory reactiveConsumerAdapterFactory, Schema<T> schema, ReactiveMessageConsumerSpec reactiveMessageConsumerSpec) {
        this.reactiveConsumerAdapterFactory = reactiveConsumerAdapterFactory;
        this.schema = schema;
        this.consumerSpec = reactiveMessageConsumerSpec;
        this.acknowledgeAsynchronously = reactiveMessageConsumerSpec.getAcknowledgeAsynchronously() == null || (reactiveMessageConsumerSpec.getAcknowledgeAsynchronously() != null && reactiveMessageConsumerSpec.getAcknowledgeAsynchronously().booleanValue());
    }

    static <T> Mono<Message<T>> readNextMessage(Consumer<T> consumer) {
        Objects.requireNonNull(consumer);
        return PulsarFutureAdapter.adaptPulsarFuture(consumer::receiveAsync);
    }

    public <R> Mono<R> consumeOne(Function<Message<T>, Publisher<MessageResult<R>>> function) {
        return createReactiveConsumerAdapter().usingConsumer(consumer -> {
            return Mono.using(this::pinAcknowledgeScheduler, scheduler -> {
                return readNextMessage(consumer).flatMap(message -> {
                    return Mono.from((Publisher) function.apply(message));
                }).delayUntil(messageResult -> {
                    return handleAcknowledgement(consumer, messageResult, scheduler);
                }).handle(this::handleMessageResult);
            }, (v0) -> {
                v0.dispose();
            });
        });
    }

    private Scheduler pinAcknowledgeScheduler() {
        return Schedulers.single(this.consumerSpec.getAcknowledgeScheduler() != null ? this.consumerSpec.getAcknowledgeScheduler() : Schedulers.boundedElastic());
    }

    private <R> Mono<?> handleAcknowledgement(Consumer<T> consumer, MessageResult<R> messageResult, Scheduler scheduler) {
        if (messageResult.getMessageId() == null) {
            return Mono.empty();
        }
        Mono<?> subscribeOn = (messageResult.isAcknowledgeMessage() ? Mono.fromFuture(() -> {
            return consumer.acknowledgeAsync(messageResult.getMessageId());
        }) : Mono.fromRunnable(() -> {
            consumer.negativeAcknowledge(messageResult.getMessageId());
        })).subscribeOn(scheduler);
        if (!this.acknowledgeAsynchronously) {
            return subscribeOn;
        }
        Objects.requireNonNull(subscribeOn);
        return Mono.fromRunnable(subscribeOn::subscribe);
    }

    private ReactiveConsumerAdapter<T> createReactiveConsumerAdapter() {
        return this.reactiveConsumerAdapterFactory.create(pulsarClient -> {
            ConsumerBuilder<T> newConsumer = pulsarClient.newConsumer(this.schema);
            configureConsumerBuilder(newConsumer);
            return newConsumer;
        });
    }

    private void configureConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
        if (this.consumerSpec.getTopicNames() != null && !this.consumerSpec.getTopicNames().isEmpty()) {
            consumerBuilder.topics(this.consumerSpec.getTopicNames());
        }
        if (this.consumerSpec.getTopicsPattern() != null) {
            consumerBuilder.topicsPattern(this.consumerSpec.getTopicsPattern());
        }
        if (this.consumerSpec.getTopicsPatternSubscriptionMode() != null) {
            consumerBuilder.subscriptionTopicsMode(this.consumerSpec.getTopicsPatternSubscriptionMode());
        }
        if (this.consumerSpec.getTopicsPatternAutoDiscoveryPeriod() != null) {
            consumerBuilder.patternAutoDiscoveryPeriod((int) (this.consumerSpec.getTopicsPatternAutoDiscoveryPeriod().toMillis() / 1000), TimeUnit.SECONDS);
        }
        if (this.consumerSpec.getSubscriptionName() != null) {
            consumerBuilder.subscriptionName(this.consumerSpec.getSubscriptionName());
        }
        if (this.consumerSpec.getSubscriptionMode() != null) {
            consumerBuilder.subscriptionMode(this.consumerSpec.getSubscriptionMode());
        }
        if (this.consumerSpec.getSubscriptionType() != null) {
            consumerBuilder.subscriptionType(this.consumerSpec.getSubscriptionType());
        }
        if (this.consumerSpec.getSubscriptionInitialPosition() != null) {
            consumerBuilder.subscriptionInitialPosition(this.consumerSpec.getSubscriptionInitialPosition());
        }
        if (this.consumerSpec.getKeySharedPolicy() != null) {
            consumerBuilder.keySharedPolicy(this.consumerSpec.getKeySharedPolicy());
        }
        if (this.consumerSpec.getReplicateSubscriptionState() != null) {
            consumerBuilder.replicateSubscriptionState(this.consumerSpec.getReplicateSubscriptionState().booleanValue());
        }
        if (this.consumerSpec.getSubscriptionProperties() != null && !this.consumerSpec.getSubscriptionProperties().isEmpty()) {
            consumerBuilder.subscriptionProperties(this.consumerSpec.getSubscriptionProperties());
        }
        if (this.consumerSpec.getConsumerName() != null) {
            consumerBuilder.consumerName(this.consumerSpec.getConsumerName());
        }
        if (this.consumerSpec.getProperties() != null && !this.consumerSpec.getProperties().isEmpty()) {
            consumerBuilder.properties(this.consumerSpec.getProperties());
        }
        if (this.consumerSpec.getPriorityLevel() != null) {
            consumerBuilder.priorityLevel(this.consumerSpec.getPriorityLevel().intValue());
        }
        if (this.consumerSpec.getReadCompacted() != null) {
            consumerBuilder.readCompacted(this.consumerSpec.getReadCompacted().booleanValue());
        }
        if (this.consumerSpec.getBatchIndexAckEnabled() != null) {
            consumerBuilder.enableBatchIndexAcknowledgment(this.consumerSpec.getBatchIndexAckEnabled().booleanValue());
        }
        if (this.consumerSpec.getAckTimeout() != null) {
            consumerBuilder.ackTimeout(this.consumerSpec.getAckTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (this.consumerSpec.getAckTimeoutTickTime() != null) {
            consumerBuilder.ackTimeoutTickTime(this.consumerSpec.getAckTimeoutTickTime().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (this.consumerSpec.getAcknowledgementsGroupTime() != null) {
            consumerBuilder.acknowledgmentGroupTime(this.consumerSpec.getAcknowledgementsGroupTime().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (this.consumerSpec.getNegativeAckRedeliveryDelay() != null) {
            consumerBuilder.negativeAckRedeliveryDelay(this.consumerSpec.getNegativeAckRedeliveryDelay().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (this.consumerSpec.getAckTimeoutRedeliveryBackoff() != null) {
            consumerBuilder.ackTimeoutRedeliveryBackoff(this.consumerSpec.getAckTimeoutRedeliveryBackoff());
        }
        if (this.consumerSpec.getNegativeAckRedeliveryBackoff() != null) {
            consumerBuilder.negativeAckRedeliveryBackoff(this.consumerSpec.getNegativeAckRedeliveryBackoff());
        }
        if (this.consumerSpec.getDeadLetterPolicy() != null) {
            consumerBuilder.deadLetterPolicy(this.consumerSpec.getDeadLetterPolicy());
        }
        if (this.consumerSpec.getRetryLetterTopicEnable() != null) {
            consumerBuilder.enableRetry(this.consumerSpec.getRetryLetterTopicEnable().booleanValue());
        }
        if (this.consumerSpec.getReceiverQueueSize() != null) {
            consumerBuilder.receiverQueueSize(this.consumerSpec.getReceiverQueueSize().intValue());
        }
        if (this.consumerSpec.getMaxTotalReceiverQueueSizeAcrossPartitions() != null) {
            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(this.consumerSpec.getMaxTotalReceiverQueueSizeAcrossPartitions().intValue());
        }
        if (this.consumerSpec.getAutoUpdatePartitions() != null) {
            consumerBuilder.autoUpdatePartitions(this.consumerSpec.getAutoUpdatePartitions().booleanValue());
        }
        if (this.consumerSpec.getAutoUpdatePartitionsInterval() != null) {
            consumerBuilder.autoUpdatePartitionsInterval((int) (this.consumerSpec.getAutoUpdatePartitionsInterval().toMillis() / 1000), TimeUnit.SECONDS);
        }
        if (this.consumerSpec.getCryptoKeyReader() != null) {
            consumerBuilder.cryptoKeyReader(this.consumerSpec.getCryptoKeyReader());
        }
        if (this.consumerSpec.getCryptoFailureAction() != null) {
            consumerBuilder.cryptoFailureAction(this.consumerSpec.getCryptoFailureAction());
        }
        if (this.consumerSpec.getMaxPendingChunkedMessage() != null) {
            consumerBuilder.maxPendingChunkedMessage(this.consumerSpec.getMaxPendingChunkedMessage().intValue());
        }
        if (this.consumerSpec.getAutoAckOldestChunkedMessageOnQueueFull() != null) {
            consumerBuilder.autoAckOldestChunkedMessageOnQueueFull(this.consumerSpec.getAutoAckOldestChunkedMessageOnQueueFull().booleanValue());
        }
        if (this.consumerSpec.getExpireTimeOfIncompleteChunkedMessage() != null) {
            consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.consumerSpec.getExpireTimeOfIncompleteChunkedMessage().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    public <R> Flux<R> consumeMany(Function<Flux<Message<T>>, Publisher<MessageResult<R>>> function) {
        return createReactiveConsumerAdapter().usingConsumerMany(consumer -> {
            return Flux.using(this::pinAcknowledgeScheduler, scheduler -> {
                return Flux.from((Publisher) function.apply(readNextMessage(consumer).repeat())).delayUntil(messageResult -> {
                    return handleAcknowledgement(consumer, messageResult, scheduler);
                }).handle(this::handleMessageResult);
            }, (v0) -> {
                v0.dispose();
            });
        });
    }

    public Mono<Void> consumeNothing() {
        return createReactiveConsumerAdapter().usingConsumer(consumer -> {
            return Mono.empty();
        });
    }

    private <R> void handleMessageResult(MessageResult<R> messageResult, SynchronousSink<R> synchronousSink) {
        Object value = messageResult.getValue();
        if (value != null) {
            synchronousSink.next(value);
        }
    }
}
