package pekko.contrib.persistence.mongodb.driver;

import com.mongodb.CursorType;
import org.apache.pekko.NotUsed;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.stream.KillSwitches$;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.mongodb.scala.FindObservable;
import org.mongodb.scala.bson.DefaultHelper$DefaultsTo$;
import org.mongodb.scala.model.Filters$;
import pekko.contrib.persistence.mongodb.Event;
import pekko.contrib.persistence.mongodb.JournalStream;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

/* compiled from: ScalaDriverPersistenceReadJournaller.scala */
@ScalaSignature(bytes = "\u0006\u0005}4A\u0001C\u0005\u0001)!A!\u0002\u0001B\u0001B\u0003%Q\bC\u0003B\u0001\u0011\u0005!\tC\u0004F\u0001\t\u0007I1\u0001$\t\r5\u0003\u0001\u0015!\u0003H\u0011\u001dq\u0005A1A\u0005\n=Ca!\u001c\u0001!\u0002\u0013\u0001\u0006\"\u00028\u0001\t\u0003y'\u0001G*dC2\fGI]5wKJTu.\u001e:oC2\u001cFO]3b[*\u0011!bC\u0001\u0007IJLg/\u001a:\u000b\u00051i\u0011aB7p]\u001e|GM\u0019\u0006\u0003\u001d=\t1\u0002]3sg&\u001cH/\u001a8dK*\u0011\u0001#E\u0001\bG>tGO]5c\u0015\u0005\u0011\u0012!\u00029fW.|7\u0001A\n\u0004\u0001UY\u0002C\u0001\f\u001a\u001b\u00059\"\"\u0001\r\u0002\u000bM\u001c\u0017\r\\1\n\u0005i9\"AB!osJ+g\rE\u0002\u001d;}i\u0011aC\u0005\u0003=-\u0011QBS8ve:\fGn\u0015;sK\u0006l\u0007\u0003\u0002\u0011+Yej\u0011!\t\u0006\u0003E\r\n\u0001b]2bY\u0006$7\u000f\u001c\u0006\u0003I\u0015\naa\u001d;sK\u0006l'B\u0001\n'\u0015\t9\u0003&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002S\u0005\u0019qN]4\n\u0005-\n#AB*pkJ\u001cW\r\u0005\u0003\u0017[=\u0012\u0014B\u0001\u0018\u0018\u0005\u0019!V\u000f\u001d7feA\u0011A\u0004M\u0005\u0003c-\u0011Q!\u0012<f]R\u0004\"aM\u001c\u000e\u0003QR!!\u000e\u001c\u0002\u000bE,XM]=\u000b\u00059)\u0013B\u0001\u001d5\u0005\u0019yeMZ:fiB\u0011!hO\u0007\u0002K%\u0011A(\n\u0002\b\u001d>$Xk]3e!\tqt(D\u0001\n\u0013\t\u0001\u0015B\u0001\tTG\u0006d\u0017-T8oO>$%/\u001b<fe\u00061A(\u001b8jiz\"\"a\u0011#\u0011\u0005y\u0002\u0001\"\u0002\u0006\u0003\u0001\u0004i\u0014AA3d+\u00059\u0005C\u0001%L\u001b\u0005I%B\u0001&\u0018\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003\u0019&\u0013\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0002\u0007\u0015\u001c\u0007%A\u0007dkJ\u001cxN\u001d\"vS2$WM]\u000b\u0002!B!a#U*T\u0013\t\u0011vCA\u0005Gk:\u001cG/[8ocA\u0019AkV-\u000e\u0003US!\u0001\u0007,\u000b\u00051A\u0013B\u0001-V\u000591\u0015N\u001c3PEN,'O^1cY\u0016\u0004\"A\u00176\u000f\u0005m;gB\u0001/f\u001d\tiFM\u0004\u0002_G:\u0011qLY\u0007\u0002A*\u0011\u0011mE\u0001\u0007yI|w\u000e\u001e \n\u0003%J!\u0001\u0004\u0015\n\u0005a1\u0016B\u00014V\u0003\u0011\u00117o\u001c8\n\u0005!L\u0017a\u00029bG.\fw-\u001a\u0006\u0003MVK!a\u001b7\u0003\u0019\t\u001bxN\u001c#pGVlWM\u001c;\u000b\u0005!L\u0017AD2veN|'OQ;jY\u0012,'\u000fI\u0001\u0007GV\u00148o\u001c:\u0015\u0005}\u0001\b\"B\u001b\b\u0001\u0004\t\bc\u0001\fsi&\u00111o\u0006\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005UdhB\u0001<{\u001d\t9\b0D\u0001j\u0013\tI\u0018.A\u0006d_:4XM]:j_:\u001c\u0018B\u00015|\u0015\tI\u0018.\u0003\u0002~}\n!!i]8o\u0015\tA7\u0010")
/* loaded from: input_file:pekko/contrib/persistence/mongodb/driver/ScalaDriverJournalStream.class */
public class ScalaDriverJournalStream implements JournalStream<Source<Tuple2<Event, Offset>, NotUsed>> {
    public final ScalaMongoDriver pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver;
    private final ExecutionContext ec;
    private final Function1<FindObservable<BsonDocument>, FindObservable<BsonDocument>> cursorBuilder;
    private SharedKillSwitch killSwitch;

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public void stopAllStreams() {
        stopAllStreams();
    }

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public SharedKillSwitch killSwitch() {
        return this.killSwitch;
    }

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public void pekko$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(SharedKillSwitch sharedKillSwitch) {
        this.killSwitch = sharedKillSwitch;
    }

