package akka.persistence.r2dbc.query;

import akka.NotUsed;
import akka.persistence.r2dbc.journal.JournalEntry;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.Option;
import scala.Predef$;
import scala.collection.StringOps$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventsByPersistenceIdStage.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005msAB\u000e\u001d\u0011\u0003aBE\u0002\u0004'9!\u0005Ad\n\u0005\u0006]\u0005!\t\u0001\r\u0005\u0006c\u0005!\tA\r\u0005\n\u0003\u0003\n\u0011\u0013!C\u0001\u0003\u0007B\u0011\"!\u0017\u0002#\u0003%I!a\u0011\u0007\u000b\u0019b\"\u0001\b\u001b\t\u0011a2!\u0011!Q\u0001\neB\u0001\u0002\u0010\u0004\u0003\u0002\u0003\u0006I!\u0010\u0005\t\u0011\u001a\u0011\t\u0011)A\u0005\u0013\"AAJ\u0002B\u0001B\u0003%\u0011\n\u0003\u0005N\r\t\u0015\r\u0011\"\u0001O\u0011!QfA!A!\u0002\u0013y\u0005\"\u0002\u0018\u0007\t\u0013Y\u0006bB1\u0007\u0005\u0004%)B\u0019\u0005\u0007]\u001a\u0001\u000bQB2\t\u000f=4\u0001\u0019!C\u0005a\"9\u0011O\u0002a\u0001\n\u0013\u0011\bB\u0002=\u0007A\u0003&\u0011\nC\u0004z\r\u0001\u0007I\u0011\u00029\t\u000fi4\u0001\u0019!C\u0005w\"1QP\u0002Q!\n%CqA \u0004A\u0002\u0013%\u0001\u000f\u0003\u0005��\r\u0001\u0007I\u0011BA\u0001\u0011\u001d\t)A\u0002Q!\n%Cq!a\u0002\u0007\t#\nI\u0001C\u0004\u0002\u001c\u0019!\t&!\b\u00025\u00153XM\u001c;t\u0005f\u0004VM]:jgR,gnY3JIN#\u0018mZ3\u000b\u0005uq\u0012!B9vKJL(BA\u0010!\u0003\u0015\u0011(\u0007\u001a2d\u0015\t\t#%A\u0006qKJ\u001c\u0018n\u001d;f]\u000e,'\"A\u0012\u0002\t\u0005\\7.\u0019\t\u0003K\u0005i\u0011\u0001\b\u0002\u001b\u000bZ,g\u000e^:CsB+'o]5ti\u0016t7-Z%e'R\fw-Z\n\u0003\u0003!\u0002\"!\u000b\u0017\u000e\u0003)R\u0011aK\u0001\u0006g\u000e\fG.Y\u0005\u0003[)\u0012a!\u00118z%\u00164\u0017A\u0002\u001fj]&$hh\u0001\u0001\u0015\u0003\u0011\nQ!\u00199qYf$2bMA\u001c\u0003s\tY$!\u0010\u0002@A\u0011QEB\n\u0003\rU\u0002\"!\n\u001c\n\u0005]b\"!D#wK:$8OQ=Ti\u0006<W-A\u0002eC>\u0004\"!\n\u001e\n\u0005mb\"\u0001C)vKJLH)Y8\u0002\u001bA,'o]5ti\u0016t7-Z%e!\tqTI\u0004\u0002@\u0007B\u0011\u0001IK\u0007\u0002\u0003*\u0011!iL\u0001\u0007yI|w\u000e\u001e \n\u0005\u0011S\u0013A\u0002)sK\u0012,g-\u0003\u0002G\u000f\n11\u000b\u001e:j]\u001eT!\u0001\u0012\u0016\u0002\u0013\u0019\u0014x.\\*fc:\u0013\bCA\u0015K\u0013\tY%F\u0001\u0003M_:<\u0017a\u0002;p'\u0016\fhJ]\u0001\u0010e\u00164'/Z:i\u0013:$XM\u001d<bYV\tq\nE\u0002*!JK!!\u0015\u0016\u0003\r=\u0003H/[8o!\t\u0019\u0006,D\u0001U\u0015\t)f+\u0001\u0005ekJ\fG/[8o\u0015\t9&&\u0001\u0006d_:\u001cWO\u001d:f]RL!!\u0017+\u0003\u001d\u0019Kg.\u001b;f\tV\u0014\u0018\r^5p]\u0006\u0001\"/\u001a4sKND\u0017J\u001c;feZ\fG\u000e\t\u000b\u0007gqkfl\u00181\t\u000baj\u0001\u0019A\u001d\t\u000bqj\u0001\u0019A\u001f\t\u000b!k\u0001\u0019A%\t\u000b1k\u0001\u0019A%\t\u000f5k\u0001\u0013!a\u0001\u001f\u0006q1m\\7qY\u0016$XmU<ji\u000eDW#A2\u0011\u0005\u0011dW\"A3\u000b\u0005\u0019<\u0017AB1u_6L7M\u0003\u0002XQ*\u0011\u0011N[\u0001\u0005kRLGNC\u0001l\u0003\u0011Q\u0017M^1\n\u00055,'!D!u_6L7MQ8pY\u0016\fg.A\bd_6\u0004H.\u001a;f'^LGo\u00195!\u0003A\u0001(o\\2fgN,G-\u00128ue&,7/F\u0001J\u0003Q\u0001(o\\2fgN,G-\u00128ue&,7o\u0018\u0013fcR\u00111O\u001e\t\u0003SQL!!\u001e\u0016\u0003\tUs\u0017\u000e\u001e\u0005\boF\t\t\u00111\u0001J\u0003\rAH%M\u0001\u0012aJ|7-Z:tK\u0012,e\u000e\u001e:jKN\u0004\u0013AC2veJ,g\u000e^*fc\u0006q1-\u001e:sK:$8+Z9`I\u0015\fHCA:}\u0011\u001d9H#!AA\u0002%\u000b1bY;se\u0016tGoU3rA\u0005IA/\u0019:hKR\u001cV-]\u0001\u000ei\u0006\u0014x-\u001a;TKF|F%Z9\u0015\u0007M\f\u0019\u0001C\u0004x/\u0005\u0005\t\u0019A%\u0002\u0015Q\f'oZ3u'\u0016\f\b%A\u0006qkNDW\rZ#oiJLHcA:\u0002\f!9\u0011QB\rA\u0002\u0005=\u0011!B3oiJL\b\u0003BA\t\u0003/i!!a\u0005\u000b\u0007\u0005Ua$A\u0004k_V\u0014h.\u00197\n\t\u0005e\u00111\u0003\u0002\r\u0015>,(O\\1m\u000b:$(/_\u0001\fM\u0016$8\r[#wK:$8\u000f\u0006\u0002\u0002 AA\u0011\u0011EA\u0016\u0003\u001f\ty#\u0004\u0002\u0002$)!\u0011QEA\u0014\u0003!\u00198-\u00197bINd'bAA\u0015E\u000511\u000f\u001e:fC6LA!!\f\u0002$\t11k\\;sG\u0016\u0004B!!\r\u000245\t!%C\u0002\u00026\t\u0012qAT8u+N,G\rC\u00039\u0007\u0001\u0007\u0011\bC\u0003=\u0007\u0001\u0007Q\bC\u0003I\u0007\u0001\u0007\u0011\nC\u0003M\u0007\u0001\u0007\u0011\nC\u0004N\u0007A\u0005\t\u0019A(\u0002\u001f\u0005\u0004\b\u000f\\=%I\u00164\u0017-\u001e7uIU*\"!!\u0012+\u0007=\u000b9e\u000b\u0002\u0002JA!\u00111JA+\u001b\t\tiE\u0003\u0003\u0002P\u0005E\u0013!C;oG\",7m[3e\u0015\r\t\u0019FK\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA,\u0003\u001b\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%k\u0001")
/* loaded from: input_file:akka/persistence/r2dbc/query/EventsByPersistenceIdStage.class */
public final class EventsByPersistenceIdStage extends EventsByStage {
    private final QueryDao dao;
    private final String persistenceId;
    private final long toSeqNr;
    private final Option<FiniteDuration> refreshInterval;
    private final AtomicBoolean completeSwitch;
    private long processedEntries;
    private long currentSeq;
    private long targetSeq;

