package org.apache.pulsar.reactive.client.internal.adapter;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.EndOfStreamAction;
import org.apache.pulsar.reactive.client.api.InstantStartAtSpec;
import org.apache.pulsar.reactive.client.api.MessageIdStartAtSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.class */
class AdaptedReactiveMessageReader<T> implements ReactiveMessageReader<T> {
    private final Schema<T> schema;
    private final ReactiveMessageReaderSpec readerSpec;
    private final ReactiveReaderAdapterFactory reactiveReaderAdapterFactory;
    private final StartAtSpec startAtSpec;
    private final EndOfStreamAction endOfStreamAction;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AdaptedReactiveMessageReader(ReactiveReaderAdapterFactory reactiveReaderAdapterFactory, Schema<T> schema, ReactiveMessageReaderSpec reactiveMessageReaderSpec, StartAtSpec startAtSpec, EndOfStreamAction endOfStreamAction) {
        this.schema = schema;
        this.readerSpec = reactiveMessageReaderSpec;
        this.reactiveReaderAdapterFactory = reactiveReaderAdapterFactory;
        this.startAtSpec = startAtSpec;
        this.endOfStreamAction = endOfStreamAction;
    }

    static <T> Mono<Message<T>> readNextMessage(Reader<T> reader, EndOfStreamAction endOfStreamAction) {
        Objects.requireNonNull(reader);
        Mono<Message<T>> adaptPulsarFuture = PulsarFutureAdapter.adaptPulsarFuture(reader::readNextAsync);
        if (endOfStreamAction != EndOfStreamAction.COMPLETE) {
            return adaptPulsarFuture;
        }
        Objects.requireNonNull(reader);
        return PulsarFutureAdapter.adaptPulsarFuture(reader::hasMessageAvailableAsync).filter((v0) -> {
            return v0.booleanValue();
        }).flatMap(bool -> {
            return adaptPulsarFuture;
        });
    }

    ReactiveReaderAdapter<T> createReactiveReaderAdapter(StartAtSpec startAtSpec) {
        return this.reactiveReaderAdapterFactory.create(readerStartingAt(startAtSpec));
    }

    private Function<PulsarClient, ReaderBuilder<T>> readerStartingAt(StartAtSpec startAtSpec) {
        return pulsarClient -> {
            ReaderBuilder<T> newReader = pulsarClient.newReader(this.schema);
            if (startAtSpec != null) {
                if (startAtSpec instanceof MessageIdStartAtSpec) {
                    MessageIdStartAtSpec messageIdStartAtSpec = (MessageIdStartAtSpec) startAtSpec;
                    newReader.startMessageId(messageIdStartAtSpec.getMessageId());
                    if (messageIdStartAtSpec.isInclusive()) {
                        newReader.startMessageIdInclusive();
                    }
                } else {
                    long between = ChronoUnit.SECONDS.between(((InstantStartAtSpec) startAtSpec).getInstant(), Instant.now()) + 1;
                    if (between < 0) {
                        throw new IllegalArgumentException("InstantStartAtSpec must be in the past.");
                    }
                    newReader.startMessageFromRollbackDuration(between, TimeUnit.SECONDS);
                }
            }
            configureReaderBuilder(newReader);
            return newReader;
        };
    }

    private void configureReaderBuilder(ReaderBuilder<T> readerBuilder) {
        readerBuilder.topics(this.readerSpec.getTopicNames());
        if (this.readerSpec.getReaderName() != null) {
            readerBuilder.readerName(this.readerSpec.getReaderName());
        }
        if (this.readerSpec.getSubscriptionName() != null) {
            readerBuilder.subscriptionName(this.readerSpec.getSubscriptionName());
        }
        if (this.readerSpec.getGeneratedSubscriptionNamePrefix() != null) {
            readerBuilder.subscriptionRolePrefix(this.readerSpec.getGeneratedSubscriptionNamePrefix());
        }
        if (this.readerSpec.getReceiverQueueSize() != null) {
            readerBuilder.receiverQueueSize(this.readerSpec.getReceiverQueueSize().intValue());
        }
        if (this.readerSpec.getReadCompacted() != null) {
            readerBuilder.readCompacted(this.readerSpec.getReadCompacted().booleanValue());
        }
        if (this.readerSpec.getKeyHashRanges() != null && !this.readerSpec.getKeyHashRanges().isEmpty()) {
            readerBuilder.keyHashRange((Range[]) this.readerSpec.getKeyHashRanges().toArray(new Range[0]));
        }
        if (this.readerSpec.getCryptoKeyReader() != null) {
            readerBuilder.cryptoKeyReader(this.readerSpec.getCryptoKeyReader());
        }
        if (this.readerSpec.getCryptoFailureAction() != null) {
            readerBuilder.cryptoFailureAction(this.readerSpec.getCryptoFailureAction());
        }
    }

    public Mono<Message<T>> readOne() {
        return (Mono<Message<T>>) createReactiveReaderAdapter(this.startAtSpec).usingReader(reader -> {
            return readNextMessage(reader, this.endOfStreamAction);
        });
    }

    public Flux<Message<T>> readMany() {
        return (Flux<Message<T>>) createReactiveReaderAdapter(this.startAtSpec).usingReaderMany(reader -> {
            Mono readNextMessage = readNextMessage(reader, this.endOfStreamAction);
            return this.endOfStreamAction == EndOfStreamAction.COMPLETE ? readNextMessage.repeatWhen(flux -> {
                return flux.takeWhile(l -> {
                    return l.longValue() > 0;
                });
            }) : readNextMessage.repeat();
        });
    }
}
