package com.github.j5ik2o.akka.persistence.dynamodb.query.dao;

import akka.NotUsed;
import akka.serialization.Serialization;
import akka.stream.Attributes;
import akka.stream.Graph;
import akka.stream.scaladsl.Concat$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.github.j5ik2o.akka.persistence.dynamodb.config.JournalColumnsDefConfig;
import com.github.j5ik2o.akka.persistence.dynamodb.config.QueryPluginConfig;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.JournalRow;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.PersistenceId;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.PersistenceId$;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.SequenceNumber;
import com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport;
import com.github.j5ik2o.akka.persistence.dynamodb.metrics.MetricsReporter;
import com.github.j5ik2o.reactive.aws.dynamodb.DynamoDbAsyncClient;
import com.github.j5ik2o.reactive.aws.dynamodb.akka.DynamoDbAkkaClient;
import com.github.j5ik2o.reactive.aws.dynamodb.akka.DynamoDbAkkaClient$;
import com.github.j5ik2o.reactive.aws.dynamodb.implicits$;
import com.github.j5ik2o.reactive.aws.dynamodb.model.ops.QueryRequestBuilderOps$;
import com.github.j5ik2o.reactive.aws.dynamodb.model.ops.QueryResponseOps$;
import com.github.j5ik2o.reactive.aws.dynamodb.model.ops.ScanRequestBuilderOps$;
import com.github.j5ik2o.reactive.aws.dynamodb.model.ops.ScanResponseOps$;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.concurrent.ExecutionContext;
import scala.math.Ordering$;
import scala.math.Ordering$Long$;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.Select;

