package org.apache.james.backends.pulsar;

import akka.Done$;
import akka.NotUsed;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.sksamuel.pulsar4s.AsyncHandler$;
import com.sksamuel.pulsar4s.ConsumerMessage;
import com.sksamuel.pulsar4s.Message;
import com.sksamuel.pulsar4s.MessageId$;
import com.sksamuel.pulsar4s.PulsarClient;
import com.sksamuel.pulsar4s.ReaderConfig;
import com.sksamuel.pulsar4s.ReaderConfig$;
import com.sksamuel.pulsar4s.SequenceId;
import com.sksamuel.pulsar4s.Topic;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.BoxesRunTime;

/* compiled from: PulsarReader.scala */
/* loaded from: input_file:org/apache/james/backends/pulsar/PulsarReader$.class */
public final class PulsarReader$ {
    public static final PulsarReader$ MODULE$ = new PulsarReader$();

    public Source<ConsumerMessage<String>, NotUsed> forTopic(Topic topic, Option<SequenceId> option, PulsarClient pulsarClient, ExecutionContext executionContext) {
        return Source$.MODULE$.unfoldResourceAsync(() -> {
            return Future$.MODULE$.successful(pulsarClient.reader(new ReaderConfig(topic, ReaderConfig$.MODULE$.apply$default$2(), new Message(MessageId$.MODULE$.earliest()), ReaderConfig$.MODULE$.apply$default$4(), ReaderConfig$.MODULE$.apply$default$5(), ReaderConfig$.MODULE$.apply$default$6(), ReaderConfig$.MODULE$.apply$default$7(), ReaderConfig$.MODULE$.apply$default$8()), PulsarReader$schemas$.MODULE$.schema()));
        }, reader -> {
            return reader.hasMessageAvailable() ? ((Future) reader.nextAsync(AsyncHandler$.MODULE$.handler(executionContext))).map(consumerMessage -> {
                return new Some(consumerMessage);
            }, executionContext) : Future$.MODULE$.successful(None$.MODULE$);
        }, reader2 -> {
            return ((Future) reader2.closeAsync(AsyncHandler$.MODULE$.handler(executionContext))).map(boxedUnit -> {
                return Done$.MODULE$;
            }, executionContext);
        }).takeWhile(consumerMessage -> {
            return BoxesRunTime.boxToBoolean($anonfun$forTopic$6(option, consumerMessage));
        }, true);
    }

    public Option<SequenceId> forTopic$default$2() {
        return None$.MODULE$;
    }

    public static final /* synthetic */ boolean $anonfun$forTopic$7(ConsumerMessage consumerMessage, SequenceId sequenceId) {
        return consumerMessage.sequenceId().value() < sequenceId.value();
    }

    public static final /* synthetic */ boolean $anonfun$forTopic$6(Option option, ConsumerMessage consumerMessage) {
        return option.forall(sequenceId -> {
            return BoxesRunTime.boxToBoolean($anonfun$forTopic$7(consumerMessage, sequenceId));
        });
    }

    private PulsarReader$() {
    }
}
