package com.github.j5ik2o.event.store.adapter.java.internal;

import com.github.j5ik2o.event.store.adapter.java.Aggregate;
import com.github.j5ik2o.event.store.adapter.java.AggregateId;
import com.github.j5ik2o.event.store.adapter.java.DeserializationException;
import com.github.j5ik2o.event.store.adapter.java.DeserializationRuntimeException;
import com.github.j5ik2o.event.store.adapter.java.Event;
import com.github.j5ik2o.event.store.adapter.java.EventSerializer;
import com.github.j5ik2o.event.store.adapter.java.EventStoreAsync;
import com.github.j5ik2o.event.store.adapter.java.EventStoreReadRuntimeException;
import com.github.j5ik2o.event.store.adapter.java.KeyResolver;
import com.github.j5ik2o.event.store.adapter.java.SerializationException;
import com.github.j5ik2o.event.store.adapter.java.SerializationRuntimeException;
import com.github.j5ik2o.event.store.adapter.java.SnapshotSerializer;
import com.github.j5ik2o.event.store.adapter.java.TransactionRuntimeException;
import io.vavr.Tuple2;
import io.vavr.control.Option;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsResponse;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;

/* loaded from: input_file:com/github/j5ik2o/event/store/adapter/java/internal/EventStoreAsyncForDynamoDB.class */
public final class EventStoreAsyncForDynamoDB<AID extends AggregateId, A extends Aggregate<A, AID>, E extends Event<AID>> implements EventStoreAsync<AID, A, E> {
    /** Class-wide SLF4J logger; the public and transactional operations emit start/finished debug traces through it. */
    private static final Logger LOGGER = LoggerFactory.getLogger(EventStoreAsyncForDynamoDB.class);

    /** Asynchronous DynamoDB client used for every query, transactional write, batch delete and item update. */
    @Nonnull
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    /** Journal table name; kept so the {@code with*} copies can carry it over, and handed to {@code EventStoreSupport}. */
    @Nonnull
    private final String journalTableName;

    /** Snapshot table name; kept so the {@code with*} copies can carry it over, and handed to {@code EventStoreSupport}. */
    @Nonnull
    private final String snapshotTableName;

    /** Name of the journal aggregate-id index (presumably a GSI - confirm against the table definition). */
    @Nonnull
    private final String journalAidIndexName;

    /** Name of the snapshot aggregate-id index (presumably a GSI - confirm against the table definition). */
    @Nonnull
    private final String snapshotAidIndexName;
    /** Shard count; this class only forwards it to {@code EventStoreSupport} and to the {@code with*} copies. */
    private final long shardCount;

    /** Number of snapshots to keep; when empty, no purge of excess snapshots is attempted after a persist. */
    @Nonnull
    private final Option<Long> keepSnapshotCount;

    /**
     * Time-to-live added to "now" for excess snapshots; when defined (together with
     * {@link #keepSnapshotCount}) excess snapshots get an expiry timestamp instead of being deleted.
     */
    @Nonnull
    private final Option<Duration> deleteTtl;

    /** Key resolver; this class only forwards it to {@code EventStoreSupport} and to the {@code with*} copies. */
    @Nonnull
    private final KeyResolver<AID> keyResolver;

    /** Event serializer; this class only forwards it to {@code EventStoreSupport} and to the {@code with*} copies. */
    @Nonnull
    private final EventSerializer<AID, E> eventSerializer;

    /** Snapshot serializer; this class only forwards it to {@code EventStoreSupport} and to the {@code with*} copies. */
    @Nonnull
    private final SnapshotSerializer<AID, A> snapshotSerializer;

    /** Helper that builds every DynamoDB request and converts query responses back to events/aggregates. */
    @Nonnull
    private final EventStoreSupport<AID, A, E> eventStoreSupport;