/* compiled from: ReadJournalDaoImpl.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%f\u0001\u0002\u000f\u001e\u00019B\u0001\u0002\u0011\u0001\u0003\u0002\u0003\u0006I!\u0011\u0005\t\u0015\u0002\u0011\t\u0011)A\u0005\u0017\"A\u0011\u000b\u0001B\u0001B\u0003%!\u000b\u0003\u0005Y\u0001\t\u0015\r\u0011\"\u0015Z\u0011!\u0001\u0007A!A!\u0002\u0013Q\u0006\u0002C1\u0001\u0005\u0003\u0005\u000b1\u00022\t\u000b!\u0004A\u0011A5\t\u000fE\u0004!\u0019!C\u0005e\"11\u0010\u0001Q\u0001\nMDq\u0001 \u0001C\u0002\u0013ES\u0010C\u0004\u0002\b\u0001\u0001\u000b\u0011\u0002@\t\u0013\u0005%\u0001A1A\u0005\u0012\u0005-\u0001\u0002CA\n\u0001\u0001\u0006I!!\u0004\t\u0013\u0005U\u0001A1A\u0005B\u0005]\u0001\u0002CA\u0018\u0001\u0001\u0006I!!\u0007\t\u0013\u0005E\u0002A1A\u0005B\u0005]\u0001\u0002CA\u001a\u0001\u0001\u0006I!!\u0007\t\u0013\u0005U\u0002A1A\u0005B\u0005]\u0002\u0002CA \u0001\u0001\u0006I!!\u000f\t\u0013\u0005\u0005\u0003A1A\u0005R\u0005\r\u0003\u0002CA&\u0001\u0001\u0006I!!\u0012\t\u0013\u00055\u0003A1A\u0005R\u0005-\u0001\u0002CA(\u0001\u0001\u0006I!!\u0004\t\u000f\u0005E\u0003\u0001\"\u0011\u0002T!9\u0011q\u0010\u0001\u0005B\u0005\u0005\u0005bBAM\u0001\u0011\u0005\u00131\u0014\u0005\b\u0003K\u0003A\u0011IAT\u0005I\u0011V-\u00193K_V\u0014h.\u00197EC>LU\u000e\u001d7\u000b\u0005yy\u0012a\u00013b_*\u0011\u0001%I\u0001\u0006cV,'/\u001f\u0006\u0003E\r\n\u0001\u0002Z=oC6|GM\u0019\u0006\u0003I\u0015\n1\u0002]3sg&\u001cH/\u001a8dK*\u0011aeJ\u0001\u0005C.\\\u0017M\u0003\u0002)S\u00051!.N5le=T!AK\u0016\u0002\r\u001dLG\u000f[;c\u0015\u0005a\u0013aA2p[\u000e\u00011\u0003\u0002\u00010ke\u0002\"\u0001M\u001a\u000e\u0003ER\u0011AM\u0001\u0006g\u000e\fG.Y\u0005\u0003iE\u0012a!\u00118z%\u00164\u0007C\u0001\u001c8\u001b\u0005i\u0012B\u0001\u001d\u001e\u00059\u0011V-\u00193K_V\u0014h.\u00197EC>\u0004\"A\u000f 
\u000e\u0003mR!A\b\u001f\u000b\u0005u\n\u0013a\u00026pkJt\u0017\r\\\u0005\u0003\u007fm\u0012!\u0002R1p'V\u0004\bo\u001c:u\u0003-\t7/\u001f8d\u00072LWM\u001c;\u0011\u0005\tCU\"A\"\u000b\u0005\t\"%BA#G\u0003\r\two\u001d\u0006\u0003\u000f\u001e\n\u0001B]3bGRLg/Z\u0005\u0003\u0013\u000e\u00131\u0003R=oC6|GIY!ts:\u001c7\t\\5f]R\fQb]3sS\u0006d\u0017N_1uS>t\u0007C\u0001'P\u001b\u0005i%B\u0001&O\u0015\u00051\u0013B\u0001)N\u00055\u0019VM]5bY&T\u0018\r^5p]\u0006a\u0001\u000f\\;hS:\u001cuN\u001c4jOB\u00111KV\u0007\u0002)*\u0011Q+I\u0001\u0007G>tg-[4\n\u0005]#&!E)vKJL\b\u000b\\;hS:\u001cuN\u001c4jO\u0006yQ.\u001a;sS\u000e\u001c(+\u001a9peR,'/F\u0001[!\tYf,D\u0001]\u0015\ti\u0016%A\u0004nKR\u0014\u0018nY:\n\u0005}c&aD'fiJL7m\u001d*fa>\u0014H/\u001a:\u0002!5,GO]5dgJ+\u0007o\u001c:uKJ\u0004\u0013AA3d!\t\u0019g-D\u0001e\u0015\t)\u0017'\u0001\u0006d_:\u001cWO\u001d:f]RL!a\u001a3\u0003!\u0015CXmY;uS>t7i\u001c8uKb$\u0018A\u0002\u001fj]&$h\bF\u0003k[:|\u0007\u000f\u0006\u0002lYB\u0011a\u0007\u0001\u0005\u0006C\u001e\u0001\u001dA\u0019\u0005\u0006\u0001\u001e\u0001\r!\u0011\u0005\u0006\u0015\u001e\u0001\ra\u0013\u0005\u0006#\u001e\u0001\rA\u0015\u0005\u00061\u001e\u0001\rAW\u0001\u0007Y><w-\u001a:\u0016\u0003M\u0004\"\u0001^=\u000e\u0003UT!A^<\u0002\u000bMdg\r\u000e6\u000b\u0003a\f1a\u001c:h\u0013\tQXO\u0001\u0004M_\u001e<WM]\u0001\bY><w-\u001a:!\u00031\u0019HO]3b[\u000ec\u0017.\u001a8u+\u0005q\bcA@\u0002\u00045\u0011\u0011\u0011\u0001\u0006\u0003M\rKA!!\u0002\u0002\u0002\t\u0011B)\u001f8b[>$%-Q6lC\u000ec\u0017.\u001a8u\u00035\u0019HO]3b[\u000ec\u0017.\u001a8uA\u0005Q1\u000f[1sI\u000e{WO\u001c;\u0016\u0005\u00055\u0001c\u0001\u0019\u0002\u0010%\u0019\u0011\u0011C\u0019\u0003\u0007%sG/A\u0006tQ\u0006\u0014HmQ8v]R\u0004\u0013!\u0003;bE2,g*Y7f+\t\tI\u0002\u0005\u0003\u0002\u001c\u0005%b\u0002BA\u000f\u0003K\u00012!a\b2\u001b\t\t\tCC\u0002\u0002$5\na\u0001\u0010:p_Rt\u0014bAA\u0014c\u00051\u0001K]3eK\u001aLA!a\u000b\u0002.\t11\u000b\u001e:j]\u001eT1!a\n2\u0003)!\u0018M\u00197f\u001d\u0006l
W\rI\u0001\u0018O\u0016$(j\\;s]\u0006d'k\\<t\u0013:$W\r\u001f(b[\u0016\f\u0001dZ3u\u0015>,(O\\1m%><8/\u00138eKbt\u0015-\\3!\u0003A\u0019w\u000e\\;n]N$UMZ\"p]\u001aLw-\u0006\u0002\u0002:A\u00191+a\u000f\n\u0007\u0005uBKA\fK_V\u0014h.\u00197D_2,XN\\:EK\u001a\u001cuN\u001c4jO\u0006\t2m\u001c7v[:\u001cH)\u001a4D_:4\u0017n\u001a\u0011\u0002\u001d\r|gn]5ti\u0016tGOU3bIV\u0011\u0011Q\t\t\u0004a\u0005\u001d\u0013bAA%c\t9!i\\8mK\u0006t\u0017aD2p]NL7\u000f^3oiJ+\u0017\r\u001a\u0011\u0002\u001dE,XM]=CCR\u001c\u0007nU5{K\u0006y\u0011/^3ss\n\u000bGo\u00195TSj,\u0007%A\tbY2\u0004VM]:jgR,gnY3JIN$B!!\u0016\u0002vAA\u0011qKA1\u0003K\ni'\u0004\u0002\u0002Z)!\u00111LA/\u0003!\u00198-\u00197bINd'bAA0\u001d\u000611\u000f\u001e:fC6LA!a\u0019\u0002Z\t11k\\;sG\u0016\u0004B!a\u001a\u0002j5\tA(C\u0002\u0002lq\u0012Q\u0002U3sg&\u001cH/\u001a8dK&#\u0007\u0003BA8\u0003cj\u0011AT\u0005\u0004\u0003gr%a\u0002(piV\u001bX\r\u001a\u0005\b\u0003oB\u0002\u0019AA=\u0003\ri\u0017\r\u001f\t\u0004a\u0005m\u0014bAA?c\t!Aj\u001c8h\u0003-)g/\u001a8ug\nKH+Y4\u0015\u0015\u0005\r\u00151RAH\u0003'\u000b9\n\u0005\u0005\u0002X\u0005\u0005\u0014QQA7!\u0011\t9'a\"\n\u0007\u0005%EH\u0001\u0006K_V\u0014h.\u00197S_^Dq!!$\u001a\u0001\u0004\tI\"A\u0002uC\u001eDq!!%\u001a\u0001\u0004\tI(\u0001\u0004pM\u001a\u001cX\r\u001e\u0005\b\u0003+K\u0002\u0019AA=\u0003%i\u0017\r_(gMN,G\u000fC\u0004\u0002xe\u0001\r!!\u001f\u0002\u001f)|WO\u001d8bYN+\u0017/^3oG\u0016$b!!(\u0002 \u0006\u0005\u0006\u0003CA,\u0003C\nI(!\u001c\t\u000f\u0005E%\u00041\u0001\u0002z!9\u00111\u0015\u000eA\u0002\u0005e\u0014!\u00027j[&$\u0018AE7bq*{WO\u001d8bYN+\u0017/^3oG\u0016$\"!!(")
/* loaded from: input_file:com/github/j5ik2o/akka/persistence/dynamodb/query/dao/ReadJournalDaoImpl.class */
public class ReadJournalDaoImpl implements ReadJournalDao, DaoSupport {
    // Plugin configuration handed to the constructor; read later for the tags index name.
    private final QueryPluginConfig pluginConfig;
    // Sink for call durations and counters, exposed through metricsReporter().
    private final MetricsReporter metricsReporter;
    // Logger owned by this class (created in the constructor from getClass()).
    private final Logger com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger;
    // Akka Streams wrapper around the async DynamoDB client.
    private final DynamoDbAkkaClient streamClient;
    // Values below are copied from the plugin configuration in the constructor.
    private final int shardCount;
    private final String tableName;
    private final String getJournalRowsIndexName;
    private final JournalColumnsDefConfig columnsDefConfig;
    private final boolean consistentRead;
    private final int queryBatchSize;
    // The next three fields are written by the DaoSupport `_setter_` methods of this class
    // rather than directly in the constructor, so they cannot be declared final in Java source
    // (javac rejects assignment to a final field outside a constructor/initializer).
    // Presumably the setters are invoked once from DaoSupport.$init$ - confirm against DaoSupport.
    private Logger com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$$logger;
    private Attributes logLevels;
    private Source<Object, NotUsed> startTimeSource;

