package akka.persistence.mysql.journal;

import akka.NotUsed;
import akka.persistence.r2dbc.client.R2dbc;
import akka.persistence.r2dbc.journal.AbstractJournalDao;
import akka.persistence.r2dbc.journal.JournalEntry;
import akka.persistence.r2dbc.journal.ResultUtils;
import akka.stream.scaladsl.Source;
import io.r2dbc.spi.Result;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
import scala.collection.JavaConverters;

/* loaded from: input_file:akka/persistence/mysql/journal/MySqlJournalDao.class */
final class MySqlJournalDao extends AbstractJournalDao {
    private static final String LAST_ID = "LAST_INSERT_ID()";
    private final R2dbc r2dbc;

    private static Publisher<Long> lastId(Result result) {
        return result.map((row, rowMetadata) -> {
            return (Long) row.get(LAST_ID, Long.class);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MySqlJournalDao(R2dbc r2dbc) {
        this.r2dbc = r2dbc;
    }

    public Source<Integer, NotUsed> doWriteEvents(List<JournalEntry> list) {
        return Source.fromPublisher(this.r2dbc.inTransaction(handle -> {
            return handle.executeQuery(MySqlJournalQueries.insertEventsQuery(list), (v0) -> {
                return v0.getRowsUpdated();
            }).thenMany(handle.executeQuery("SELECT LAST_INSERT_ID()", MySqlJournalDao::lastId)).flatMap(l -> {
                AtomicLong atomicLong = new AtomicLong(l.longValue());
                return Flux.fromStream(list.stream().map(journalEntry -> {
                    return Tuples.of(Long.valueOf(atomicLong.getAndIncrement()), JavaConverters.setAsJavaSet(journalEntry.tags()));
                }));
            }).collectList().filter(list2 -> {
                return list2.stream().map((v0) -> {
                    return v0.getT2();
                }).anyMatch(set -> {
                    return !set.isEmpty();
                });
            }).flatMapMany(list3 -> {
                return handle.executeQuery(MySqlJournalQueries.insertTagsQuery(list3), (v0) -> {
                    return v0.getRowsUpdated();
                });
            });
        }).defaultIfEmpty(0));
    }

    public Source<JournalEntry, NotUsed> doFetchEvents(String str, Long l, Long l2, Long l3) {
        return Source.fromPublisher(this.r2dbc.withHandle(handle -> {
            return handle.executeQuery(MySqlJournalQueries.findEventsQuery(str, l.longValue(), l2.longValue(), l3.longValue()), ResultUtils::toJournalEntry);
        }).take(l3.longValue()));
    }

    public Source<Integer, NotUsed> doDeleteEvents(String str, Long l) {
        Function function = handle -> {
            return handle.executeQuery(MySqlJournalQueries.markEventsAsDeletedQuery(str, l.longValue()), (v0) -> {
                return v0.getRowsUpdated();
            });
        };
        Function function2 = handle2 -> {
            return handle2.executeQuery(MySqlJournalQueries.highestMarkedSeqNrQuery(str), result -> {
                return ResultUtils.toSeqId(result, "sequence_nr");
            }).flatMap(l2 -> {
                return handle2.executeQuery(MySqlJournalQueries.deleteEventsQuery(str, l2.longValue() - 1), (v0) -> {
                    return v0.getRowsUpdated();
                });
            });
        };
        return Source.fromPublisher(this.r2dbc.inTransaction(handle3 -> {
            return ((Flux) function.apply(handle3)).thenMany((Publisher) function2.apply(handle3));
        }));
    }

    public Source<Long, NotUsed> doReadHighestSequenceNr(String str, Long l) {
        return Source.fromPublisher(this.r2dbc.withHandle(handle -> {
            return handle.executeQuery(MySqlJournalQueries.highestSeqNrQuery(str, l.longValue()), result -> {
                return ResultUtils.toSeqId(result, "sequence_nr");
            });
        }));
    }
}