    EventStoreAsyncForDynamoDB(@Nonnull DynamoDbAsyncClient dynamoDbAsyncClient, @Nonnull String str, @Nonnull String str2, @Nonnull String str3, @Nonnull String str4, long j, @Nullable Long l, @Nullable Duration duration, @Nonnull KeyResolver<AID> keyResolver, @Nonnull EventSerializer<AID, E> eventSerializer, @Nonnull SnapshotSerializer<AID, A> snapshotSerializer) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.journalTableName = str;
        this.snapshotTableName = str2;
        this.journalAidIndexName = str3;
        this.snapshotAidIndexName = str4;
        this.shardCount = j;
        this.keepSnapshotCount = Option.of(l);
        this.deleteTtl = Option.of(duration);
        this.keyResolver = keyResolver;
        this.eventSerializer = eventSerializer;
        this.snapshotSerializer = snapshotSerializer;
        this.eventStoreSupport = new EventStoreSupport<>(str, str2, str3, str4, j, this.keepSnapshotCount, this.deleteTtl, keyResolver, eventSerializer, snapshotSerializer);
    }

    /**
     * Static factory for a store with default settings (delegates to the six-argument constructor).
     *
     * @param dynamoDbAsyncClient asynchronous DynamoDB client
     * @param str journal table name
     * @param str2 snapshot table name
     * @param str3 journal aggregate-id index name
     * @param str4 snapshot aggregate-id index name
     * @param j shard count
     * @return a new store instance
     */
    public static <AID extends AggregateId, A extends Aggregate<A, AID>, E extends Event<AID>> EventStoreAsyncForDynamoDB<AID, A, E> create(@Nonnull DynamoDbAsyncClient dynamoDbAsyncClient, @Nonnull String str, @Nonnull String str2, @Nonnull String str3, @Nonnull String str4, long j) {
        final EventStoreAsyncForDynamoDB<AID, A, E> store =
            new EventStoreAsyncForDynamoDB<>(dynamoDbAsyncClient, str, str2, str3, str4, j);
        return store;
    }

    /**
     * Convenience constructor: no snapshot retention limit, no delete TTL, and the default key
     * resolver plus JSON event/snapshot serializers.
     *
     * @param dynamoDbAsyncClient asynchronous DynamoDB client
     * @param str journal table name
     * @param str2 snapshot table name
     * @param str3 journal aggregate-id index name
     * @param str4 snapshot aggregate-id index name
     * @param j shard count
     */
    EventStoreAsyncForDynamoDB(@Nonnull DynamoDbAsyncClient dynamoDbAsyncClient, @Nonnull String str, @Nonnull String str2, @Nonnull String str3, @Nonnull String str4, long j) {
        // The two nulls become empty Options for keepSnapshotCount and deleteTtl in the main constructor.
        this(dynamoDbAsyncClient, str, str2, str3, str4, j, null, null, new DefaultKeyResolver(), new JsonEventSerializer(), new JsonSnapshotSerializer());
    }

    /**
     * Returns a copy of this store whose snapshot retention count is {@code j}; every other
     * setting is carried over unchanged.
     *
     * @param j number of snapshots to keep
     * @return a new store instance
     */
    @Override
    @Nonnull
    public EventStoreAsyncForDynamoDB<AID, A, E> withKeepSnapshotCount(long j) {
        final Long keepCount = j;
        final Duration ttl = this.deleteTtl.getOrNull();
        return new EventStoreAsyncForDynamoDB<>(
            this.dynamoDbAsyncClient,
            this.journalTableName,
            this.snapshotTableName,
            this.journalAidIndexName,
            this.snapshotAidIndexName,
            this.shardCount,
            keepCount,
            ttl,
            this.keyResolver,
            this.eventSerializer,
            this.snapshotSerializer);
    }

    /**
     * Returns a copy of this store whose delete TTL is {@code duration}; every other setting is
     * carried over unchanged.
     *
     * @param duration TTL to apply to excess snapshots
     * @return a new store instance
     */
    @Override
    @Nonnull
    public EventStoreAsyncForDynamoDB<AID, A, E> withDeleteTtl(@Nonnull Duration duration) {
        final Long keepCount = this.keepSnapshotCount.getOrNull();
        return new EventStoreAsyncForDynamoDB<>(
            this.dynamoDbAsyncClient,
            this.journalTableName,
            this.snapshotTableName,
            this.journalAidIndexName,
            this.snapshotAidIndexName,
            this.shardCount,
            keepCount,
            duration,
            this.keyResolver,
            this.eventSerializer,
            this.snapshotSerializer);
    }

    /**
     * Returns a copy of this store that uses the given key resolver; every other setting is
     * carried over unchanged.
     *
     * @param keyResolver replacement key resolver
     * @return a new store instance
     */
    @Override
    @Nonnull
    public EventStoreAsyncForDynamoDB<AID, A, E> withKeyResolver(@Nonnull KeyResolver<AID> keyResolver) {
        final Long keepCount = this.keepSnapshotCount.getOrNull();
        final Duration ttl = this.deleteTtl.getOrNull();
        return new EventStoreAsyncForDynamoDB<>(
            this.dynamoDbAsyncClient,
            this.journalTableName,
            this.snapshotTableName,
            this.journalAidIndexName,
            this.snapshotAidIndexName,
            this.shardCount,
            keepCount,
            ttl,
            keyResolver,
            this.eventSerializer,
            this.snapshotSerializer);
    }

    /**
     * Returns a copy of this store that uses the given event serializer; every other setting is
     * carried over unchanged.
     *
     * @param eventSerializer replacement event serializer
     * @return a new store instance
     */
    @Override
    @Nonnull
    public EventStoreAsyncForDynamoDB<AID, A, E> withEventSerializer(@Nonnull EventSerializer<AID, E> eventSerializer) {
        final Long keepCount = this.keepSnapshotCount.getOrNull();
        final Duration ttl = this.deleteTtl.getOrNull();
        return new EventStoreAsyncForDynamoDB<>(
            this.dynamoDbAsyncClient,
            this.journalTableName,
            this.snapshotTableName,
            this.journalAidIndexName,
            this.snapshotAidIndexName,
            this.shardCount,
            keepCount,
            ttl,
            this.keyResolver,
            eventSerializer,
            this.snapshotSerializer);
    }

    /**
     * Returns a copy of this store that uses the given snapshot serializer; every other setting
     * is carried over unchanged.
     *
     * @param snapshotSerializer replacement snapshot serializer
     * @return a new store instance
     */
    @Override
    @Nonnull
    public EventStoreAsyncForDynamoDB<AID, A, E> withSnapshotSerializer(@Nonnull SnapshotSerializer<AID, A> snapshotSerializer) {
        final Long keepCount = this.keepSnapshotCount.getOrNull();
        final Duration ttl = this.deleteTtl.getOrNull();
        return new EventStoreAsyncForDynamoDB<>(
            this.dynamoDbAsyncClient,
            this.journalTableName,
            this.snapshotTableName,
            this.journalAidIndexName,
            this.snapshotAidIndexName,
            this.shardCount,
            keepCount,
            ttl,
            this.keyResolver,
            this.eventSerializer,
            snapshotSerializer);
    }

    /**
     * Queries DynamoDB for the latest snapshot of the given aggregate and converts the response
     * into an aggregate instance.
     *
     * <p>Any failure of the query or of the conversion completes the returned future
     * exceptionally with an {@link EventStoreReadRuntimeException} wrapping the failure. A
     * snapshot that cannot be deserialized is reported with a
     * {@link DeserializationRuntimeException} in the cause chain, matching
     * {@code getEventsByIdSinceSequenceNumber}.
     *
     * @param cls aggregate class used for deserialization
     * @param aid aggregate id to look up
     * @return a future holding the converted query result
     */
    @Override
    @Nonnull
    public CompletableFuture<Optional<A>> getLatestSnapshotById(@Nonnull Class<A> cls, @Nonnull AID aid) {
        LOGGER.debug("getLatestSnapshotById({}, {}): start", cls, aid);
        return CompletableFuture.completedFuture(this.eventStoreSupport.getLatestSnapshotByIdQueryRequest(aid)).thenCompose(queryRequest -> {
            return this.dynamoDbAsyncClient.query(queryRequest).thenApply(queryResponse -> {
                try {
                    return this.eventStoreSupport.convertToAggregateAndVersion(queryResponse, cls);
                } catch (DeserializationException ex) {
                    // This is a read path: a conversion failure is a deserialization problem,
                    // not a serialization one.
                    throw new DeserializationRuntimeException(ex);
                }
            }).handle((snapshot, failure) -> {
                if (failure != null) {
                    throw new EventStoreReadRuntimeException(failure);
                }
                return snapshot;
            }).thenApply(snapshot -> {
                LOGGER.debug("getLatestSnapshotById({}, {}): finished", cls, aid);
                return snapshot;
            });
        });
    }

    /**
     * Queries DynamoDB for the events of the given aggregate starting from a sequence number and
     * converts the response into a list of events.
     *
     * <p>Any failure of the query or of the conversion completes the returned future
     * exceptionally with an {@link EventStoreReadRuntimeException} wrapping the failure.
     *
     * @param cls event class used for deserialization
     * @param aid aggregate id whose events are read
     * @param j sequence number passed to the query builder
     * @return a future holding the converted events
     */
    @Override
    @Nonnull
    public CompletableFuture<List<E>> getEventsByIdSinceSequenceNumber(@Nonnull Class<E> cls, @Nonnull AID aid, long j) {
        LOGGER.debug("getEventsByIdSinceSequenceNumber({}, {}, {}): start", cls, aid, j);
        return CompletableFuture
            .completedFuture(this.eventStoreSupport.getEventsByIdSinceSequenceNumberQueryRequest(aid, j))
            .thenCompose(request -> this.dynamoDbAsyncClient.query(request)
                .thenApply(response -> {
                    try {
                        return this.eventStoreSupport.convertToEvents(response, cls);
                    } catch (DeserializationException ex) {
                        throw new DeserializationRuntimeException(ex);
                    }
                })
                .handle((events, failure) -> {
                    if (failure == null) {
                        return events;
                    }
                    throw new EventStoreReadRuntimeException(failure);
                })
                .thenApply(events -> {
                    LOGGER.debug("getEventsByIdSinceSequenceNumber({}, {}, {}): finished", cls, aid, j);
                    return events;
                }));
    }

    /**
     * Persists a non-creation event without a snapshot, then purges excess snapshots if a
     * retention count is configured.
     *
     * @param e event to persist; must not be a creation event
     * @param j version passed to the transactional update
     * @return a future that completes once the write and the optional purge are done
     * @throws IllegalArgumentException synchronously, if {@code e.isCreated()} is true
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> persistEvent(@Nonnull E e, long j) {
        LOGGER.debug("persistEvent({}, {}): start", e, j);
        final boolean creationEvent = e.isCreated();
        if (creationEvent) {
            throw new IllegalArgumentException("event is created");
        }
        final CompletableFuture<Void> purged =
            updateEventAndSnapshotOpt(e, j, Option.none())
                .thenCompose(ignored -> tryPurgeExcessSnapshots(e));
        return purged.thenRun(() -> LOGGER.debug("persistEvent({}, {}): finished", e, j));
    }

    /**
     * Persists an event together with a snapshot of the aggregate.
     *
     * <p>A creation event goes through {@code createEventAndSnapshot}; any other event goes
     * through {@code updateEventAndSnapshotOpt} (using the aggregate's version) followed by the
     * purge of excess snapshots.
     *
     * @param e event to persist
     * @param a aggregate state stored as the snapshot
     * @return a future that completes once the write (and purge, on update) is done
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> persistEventAndSnapshot(@Nonnull E e, @Nonnull A a) {
        LOGGER.debug("persistEventAndSnapshot({}, {}): start", e, a);
        final CompletableFuture<Void> written;
        if (e.isCreated()) {
            written = createEventAndSnapshot(e, a).thenRun(() -> { });
        } else {
            written = updateEventAndSnapshotOpt(e, a.getVersion(), Option.some(a))
                .thenCompose(ignored -> tryPurgeExcessSnapshots(e));
        }
        return written.thenRun(() -> LOGGER.debug("persistEventAndSnapshot({}, {}): finished", e, a));
    }

    /**
     * Builds the "create" transaction for the event and its snapshot and submits it via
     * {@code transactWriteItems}.
     *
     * <p>Failure mapping: a {@link TransactionCanceledException} (looked up through any
     * {@link CompletionException} wrappers added by the future pipeline) becomes a
     * {@link TransactionRuntimeException}; every other failure, including a serialization
     * failure while building the request, becomes an {@link EventStoreReadRuntimeException}.
     *
     * @param e creation event to store
     * @param a aggregate stored as the initial snapshot
     * @return a future holding the DynamoDB transaction response
     */
    private CompletableFuture<TransactWriteItemsResponse> createEventAndSnapshot(@Nonnull E e, @Nonnull A a) {
        LOGGER.debug("createEventAndSnapshot({}, {}): start", e, a);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return this.eventStoreSupport.createEventAndSnapshotTransactWriteItemsRequest(e, a);
            } catch (SerializationException ex) {
                throw new SerializationRuntimeException(ex);
            }
        }).thenCompose(this.dynamoDbAsyncClient::transactWriteItems).handle((response, failure) -> {
            if (failure == null) {
                return response;
            }
            // Dependent stages see upstream failures wrapped in CompletionException, so the
            // real cause must be unwrapped before testing for a cancelled transaction.
            Throwable cause = failure;
            while (cause instanceof CompletionException && cause.getCause() != null) {
                cause = cause.getCause();
            }
            if (cause instanceof TransactionCanceledException) {
                throw new TransactionRuntimeException(cause);
            }
            throw new EventStoreReadRuntimeException(failure);
        }).thenApply(response -> {
            LOGGER.debug("createEventAndSnapshot({}, {}): finished", e, a);
            return response;
        });
    }

    /**
     * Builds the "update" transaction for the event (and the snapshot, when present) and submits
     * it via {@code transactWriteItems}.
     *
     * <p>Failure mapping: a {@link TransactionCanceledException} (looked up through any
     * {@link CompletionException} wrappers added by the future pipeline) becomes a
     * {@link TransactionRuntimeException}; every other failure, including a serialization
     * failure while building the request, becomes an {@link EventStoreReadRuntimeException}.
     *
     * @param e event to store
     * @param j version passed to the request builder
     * @param option snapshot to store alongside the event, or none
     * @return a future holding the DynamoDB transaction response
     */
    private CompletableFuture<TransactWriteItemsResponse> updateEventAndSnapshotOpt(@Nonnull E e, long j, Option<A> option) {
        LOGGER.debug("updateEventAndSnapshotOpt({}, {}, {}): start", e, j, option);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return this.eventStoreSupport.updateEventAndSnapshotOptTransactWriteItemsRequest(e, j, option);
            } catch (SerializationException ex) {
                throw new SerializationRuntimeException(ex);
            }
        }).thenCompose(this.dynamoDbAsyncClient::transactWriteItems).handle((response, failure) -> {
            if (failure == null) {
                return response;
            }
            // Dependent stages see upstream failures wrapped in CompletionException, so the
            // real cause must be unwrapped before testing for a cancelled transaction.
            Throwable cause = failure;
            while (cause instanceof CompletionException && cause.getCause() != null) {
                cause = cause.getCause();
            }
            if (cause instanceof TransactionCanceledException) {
                throw new TransactionRuntimeException(cause);
            }
            throw new EventStoreReadRuntimeException(failure);
        }).thenApply(response -> {
            LOGGER.debug("updateEventAndSnapshotOpt({}, {}, {}): finished", e, j, option);
            return response;
        });
    }

    /**
     * Runs the snapshot-count query for the aggregate and extracts the {@code count()} of the
     * response.
     *
     * @param aid aggregate id whose snapshots are counted
     * @return a future holding the count reported by the query response
     */
    private CompletableFuture<Integer> getSnapshotCount(AID aid) {
        return this.dynamoDbAsyncClient
            .query(this.eventStoreSupport.getSnapshotCountQueryRequest(aid))
            .thenApply(response -> response.count());
    }

    /**
     * Queries the keys of snapshots for the aggregate and returns each item's
     * {@code (pkey, skey)} string pair, in response order.
     *
     * @param aid aggregate id whose snapshot keys are read
     * @param i limit passed to the query builder
     * @return a future holding the key pairs as a Vavr list
     */
    private CompletableFuture<io.vavr.collection.List<Tuple2<String, String>>> getLastSnapshotKeys(AID aid, int i) {
        return CompletableFuture
            .completedFuture(this.eventStoreSupport.getLastSnapshotKeysQueryRequest(aid, i))
            .thenCompose(request -> this.dynamoDbAsyncClient.query(request)
                .thenApply(response -> io.vavr.collection.List.ofAll(response.items())
                    .map(item -> new Tuple2<String, String>(item.get("pkey").s(), item.get("skey").s()))));
    }

    /**
     * Purges excess snapshots of the event's aggregate when a retention count is configured.
     *
     * <p>With a delete TTL configured the excess snapshots get an expiry timestamp; otherwise
     * they are deleted. Without a retention count nothing happens.
     *
     * @param e event whose aggregate id selects the snapshots
     * @return a future that completes when the purge (if any) is done
     */
    private CompletableFuture<Void> tryPurgeExcessSnapshots(E e) {
        if (!this.keepSnapshotCount.isDefined()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.deleteTtl.isDefined()) {
            return updateTtlOfExcessSnapshots(e.getAggregateId());
        }
        return deleteExcessSnapshots(e.getAggregateId());
    }

    /**
     * Deletes the snapshots of the aggregate that exceed the configured retention count.
     *
     * <p>The number of excess snapshots is {@code snapshotCount - 1 - keepSnapshotCount}
     * (NOTE(review): the extra {@code - 1} presumably accounts for the always-present latest
     * snapshot row - confirm). The keys are deleted with {@code batchWriteItem} in chunks of at
     * most 25, because DynamoDB rejects a batch containing more than 25 write requests. Chunks
     * run one after another; the first failing chunk fails the returned future.
     *
     * <p>NOTE: the batch response is discarded, so unprocessed items reported by DynamoDB are
     * not retried here.
     *
     * @param aid aggregate id whose excess snapshots are deleted
     * @return a future that completes when all delete batches are done, or immediately when no
     *     retention count is configured or nothing exceeds it
     */
    private CompletableFuture<Void> deleteExcessSnapshots(AID aid) {
        if (!this.keepSnapshotCount.isDefined()) {
            return CompletableFuture.completedFuture(null);
        }
        // Maximum number of put/delete requests DynamoDB accepts in one BatchWriteItem call.
        final int maxBatchSize = 25;
        return getSnapshotCount(aid).thenCompose(snapshotCount -> {
            final long excess = (snapshotCount.intValue() - 1) - this.keepSnapshotCount.get().longValue();
            if (excess <= 0) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return getLastSnapshotKeys(aid, (int) excess).thenCompose(keys -> {
                // An empty key list yields no chunks, leaving the already-completed future.
                CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
                for (io.vavr.collection.List<Tuple2<String, String>> batch : keys.grouped(maxBatchSize)) {
                    chain = chain.thenCompose(ignored ->
                        this.dynamoDbAsyncClient
                            .batchWriteItem(this.eventStoreSupport.batchDeleteSnapshotRequest(batch))
                            .thenRun(() -> { }));
                }
                return chain;
            });
        });
    }

    /**
     * Sets an expiry timestamp on the snapshots of the aggregate that exceed the configured
     * retention count.
     *
     * <p>The number of excess snapshots is {@code snapshotCount - 1 - keepSnapshotCount}. The
     * expiry is "now" plus the configured delete TTL, passed to the request builder as epoch
     * seconds. The items are updated one after another.
     *
     * @param aid aggregate id whose excess snapshots are updated
     * @return a future that completes when all updates are done, or immediately when retention
     *     count or TTL is not configured or nothing exceeds the count
     */
    private CompletableFuture<Void> updateTtlOfExcessSnapshots(AID aid) {
        if (!this.keepSnapshotCount.isDefined() || !this.deleteTtl.isDefined()) {
            return CompletableFuture.completedFuture(null);
        }
        return getSnapshotCount(aid).thenCompose(snapshotCount -> {
            final long excess = (snapshotCount.intValue() - 1) - this.keepSnapshotCount.get().longValue();
            if (excess <= 0) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            final CompletableFuture<io.vavr.collection.List<Tuple2<String, String>>> pendingKeys =
                getLastSnapshotKeys(aid, (int) excess);
            final Instant expiresAt = Instant.now().plus(this.deleteTtl.get());
            return pendingKeys.thenCompose(keys -> {
                // Chain the updates so that each one starts only after the previous one finished.
                CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
                for (Tuple2<String, String> key : keys) {
                    chain = chain.thenCompose(ignored ->
                        this.dynamoDbAsyncClient
                            .updateItem(this.eventStoreSupport.updateTtlOfExcessSnapshots(key._1, key._2, expiresAt.getEpochSecond()))
                            .thenRun(() -> { }));
                }
                return chain;
            });
        });
    }
}