    /**
     * Delegates to the {@code getMessages} implementation mixed in from {@link DaoSupport}.
     *
     * <p>All arguments are forwarded unchanged. The call is qualified with
     * {@code DaoSupport.super} because an unqualified {@code getMessages(...)} resolves to this
     * very override and would recurse until a {@link StackOverflowError}.
     *
     * @param persistenceId   persistence id forwarded to {@code DaoSupport}
     * @param sequenceNumber  first sequence-number argument, forwarded as is
     * @param sequenceNumber2 second sequence-number argument, forwarded as is
     * @param j               long argument, forwarded as is
     * @param option          optional argument (its default comes from {@code getMessages$default$5})
     * @return the source produced by {@code DaoSupport}
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao, com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Source<JournalRow, NotUsed> getMessages(PersistenceId persistenceId, SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j, Option<Object> option) {
        return DaoSupport.super.getMessages(persistenceId, sequenceNumber, sequenceNumber2, j, option);
    }

    /**
     * Supplies the default value of the fifth {@code getMessages} parameter by delegating to
     * {@link DaoSupport}.
     *
     * <p>The qualified {@code DaoSupport.super} call is required: an unqualified call would
     * re-enter this override and never terminate.
     *
     * @return the default option value defined by {@code DaoSupport}
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao, com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Option<Object> getMessages$default$5() {
        return DaoSupport.super.getMessages$default$5();
    }

    /**
     * Converts one DynamoDB item into a {@link JournalRow} using the implementation mixed in
     * from {@link DaoSupport}.
     *
     * <p>The call is qualified with {@code DaoSupport.super}; an unqualified
     * {@code convertToJournalRow(map)} would invoke this override again and overflow the stack.
     *
     * @param map attribute name to attribute value mapping of a single item
     * @return the journal row built by {@code DaoSupport}
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public JournalRow convertToJournalRow(Map<String, AttributeValue> map) {
        return DaoSupport.super.convertToJournalRow(map);
    }

    /**
     * Accessor for the logger field that belongs to the {@link DaoSupport} mixin.
     *
     * @return the logger stored by the matching {@code _setter_} method
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Logger com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$$logger() {
        final Logger daoSupportLogger = com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$$logger;
        return daoSupportLogger;
    }

    /**
     * Stream attributes applied to the query pipelines of this DAO via {@code withAttributes}.
     *
     * @return the attributes stored by the {@code logLevels} setter
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Attributes logLevels() {
        return logLevels;
    }

    /**
     * Source whose emitted value the query methods unbox to a {@code long} and later subtract
     * from {@code System.nanoTime()} when reporting durations.
     *
     * @return the source stored by the {@code startTimeSource} setter
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public Source<Object, NotUsed> startTimeSource() {
        return startTimeSource;
    }

    /**
     * Synthetic setter through which {@link DaoSupport} stores its logger in this instance.
     *
     * @param logger the logger to keep for the {@code DaoSupport} mixin
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public final void com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$_setter_$com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$$logger_$eq(Logger logger) {
        com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$$logger = logger;
    }

    /**
     * Synthetic setter through which {@link DaoSupport} stores the stream attributes that
     * {@code logLevels()} returns.
     *
     * @param attributes the attributes to keep
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public void com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$_setter_$logLevels_$eq(Attributes attributes) {
        logLevels = attributes;
    }

    /**
     * Synthetic setter through which {@link DaoSupport} stores the source that
     * {@code startTimeSource()} returns.
     *
     * @param source the start-time source to keep
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public void com$github$j5ik2o$akka$persistence$dynamodb$journal$dao$DaoSupport$_setter_$startTimeSource_$eq(Source<Object, NotUsed> source) {
        startTimeSource = source;
    }

    /**
     * Metrics sink used by the query pipelines of this DAO.
     *
     * @return the reporter passed to the constructor
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public MetricsReporter metricsReporter() {
        return metricsReporter;
    }

    /**
     * Accessor for this class's own logger; public so that the synthetic lambda bodies and the
     * companion anonymous classes can reach it.
     *
     * @return the logger created in the constructor
     */
    public Logger com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger() {
        final Logger ownLogger = com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger;
        return ownLogger;
    }