    public ExecutionContext ec() {
        return this.ec;
    }

    private Function1<FindObservable<BsonDocument>, FindObservable<BsonDocument>> cursorBuilder() {
        return this.cursorBuilder;
    }

    public Source<Tuple2<Event, Offset>, NotUsed> cursor(Option<Bson> option) {
        return this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver.realtimeEnablePersistence() ? Source$.MODULE$.future(this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver.realtime()).flatMapConcat(mongoCollection -> {
            return Source$.MODULE$.fromGraph(new ScalaDriverRealtimeGraphStage(this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver, ScalaDriverRealtimeGraphStage$.MODULE$.$lessinit$greater$default$2(), option2 -> {
                Tuple2 tuple2 = new Tuple2(option, option2);
                if (tuple2 != null) {
                    Some some = (Option) tuple2._1();
                    Option option2 = (Option) tuple2._2();
                    if (some instanceof Some) {
                        Bson bson = (Bson) some.value();
                        if (None$.MODULE$.equals(option2)) {
                            return (FindObservable) this.cursorBuilder().apply(mongoCollection.find(bson, DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                    }
                }
                if (tuple2 != null) {
                    Some some2 = (Option) tuple2._1();
                    Some some3 = (Option) tuple2._2();
                    if (some2 instanceof Some) {
                        Bson bson2 = (Bson) some2.value();
                        if (some3 instanceof Some) {
                            return (FindObservable) this.cursorBuilder().apply(mongoCollection.find(Filters$.MODULE$.and(ScalaRunTime$.MODULE$.wrapRefArray(new Bson[]{bson2, Filters$.MODULE$.gte("_id", (BsonObjectId) some3.value())})), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                    }
                }
                if (tuple2 != null) {
                    Option option3 = (Option) tuple2._1();
                    Option option4 = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option3) && None$.MODULE$.equals(option4)) {
                        return (FindObservable) this.cursorBuilder().apply(mongoCollection.find(DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                    }
                }
                if (tuple2 != null) {
                    Option option5 = (Option) tuple2._1();
                    Some some4 = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option5) && (some4 instanceof Some)) {
                        return (FindObservable) this.cursorBuilder().apply(mongoCollection.find(Filters$.MODULE$.gte("_id", (BsonObjectId) some4.value()), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                    }
                }
                throw new MatchError(tuple2);
            }).named("rt-graph-stage").async()).via(this.killSwitch().flow()).mapConcat(bsonDocument -> {
                ObjectId value = bsonDocument.getObjectId("_id").getValue();
                return (List) Option$.MODULE$.apply(bsonDocument.get("events")).filter(bsonValue -> {
                    return BoxesRunTime.boxToBoolean(bsonValue.isArray());
                }).map(bsonValue2 -> {
                    return bsonValue2.asArray();
                }).map(bsonArray -> {
                    return ((IterableOnceOps) ((IterableOps) JavaConverters$.MODULE$.asScalaBufferConverter(bsonArray.getValues()).asScala()).collect(new ScalaDriverJournalStream$$anonfun$$nestedInanonfun$cursor$6$1(this, value))).toList();
                }).getOrElse(() -> {
                    return Nil$.MODULE$;
                });
            });
        }).named("rt-cursor-source") : Source$.MODULE$.empty();
    }

    public ScalaDriverJournalStream(ScalaMongoDriver scalaMongoDriver) {
        this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver = scalaMongoDriver;
        pekko$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(KillSwitches$.MODULE$.shared("realtimeKillSwitch"));
        this.ec = scalaMongoDriver.querySideDispatcher();
        this.cursorBuilder = findObservable -> {
            return findObservable.cursorType(CursorType.TailableAwait).maxAwaitTime(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
        };
        Statics.releaseFence();
    }
}