    public static EventsByPersistenceIdStage apply(QueryDao queryDao, String str, long j, long j2, Option<FiniteDuration> option) {
        return EventsByPersistenceIdStage$.MODULE$.apply(queryDao, str, j, j2, option);
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Option<FiniteDuration> refreshInterval() {
        return this.refreshInterval;
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public final AtomicBoolean completeSwitch() {
        return this.completeSwitch;
    }

    private long processedEntries() {
        return this.processedEntries;
    }

    private void processedEntries_$eq(long j) {
        this.processedEntries = j;
    }

    private long currentSeq() {
        return this.currentSeq;
    }

    private void currentSeq_$eq(long j) {
        this.currentSeq = j;
    }

    private long targetSeq() {
        return this.targetSeq;
    }

    private void targetSeq_$eq(long j) {
        this.targetSeq = j;
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public void pushedEntry(JournalEntry journalEntry) {
        processedEntries_$eq(processedEntries() + 1);
        currentSeq_$eq(journalEntry.sequenceNr());
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Source<JournalEntry, NotUsed> fetchEvents() {
        return this.dao.findHighestSeq(this.persistenceId).flatMapConcat(obj -> {
            return $anonfun$fetchEvents$1(this, BoxesRunTime.unboxToLong(obj));
        });
    }

    public static final /* synthetic */ Source $anonfun$fetchEvents$1(EventsByPersistenceIdStage eventsByPersistenceIdStage, long j) {
        long j2;
        if (eventsByPersistenceIdStage.targetSeq() == j) {
            return Source$.MODULE$.empty();
        }
        if (j >= eventsByPersistenceIdStage.toSeqNr) {
            eventsByPersistenceIdStage.completeSwitch().set(true);
            j2 = eventsByPersistenceIdStage.toSeqNr;
        } else {
            j2 = j;
        }
        eventsByPersistenceIdStage.targetSeq_$eq(j2);
        return eventsByPersistenceIdStage.dao.fetchByPersistenceId(eventsByPersistenceIdStage.persistenceId, eventsByPersistenceIdStage.processedEntries() == 0 ? eventsByPersistenceIdStage.currentSeq() : eventsByPersistenceIdStage.currentSeq() + 1, eventsByPersistenceIdStage.targetSeq());
    }

    public EventsByPersistenceIdStage(QueryDao queryDao, String str, long j, long j2, Option<FiniteDuration> option) {
        this.dao = queryDao;
        this.persistenceId = str;
        this.toSeqNr = j2;
        this.refreshInterval = option;
        Predef$.MODULE$.require(queryDao != null, () -> {
            return "the 'dao' must be provided";
        });
        Predef$.MODULE$.require(str != null && StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(str)), () -> {
            return "the 'persistenceId' must be provided";
        });
        Predef$.MODULE$.require(j >= 0, () -> {
            return "the 'fromSeqNr' must be >= 0";
        });
        Predef$.MODULE$.require(j2 >= 0, () -> {
            return "the 'toSeqNr' must be >= 0";
        });
        Predef$.MODULE$.require(j < j2, () -> {
            return "the 'fromSeqNr' must be < the 'toSeqNr'";
        });
        this.completeSwitch = new AtomicBoolean(false);
        this.processedEntries = 0L;
        this.currentSeq = j;
        this.targetSeq = 0L;
    }
}