    /**
     * Akka Streams DynamoDB client whose {@code scanFlow}/{@code queryFlow} the loops run through.
     *
     * @return the client built in the constructor from the async client
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public DynamoDbAkkaClient streamClient() {
        return streamClient;
    }

    /**
     * Shard count copied from the plugin configuration at construction time.
     *
     * @return the configured shard count
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public int shardCount() {
        return shardCount;
    }

    /**
     * Name of the DynamoDB table that every scan and query request of this DAO targets.
     *
     * @return the table name copied from the plugin configuration
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public String tableName() {
        return tableName;
    }

    /**
     * Index name copied from the plugin configuration's {@code getJournalRowsIndexName()}.
     *
     * @return the configured index name
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public String getJournalRowsIndexName() {
        return getJournalRowsIndexName;
    }

    /**
     * Column-name definitions (deleted, persistence id, tags, ordering, ...) used when building
     * requests and reading items.
     *
     * @return the column configuration copied from the plugin configuration
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public JournalColumnsDefConfig columnsDefConfig() {
        return columnsDefConfig;
    }

    /**
     * Consistent-read flag copied from the plugin configuration.
     *
     * @return the configured flag value
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public boolean consistentRead() {
        return consistentRead;
    }

    /**
     * Page size passed as {@code limit} on every scan and query request built by this DAO.
     *
     * @return the batch size copied from the plugin configuration
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.DaoSupport
    public int queryBatchSize() {
        return queryBatchSize;
    }

    /**
     * Streams the distinct persistence ids found in the journal table.
     *
     * <p>The value emitted by {@code startTimeSource()} is unboxed and handed to the synthetic
     * pipeline builder, which uses it as the baseline for duration metrics.
     *
     * @param j upper bound on the number of ids to emit
     * @return a source of persistence ids
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao
    public Source<PersistenceId, NotUsed> allPersistenceIds(long j) {
        final Source<Object, NotUsed> startTimes = startTimeSource();
        return startTimes.flatMapConcat(startTime -> $anonfun$allPersistenceIds$1(this, j, BoxesRunTime.unboxToLong(startTime)));
    }

    /**
     * Streams the journal rows carrying the given tag.
     *
     * <p>The value emitted by {@code startTimeSource()} is unboxed and forwarded as the
     * call-level timing baseline.
     *
     * @param str tag to look for
     * @param j   offset; rows whose assigned ordering is not greater than this are dropped
     * @param j2  maximum offset; rows whose assigned ordering exceeds this are dropped
     * @param j3  maximum number of rows to emit
     * @return a source of matching journal rows
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao
    public Source<JournalRow, NotUsed> eventsByTag(String str, long j, long j2, long j3) {
        final Source<Object, NotUsed> startTimes = startTimeSource();
        return startTimes.flatMapConcat(startTime -> $anonfun$eventsByTag$1(this, str, j, j2, j3, BoxesRunTime.unboxToLong(startTime)));
    }

    /**
     * Streams values of the ordering column read from the journal table.
     *
     * <p>The value emitted by {@code startTimeSource()} is unboxed and forwarded as the
     * call-level timing baseline.
     *
     * @param j  number of leading values to drop
     * @param j2 maximum number of values to emit after the drop
     * @return a source of boxed {@code long} ordering values
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao
    public Source<Object, NotUsed> journalSequence(long j, long j2) {
        final Source<Object, NotUsed> startTimes = startTimeSource();
        return startTimes.flatMapConcat(startTime -> $anonfun$journalSequence$1(this, j, j2, BoxesRunTime.unboxToLong(startTime)));
    }

    /**
     * Emits a single constant: {@link Long#MAX_VALUE}. No request is sent to DynamoDB.
     *
     * @return a one-element source holding the boxed {@code Long.MAX_VALUE}
     */
    @Override // com.github.j5ik2o.akka.persistence.dynamodb.query.dao.ReadJournalDao
    public Source<Object, NotUsed> maxJournalSequence() {
        final Long boxedMax = BoxesRunTime.boxToLong(Long.MAX_VALUE);
        return Source$.MODULE$.single(boxedMax);
    }

    /**
     * Synthetic lambda body: creates the {@code Concat} stage that {@code Source.combine} uses in
     * the allPersistenceIds loop.
     *
     * @param i value handed over by {@code Source.combine}, passed straight to {@code Concat.apply}
     * @return the concat graph
     */
    public static final /* synthetic */ Graph $anonfun$allPersistenceIds$5(int i) {
        final Graph concatStage = Concat$.MODULE$.apply(i);
        return concatStage;
    }

    /**
     * One page of the table scan behind {@code allPersistenceIds}; calls itself for the next page.
     *
     * <p>Builds a {@code ScanRequest} on {@code tableName()} that fetches only the deleted and
     * persistence-id columns, limited to {@code queryBatchSize()} items and starting after
     * {@code option}, and runs it through {@code streamClient().scanFlow(1)}.
     *
     * @param option exclusive start key for this page ({@code None} for the first page)
     * @param source items accumulated from the previous pages
     * @param j      running total of the {@code count()} values of the previous responses
     * @param i      page index, used only in debug log messages
     * @param j2     maximum requested by the caller; paging stops once the running total reaches it
     * @return {@code source} followed by this page's items (and those of later pages), or a failed
     *         source carrying an {@link IOException} when the HTTP response is not successful
     */
    private final Source loop$1(Option option, Source source, long j, int i, long j2) {
        com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(18).append("index = ").append(i).append(", count = ").append(j).toString());
        return Source$.MODULE$.single((ScanRequest) ScanRequestBuilderOps$.MODULE$.exclusiveStartKeyAsScala$extension(implicits$.MODULE$.toScanRequestBuilderOps(ScanRequest.builder().tableName(tableName()).select(Select.SPECIFIC_ATTRIBUTES).attributesToGet(new String[]{columnsDefConfig().deletedColumnName(), columnsDefConfig().persistenceIdColumnName()}).limit(Predef$.MODULE$.int2Integer(queryBatchSize()))), option).build()).via(streamClient().scanFlow(1)).flatMapConcat(scanResponse -> {
            if (!scanResponse.sdkHttpResponse().isSuccessful()) {
                // Failure path: fail the stream with the HTTP status code and, when present, the status text.
                int statusCode = scanResponse.sdkHttpResponse().statusCode();
                Optional statusText = scanResponse.sdkHttpResponse().statusText();
                this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(41).append("allPersistenceIdsSource(max = ").append(j2).append("): finished").toString());
                return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(statusCode).append(implicits$.MODULE$.toOption(statusText).fold(() -> {
                    return "";
                }, optional -> {
                    return new StringBuilder(2).append(", ").append(optional).toString();
                })).toString()));
            }
            // Missing items / missing last-evaluated key are treated as empty.
            Vector vector = ((TraversableOnce) ScanResponseOps$.MODULE$.itemsAsScala$extension(implicits$.MODULE$.toScanResponseOps(scanResponse)).getOrElse(() -> {
                return Seq$.MODULE$.empty();
            })).toVector();
            Map map = (Map) ScanResponseOps$.MODULE$.lastEvaluatedKeyAsScala$extension(implicits$.MODULE$.toScanResponseOps(scanResponse)).getOrElse(() -> {
                return Predef$.MODULE$.Map().empty();
            });
            // Append this page's items after everything accumulated so far.
            Source combine = Source$.MODULE$.combine(source, Source$.MODULE$.apply(vector), Predef$.MODULE$.wrapRefArray(new Source[0]), obj -> {
                return $anonfun$allPersistenceIds$5(BoxesRunTime.unboxToInt(obj));
            });
            // Stop when there is no further page or the running count has reached the requested maximum.
            if (!map.nonEmpty() || j + Predef$.MODULE$.Integer2int(scanResponse.count()) >= j2) {
                return combine;
            }
            this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(19).append("index = ").append(i).append(", next loop").toString());
            return this.loop$1(implicits$.MODULE$.toOption(map), combine, j + Predef$.MODULE$.Integer2int(scanResponse.count()), i + 1, j2);
        });
    }

    /**
     * Synthetic predicate: reads the boolean stored under the configured deleted column of an item.
     *
     * <p>Both the map lookup and the {@code Option.get()} throw when the column or its boolean
     * value is absent.
     *
     * @param readJournalDaoImpl DAO providing the column configuration
     * @param map                one scanned item (attribute name to attribute value)
     * @return the value of the item's deleted flag
     */
    public static final /* synthetic */ boolean $anonfun$allPersistenceIds$8(ReadJournalDaoImpl readJournalDaoImpl, Map map) {
        final String deletedColumn = readJournalDaoImpl.columnsDefConfig().deletedColumnName();
        final AttributeValue deletedAttribute = (AttributeValue) map.apply(deletedColumn);
        final Boolean deletedFlag = (Boolean) implicits$.MODULE$.toOption(deletedAttribute.bool()).get();
        return Predef$.MODULE$.Boolean2boolean(deletedFlag);
    }

    /**
     * Synthetic body of {@code allPersistenceIds}: assembles the complete pipeline.
     *
     * <p>Stages, in order: paged scan ({@code loop$1}) → drop items whose deleted flag is true →
     * extract the persistence-id string → fold into a {@code Set} (de-duplication; nothing is
     * emitted until the scan completes) → re-emit the set's elements → wrap in
     * {@code PersistenceId} → {@code take(j)} → record metrics and log → recover through the
     * companion partial-function class → apply {@code logLevels()}.
     *
     * @param readJournalDaoImpl the DAO instance (captured {@code this})
     * @param j                  maximum number of ids to emit
     * @param j2                 start-time value; subtracted from {@code System.nanoTime()} for the duration metric
     * @return the assembled source
     */
    public static final /* synthetic */ Source $anonfun$allPersistenceIds$1(ReadJournalDaoImpl readJournalDaoImpl, long j, long j2) {
        readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(38).append("allPersistenceIdsSource(max = ").append(j).append("): start").toString());
        return readJournalDaoImpl.loop$1(None$.MODULE$, Source$.MODULE$.empty(), 0L, 1, j).filterNot(map -> {
            return BoxesRunTime.boxToBoolean($anonfun$allPersistenceIds$8(readJournalDaoImpl, map));
        }).map(map2 -> {
            // Option.get(): throws if the persistence-id attribute has no string value.
            return (String) implicits$.MODULE$.toOption(((AttributeValue) map2.apply(readJournalDaoImpl.columnsDefConfig().persistenceIdColumnName())).s()).get();
        }).fold(Predef$.MODULE$.Set().empty(), (set, str) -> {
            return set.$plus(str);
        }).mapConcat(set2 -> {
            return set2.toVector();
        }).map(PersistenceId$.MODULE$).take(j).map(persistenceId -> {
            // Runs once per emitted id, so the duration is overwritten and the counter bumped for every element.
            readJournalDaoImpl.metricsReporter().setAllPersistenceIdsCallDuration(System.nanoTime() - j2);
            readJournalDaoImpl.metricsReporter().incrementAllPersistenceIdsCallCounter();
            readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(41).append("allPersistenceIdsSource(max = ").append(j).append("): finished").toString());
            return persistenceId;
        }).recoverWithRetries(1, new ReadJournalDaoImpl$$anonfun$$nestedInanonfun$allPersistenceIds$1$1(readJournalDaoImpl, j2, j)).withAttributes(readJournalDaoImpl.logLevels());
    }

    /**
     * Synthetic lambda body: creates the {@code Concat} stage that {@code Source.combine} uses in
     * the eventsByTag loop.
     *
     * @param i value handed over by {@code Source.combine}, passed straight to {@code Concat.apply}
     * @return the concat graph
     */
    public static final /* synthetic */ Graph $anonfun$eventsByTag$6(int i) {
        final Graph concatStage = Concat$.MODULE$.apply(i);
        return concatStage;
    }

    /**
     * One page of the index scan behind {@code eventsByTag}; calls itself for the next page.
     *
     * <p>Builds a {@code ScanRequest} on {@code tableName()} using the configured tags index and
     * the filter {@code contains(#tags, :tag)}, where {@code #tags} is bound to the configured
     * tags column name and {@code :tag} to {@code str}. Unlike the allPersistenceIds loop, paging
     * continues until DynamoDB returns no last-evaluated key.
     *
     * @param option exclusive start key for this page ({@code None} for the first page)
     * @param source items accumulated from the previous pages
     * @param j      running total of the {@code count()} values of the previous responses (logged only)
     * @param i      page index, used only in debug log messages
     * @param str    tag to filter on
     * @param j2     start-time value; subtracted from {@code System.nanoTime()} for the item-duration metric
     * @param j3     offset, used only in the log message of the failure path
     * @param j4     maximum offset, used only in the log message of the failure path
     * @param j5     maximum element count, used only in the log message of the failure path
     * @return {@code source} followed by this page's items (and those of later pages), or a failed
     *         source carrying an {@link IOException} when the HTTP response is not successful
     */
    private final Source loop$2(Option option, Source source, long j, int i, String str, long j2, long j3, long j4, long j5) {
        com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(18).append("index = ").append(i).append(", count = ").append(j).toString());
        return Source$.MODULE$.single((ScanRequest) ScanRequestBuilderOps$.MODULE$.exclusiveStartKeyAsScala$extension(implicits$.MODULE$.toScanRequestBuilderOps(ScanRequestBuilderOps$.MODULE$.expressionAttributeValuesAsScala$extension(implicits$.MODULE$.toScanRequestBuilderOps(ScanRequestBuilderOps$.MODULE$.expressionAttributeNamesAsScala$extension(implicits$.MODULE$.toScanRequestBuilderOps(ScanRequest.builder().tableName(tableName()).indexName(this.pluginConfig.tagsIndexName()).filterExpression("contains(#tags, :tag)")), implicits$.MODULE$.toOption(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("#tags"), columnsDefConfig().tagsColumnName())}))))), implicits$.MODULE$.toOption(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(":tag"), AttributeValue.builder().s(str).build())})))).limit(Predef$.MODULE$.int2Integer(queryBatchSize()))), option).build()).via(streamClient().scanFlow(1)).flatMapConcat(scanResponse -> {
            // The same baseline j2 is used for every page, so this value grows from page to page.
            this.metricsReporter().setEventsByTagItemDuration(System.nanoTime() - j2);
            if (!scanResponse.sdkHttpResponse().isSuccessful()) {
                // Failure path: count the error and fail the stream with the HTTP status.
                this.metricsReporter().incrementEventsByTagItemCallErrorCounter();
                int statusCode = scanResponse.sdkHttpResponse().statusCode();
                Optional statusText = scanResponse.sdkHttpResponse().statusText();
                this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(62).append("eventsByTag(tag = ").append(str).append(", offset = ").append(j3).append(", maxOffset = ").append(j4).append(", max = ").append(j5).append("): finished").toString());
                return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(statusCode).append(implicits$.MODULE$.toOption(statusText).fold(() -> {
                    return "";
                }, optional -> {
                    return new StringBuilder(2).append(", ").append(optional).toString();
                })).toString()));
            }
            this.metricsReporter().incrementEventsByTagItemCallCounter();
            if (Predef$.MODULE$.Integer2int(scanResponse.count()) > 0) {
                this.metricsReporter().addEventsByTagItemCounter(Predef$.MODULE$.Integer2int(scanResponse.count()));
            }
            // Missing items / missing last-evaluated key are treated as empty.
            Vector vector = ((TraversableOnce) ScanResponseOps$.MODULE$.itemsAsScala$extension(implicits$.MODULE$.toScanResponseOps(scanResponse)).getOrElse(() -> {
                return Seq$.MODULE$.empty();
            })).toVector();
            Map map = (Map) ScanResponseOps$.MODULE$.lastEvaluatedKeyAsScala$extension(implicits$.MODULE$.toScanResponseOps(scanResponse)).getOrElse(() -> {
                return Predef$.MODULE$.Map().empty();
            });
            // Append this page's items after everything accumulated so far.
            Source combine = Source$.MODULE$.combine(source, Source$.MODULE$.apply(vector), Predef$.MODULE$.wrapRefArray(new Source[0]), obj -> {
                return $anonfun$eventsByTag$6(BoxesRunTime.unboxToInt(obj));
            });
            // An empty last-evaluated key means there is no further page.
            if (!map.nonEmpty()) {
                return combine;
            }
            this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(19).append("index = ").append(i).append(", next loop").toString());
            return this.loop$2(implicits$.MODULE$.toOption(map), combine, j + Predef$.MODULE$.Integer2int(scanResponse.count()), i + 1, str, j2, j3, j4, j5);
        });
    }

    /**
     * Synthetic predicate: keeps a row only when its ordering lies in the half-open range
     * {@code (j, j2]}.
     *
     * @param j          exclusive lower bound (offset)
     * @param j2         inclusive upper bound (maximum offset)
     * @param journalRow row to test
     * @return {@code true} when {@code j < ordering <= j2}
     */
    public static final /* synthetic */ boolean $anonfun$eventsByTag$16(long j, long j2, JournalRow journalRow) {
        if (journalRow.ordering() <= j) {
            return false;
        }
        return journalRow.ordering() <= j2;
    }

    /**
     * Synthetic inner body of {@code eventsByTag}: assembles the complete pipeline.
     *
     * <p>Stages, in order: paged tag scan ({@code loop$2}) → convert each item to a
     * {@code JournalRow} → fold all rows into one buffer (nothing is emitted until the scan
     * completes) → sort by (persistence id, sequence number) → re-emit the rows → assign each row
     * an ordering from a counter starting at 1 → keep rows with ordering in {@code (j, j2]} →
     * {@code take(j3)} → record metrics and log → recover through the companion partial-function
     * class → apply {@code logLevels()}.
     *
     * @param readJournalDaoImpl the DAO instance (captured {@code this})
     * @param str                tag to look for
     * @param j                  offset (exclusive lower bound on the assigned ordering)
     * @param j2                 maximum offset (inclusive upper bound on the assigned ordering)
     * @param j3                 maximum number of rows to emit
     * @param j4                 call-level start-time value, baseline of the call-duration metric
     * @param j5                 start-time value handed to {@code loop$2} for the per-page duration metric
     * @return the assembled source
     */
    public static final /* synthetic */ Source $anonfun$eventsByTag$2(ReadJournalDaoImpl readJournalDaoImpl, String str, long j, long j2, long j3, long j4, long j5) {
        return readJournalDaoImpl.loop$2(None$.MODULE$, Source$.MODULE$.empty(), 0L, 1, str, j5, j, j2, j3).map(map -> {
            return readJournalDaoImpl.convertToJournalRow(map);
        }).fold(ArrayBuffer$.MODULE$.empty(), (arrayBuffer, journalRow) -> {
            return arrayBuffer.$plus$eq(journalRow);
        }).map(arrayBuffer2 -> {
            return (ArrayBuffer) arrayBuffer2.sortBy(journalRow2 -> {
                return new Tuple2(journalRow2.persistenceId().value(), BoxesRunTime.boxToLong(journalRow2.sequenceNumber().value()));
            }, Ordering$.MODULE$.Tuple2(Ordering$String$.MODULE$, Ordering$Long$.MODULE$));
        }).mapConcat(arrayBuffer3 -> {
            return arrayBuffer3.toVector();
        }).statefulMapConcat(() -> {
            // A fresh counter is created by this factory lambda; orderings are 1, 2, 3, ... in sorted order.
            AtomicLong atomicLong = new AtomicLong();
            return journalRow2 -> {
                return new $colon.colon(journalRow2.withOrdering(atomicLong.incrementAndGet()), Nil$.MODULE$);
            };
        }).filter(journalRow2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$eventsByTag$16(j, j2, journalRow2));
        }).take(j3).map(journalRow3 -> {
            // Runs once per emitted row, so the duration is overwritten and the counter bumped for every element.
            readJournalDaoImpl.metricsReporter().setEventsByTagCallDuration(System.nanoTime() - j4);
            readJournalDaoImpl.metricsReporter().incrementEventsByTagCallCounter();
            readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(62).append("eventsByTag(tag = ").append(str).append(", offset = ").append(j).append(", maxOffset = ").append(j2).append(", max = ").append(j3).append("): finished").toString());
            return journalRow3;
        }).recoverWithRetries(1, new ReadJournalDaoImpl$$anonfun$$nestedInanonfun$eventsByTag$2$1(readJournalDaoImpl, j4, str, j, j2, j3)).withAttributes(readJournalDaoImpl.logLevels());
    }

    /**
     * Synthetic outer body of {@code eventsByTag}: logs the start of the call, then obtains a
     * second start-time value and hands both baselines to the pipeline builder.
     *
     * @param readJournalDaoImpl the DAO instance (captured {@code this})
     * @param str                tag to look for
     * @param j                  offset
     * @param j2                 maximum offset
     * @param j3                 maximum number of rows to emit
     * @param j4                 call-level start-time value
     * @return the source produced by the inner pipeline builder
     */
    public static final /* synthetic */ Source $anonfun$eventsByTag$1(ReadJournalDaoImpl readJournalDaoImpl, String str, long j, long j2, long j3, long j4) {
        final String startMessage = "eventsByTag(tag = " + str + ", offset = " + j + ", maxOffset = " + j2 + ", max = " + j3 + "): start";
        readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(startMessage);
        return readJournalDaoImpl.startTimeSource().flatMapConcat(itemStart -> $anonfun$eventsByTag$2(readJournalDaoImpl, str, j, j2, j3, j4, BoxesRunTime.unboxToLong(itemStart)));
    }

    /**
     * Synthetic lambda body: creates the {@code Concat} stage that {@code Source.combine} uses in
     * the journalSequence loop.
     *
     * @param i value handed over by {@code Source.combine}, passed straight to {@code Concat.apply}
     * @return the concat graph
     */
    public static final /* synthetic */ Graph $anonfun$journalSequence$6(int i) {
        final Graph concatStage = Concat$.MODULE$.apply(i);
        return concatStage;
    }

    /**
     * One page of the query behind {@code journalSequence}; calls itself for the next page.
     *
     * <p>Builds a {@code QueryRequest} on {@code tableName()} that fetches only the ordering
     * column, limited to {@code queryBatchSize()} items and starting after {@code option}, and
     * runs it through {@code streamClient().queryFlow(1)}. Paging continues until DynamoDB
     * returns no last-evaluated key.
     *
     * <p>NOTE(review): no key condition is set on the request builder here; a DynamoDB Query
     * normally requires one - confirm whether this request can succeed as built.
     *
     * @param option exclusive start key for this page ({@code None} for the first page)
     * @param source items accumulated from the previous pages
     * @param j      running total of the {@code count()} values of the previous responses (logged only)
     * @param i      page index, used only in debug log messages
     * @param j2     start-time value; subtracted from {@code System.nanoTime()} for the item-duration metric
     * @param j3     offset, used only in the log message of the failure path
     * @param j4     limit, used only in the log message of the failure path
     * @return {@code source} followed by this page's items (and those of later pages), or a failed
     *         source carrying an {@link IOException} when the HTTP response is not successful
     */
    private final Source loop$3(Option option, Source source, long j, int i, long j2, long j3, long j4) {
        com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(18).append("index = ").append(i).append(", count = ").append(j).toString());
        return Source$.MODULE$.single((QueryRequest) QueryRequestBuilderOps$.MODULE$.exclusiveStartKeyAsScala$extension(implicits$.MODULE$.toQueryRequestBuilderOps(QueryRequest.builder().tableName(tableName()).select(Select.SPECIFIC_ATTRIBUTES).attributesToGet(new String[]{columnsDefConfig().orderingColumnName()}).limit(Predef$.MODULE$.int2Integer(queryBatchSize()))), option).build()).via(streamClient().queryFlow(1)).flatMapConcat(queryResponse -> {
            // The same baseline j2 is used for every page, so this value grows from page to page.
            this.metricsReporter().setJournalSequenceItemDuration(System.nanoTime() - j2);
            if (!queryResponse.sdkHttpResponse().isSuccessful()) {
                // NOTE(review): this increments the eventsByTag error counter inside the journalSequence
                // path; it looks copied from the eventsByTag loop - confirm which counter is intended.
                this.metricsReporter().incrementEventsByTagItemCallErrorCounter();
                int statusCode = queryResponse.sdkHttpResponse().statusCode();
                Optional statusText = queryResponse.sdkHttpResponse().statusText();
                this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(46).append("journalSequence(offset = ").append(j3).append(", limit = ").append(j4).append("): finished").toString());
                return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(statusCode).append(implicits$.MODULE$.toOption(statusText).fold(() -> {
                    return "";
                }, optional -> {
                    return new StringBuilder(2).append(", ").append(optional).toString();
                })).toString()));
            }
            this.metricsReporter().addJournalSequenceItemCounter(Predef$.MODULE$.Integer2int(queryResponse.count()));
            // Missing items / missing last-evaluated key are treated as empty.
            Vector vector = ((TraversableOnce) QueryResponseOps$.MODULE$.itemsAsScala$extension(implicits$.MODULE$.toQueryResponseOps(queryResponse)).getOrElse(() -> {
                return Seq$.MODULE$.empty();
            })).toVector();
            Map map = (Map) QueryResponseOps$.MODULE$.lastEvaluatedKeyAsScala$extension(implicits$.MODULE$.toQueryResponseOps(queryResponse)).getOrElse(() -> {
                return Predef$.MODULE$.Map().empty();
            });
            // Append this page's items after everything accumulated so far.
            Source combine = Source$.MODULE$.combine(source, Source$.MODULE$.apply(vector), Predef$.MODULE$.wrapRefArray(new Source[0]), obj -> {
                return $anonfun$journalSequence$6(BoxesRunTime.unboxToInt(obj));
            });
            // An empty last-evaluated key means there is no further page.
            if (!map.nonEmpty()) {
                return combine;
            }
            this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(19).append("index = ").append(i).append(", next loop").toString());
            return this.loop$3(implicits$.MODULE$.toOption(map), combine, j + Predef$.MODULE$.Integer2int(queryResponse.count()), i + 1, j2, j3, j4);
        });
    }

    /**
     * Synthetic mapper: parses the numeric string stored under the configured ordering column of
     * an item into a {@code long}.
     *
     * @param readJournalDaoImpl DAO providing the column configuration
     * @param map                one queried item (attribute name to attribute value)
     * @return the ordering value of the item
     */
    public static final /* synthetic */ long $anonfun$journalSequence$9(ReadJournalDaoImpl readJournalDaoImpl, Map map) {
        final String orderingColumn = readJournalDaoImpl.columnsDefConfig().orderingColumnName();
        final AttributeValue orderingAttribute = (AttributeValue) map.apply(orderingColumn);
        final String numberText = Predef$.MODULE$.augmentString(orderingAttribute.n());
        return new StringOps(numberText).toLong();
    }

    /**
     * Synthetic inner body of {@code journalSequence}: assembles the complete pipeline.
     *
     * <p>Stages, in order: paged query ({@code loop$3}) → parse the ordering column into a
     * {@code long} → {@code drop(j)} → {@code take(j2)} → apply {@code logLevels()} → record
     * metrics and log → recover through the companion partial-function class.
     *
     * @param readJournalDaoImpl the DAO instance (captured {@code this})
     * @param j                  offset: number of leading values to drop
     * @param j2                 limit: maximum number of values to emit
     * @param j3                 call-level start-time value, baseline of the call-duration metric
     * @param j4                 start-time value handed to {@code loop$3} for the per-page duration metric
     * @return the assembled source
     */
    public static final /* synthetic */ Source $anonfun$journalSequence$2(ReadJournalDaoImpl readJournalDaoImpl, long j, long j2, long j3, long j4) {
        return readJournalDaoImpl.loop$3(None$.MODULE$, Source$.MODULE$.empty(), 0L, 1, j4, j, j2).map(map -> {
            return BoxesRunTime.boxToLong($anonfun$journalSequence$9(readJournalDaoImpl, map));
        }).drop(j).take(j2).withAttributes(readJournalDaoImpl.logLevels()).map(j5 -> {
            // Runs once per emitted value, so the duration is overwritten and the counter bumped for every element.
            readJournalDaoImpl.metricsReporter().setJournalSequenceCallDuration(System.nanoTime() - j3);
            readJournalDaoImpl.metricsReporter().incrementJournalSequenceCallCounter();
            readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(new StringBuilder(46).append("journalSequence(offset = ").append(j).append(", limit = ").append(j2).append("): finished").toString());
            return j5;
        }).recoverWithRetries(1, new ReadJournalDaoImpl$$anonfun$$nestedInanonfun$journalSequence$2$1(readJournalDaoImpl, j3, j, j2));
    }

    /**
     * Synthetic outer body of {@code journalSequence}: logs the start of the call, then obtains a
     * second start-time value and hands both baselines to the pipeline builder.
     *
     * @param readJournalDaoImpl the DAO instance (captured {@code this})
     * @param j                  offset
     * @param j2                 limit
     * @param j3                 call-level start-time value
     * @return the source produced by the inner pipeline builder
     */
    public static final /* synthetic */ Source $anonfun$journalSequence$1(ReadJournalDaoImpl readJournalDaoImpl, long j, long j2, long j3) {
        final String startMessage = "journalSequence(offset = " + j + ", limit = " + j2 + "): start";
        readJournalDaoImpl.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger().debug(startMessage);
        return readJournalDaoImpl.startTimeSource().flatMapConcat(itemStart -> $anonfun$journalSequence$2(readJournalDaoImpl, j, j2, j3, BoxesRunTime.unboxToLong(itemStart)));
    }

    /**
     * Creates the DAO and copies the settings it needs out of the plugin configuration.
     *
     * @param dynamoDbAsyncClient async DynamoDB client, wrapped into the Akka Streams client
     * @param serialization       not referenced in this constructor body
     * @param queryPluginConfig   source of table name, index names, column names, shard count,
     *                            consistent-read flag and query batch size
     * @param metricsReporter     sink for the metrics recorded by the query pipelines
     * @param executionContext    not referenced in this constructor body
     */
    public ReadJournalDaoImpl(DynamoDbAsyncClient dynamoDbAsyncClient, Serialization serialization, QueryPluginConfig queryPluginConfig, MetricsReporter metricsReporter, ExecutionContext executionContext) {
        this.pluginConfig = queryPluginConfig;
        this.metricsReporter = metricsReporter;
        // Trait initializer of DaoSupport; it runs after the two constructor-parameter fields are
        // set and before the remaining fields are assigned. Presumably it fills the DaoSupport
        // logger, logLevels and startTimeSource through the _setter_ methods - confirm in DaoSupport.
        DaoSupport.$init$(this);
        this.com$github$j5ik2o$akka$persistence$dynamodb$query$dao$ReadJournalDaoImpl$$logger = LoggerFactory.getLogger(getClass());
        this.streamClient = DynamoDbAkkaClient$.MODULE$.apply(dynamoDbAsyncClient);
        this.shardCount = queryPluginConfig.shardCount();
        this.tableName = queryPluginConfig.tableName();
        this.getJournalRowsIndexName = queryPluginConfig.getJournalRowsIndexName();
        this.columnsDefConfig = queryPluginConfig.columnsDefConfig();
        this.consistentRead = queryPluginConfig.consistentRead();
        this.queryBatchSize = queryPluginConfig.queryBatchSize();
    }
}
