package io.axoniq.axondb.client.axon;

import com.google.protobuf.ByteString;
import io.axoniq.axondb.Event;
import io.axoniq.axondb.client.AppendEventTransaction;
import io.axoniq.axondb.client.AxonDBClient;
import io.axoniq.axondb.client.AxonDBConfiguration;
import io.axoniq.axondb.client.util.FlowControllingStreamObserver;
import io.axoniq.axondb.grpc.EventWithToken;
import io.axoniq.axondb.grpc.GetAggregateEventsRequest;
import io.axoniq.axondb.grpc.GetEventsRequest;
import io.axoniq.axondb.grpc.QueryEventsRequest;
import io.axoniq.axondb.grpc.QueryEventsResponse;
import io.axoniq.axondb.grpc.ReadHighestSequenceNrResponse;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.axonframework.common.Assert;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.AbstractEventStore;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.serialization.MessageSerializer;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axondb/client/axon/AxonDBEventStore.class */
public class AxonDBEventStore extends AbstractEventStore {
    private static final Logger logger = LoggerFactory.getLogger(AxonDBEventStore.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axoniq/axondb/client/axon/AxonDBEventStore$AxonIQEventStorageEngine.class */
    public static class AxonIQEventStorageEngine extends AbstractEventStorageEngine {
        public static final int ALLOW_SNAPSHOTS_MAGIC_VALUE = -42;
        private final String APPEND_EVENT_TRANSACTION;
        private final EventUpcaster upcasterChain;
        private final AxonDBConfiguration configuration;
        private final AxonDBClient eventStoreClient;
        private final GrpcMetaDataConverter converter;

        private AxonIQEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, AxonDBConfiguration axonDBConfiguration, AxonDBClient axonDBClient) {
            super(serializer, eventUpcaster, (PersistenceExceptionResolver) null);
            this.APPEND_EVENT_TRANSACTION = this + "/APPEND_EVENT_TRANSACTION";
            this.upcasterChain = (EventUpcaster) ObjectUtils.getOrDefault(eventUpcaster, NoOpEventUpcaster.INSTANCE);
            this.configuration = axonDBConfiguration;
            this.eventStoreClient = axonDBClient;
            this.converter = new GrpcMetaDataConverter(serializer);
        }

        private AxonIQEventStorageEngine(Serializer serializer, Serializer serializer2, EventUpcaster eventUpcaster, AxonDBConfiguration axonDBConfiguration, AxonDBClient axonDBClient) {
            super(serializer, eventUpcaster, (PersistenceExceptionResolver) null, serializer2, (Predicate) null);
            this.APPEND_EVENT_TRANSACTION = this + "/APPEND_EVENT_TRANSACTION";
            this.upcasterChain = (EventUpcaster) ObjectUtils.getOrDefault(eventUpcaster, NoOpEventUpcaster.INSTANCE);
            this.configuration = axonDBConfiguration;
            this.eventStoreClient = axonDBClient;
            this.converter = new GrpcMetaDataConverter(serializer);
        }

        protected void appendEvents(List<? extends EventMessage<?>> list, Serializer serializer) {
            AppendEventTransaction createAppendEventConnection = CurrentUnitOfWork.isStarted() ? (AppendEventTransaction) CurrentUnitOfWork.get().root().getOrComputeResource(this.APPEND_EVENT_TRANSACTION, str -> {
                AppendEventTransaction createAppendEventConnection2 = this.eventStoreClient.createAppendEventConnection();
                CurrentUnitOfWork.get().root().onRollback(unitOfWork -> {
                    createAppendEventConnection2.rollback(unitOfWork.getExecutionResult().getExceptionResult());
                });
                CurrentUnitOfWork.get().root().onCommit(unitOfWork2 -> {
                    createAppendEventConnection2.commit();
                });
                return createAppendEventConnection2;
            }) : this.eventStoreClient.createAppendEventConnection();
            Iterator<? extends EventMessage<?>> it = list.iterator();
            while (it.hasNext()) {
                createAppendEventConnection.append(map(it.next(), serializer));
            }
            if (CurrentUnitOfWork.isStarted()) {
                return;
            }
            createAppendEventConnection.commit();
        }

        public Event map(EventMessage eventMessage, Serializer serializer) {
            Event.Builder newBuilder = Event.newBuilder();
            if (eventMessage instanceof GenericDomainEventMessage) {
                newBuilder.setAggregateIdentifier(((GenericDomainEventMessage) eventMessage).getAggregateIdentifier()).setAggregateSequenceNumber(((GenericDomainEventMessage) eventMessage).getSequenceNumber()).setAggregateType(((GenericDomainEventMessage) eventMessage).getType());
            }
            SerializedObject serializePayload = MessageSerializer.serializePayload(eventMessage, serializer, byte[].class);
            newBuilder.setMessageIdentifier(eventMessage.getIdentifier()).setPayload(io.axoniq.platform.SerializedObject.newBuilder().setType(serializePayload.getType().getName()).setRevision((String) ObjectUtils.getOrDefault(serializePayload.getType().getRevision(), "")).setData(ByteString.copyFrom((byte[]) serializePayload.getData()))).setTimestamp(eventMessage.getTimestamp().toEpochMilli());
            eventMessage.getMetaData().forEach((str, obj) -> {
                newBuilder.putMetaData(str, this.converter.convertToMetaDataValue(obj));
            });
            return newBuilder.m40build();
        }

        protected void storeSnapshot(DomainEventMessage<?> domainEventMessage, Serializer serializer) {
            try {
                this.eventStoreClient.appendSnapshot(map(domainEventMessage, serializer)).whenComplete((confirmation, th) -> {
                    if (th != null) {
                        AxonDBEventStore.logger.warn("Error occurred while creating a snapshot", th);
                    } else if (confirmation != null) {
                        if (confirmation.getSuccess()) {
                            AxonDBEventStore.logger.info("Snapshot created");
                        } else {
                            AxonDBEventStore.logger.warn("Snapshot creation failed for unknown reason. Check server logs for details.");
                        }
                    }
                });
            } catch (Throwable th2) {
                throw AxonErrorMapping.convert(th2);
            }
        }

        protected Stream<? extends DomainEventData<?>> readEventData(String str, long j) {
            AxonDBEventStore.logger.debug("Reading events for aggregate id {}", str);
            GetAggregateEventsRequest.Builder aggregateId = GetAggregateEventsRequest.newBuilder().setAggregateId(str);
            if (j > 0) {
                aggregateId.setInitialSequence(j);
            } else if (j == -42) {
                aggregateId.setAllowSnapshots(true);
            }
            try {
                return this.eventStoreClient.listAggregateEvents(aggregateId.m300build()).map(GrpcBackedDomainEventData::new);
            } catch (Exception e) {
                throw AxonErrorMapping.convert(e);
            }
        }

        public TrackingEventStream openStream(TrackingToken trackingToken) {
            Assert.isTrue(trackingToken == null || (trackingToken instanceof GlobalSequenceTrackingToken), () -> {
                return "Invalid tracking token type. Must be GlobalSequenceTrackingToken.";
            });
            long globalIndex = trackingToken == null ? 0L : ((GlobalSequenceTrackingToken) trackingToken).getGlobalIndex() + 1;
            final EventBuffer eventBuffer = new EventBuffer(this.upcasterChain, getEventSerializer());
            AxonDBEventStore.logger.info("open stream: {}", Long.valueOf(globalIndex));
            FlowControllingStreamObserver flowControllingStreamObserver = new FlowControllingStreamObserver(this.eventStoreClient.listEvents(new StreamObserver<EventWithToken>() { // from class: io.axoniq.axondb.client.axon.AxonDBEventStore.AxonIQEventStorageEngine.1
                public void onNext(EventWithToken eventWithToken) {
                    AxonDBEventStore.logger.debug("Received event with token: {}", Long.valueOf(eventWithToken.getToken()));
                    eventBuffer.push(eventWithToken);
                }

                public void onError(Throwable th) {
                    AxonDBEventStore.logger.error("Failed to receive events", th);
                    eventBuffer.fail(new EventStoreException("Error while reading events from the server", th));
                }

                public void onCompleted() {
                }
            }), this.configuration, num -> {
                return GetEventsRequest.newBuilder().setNumberOfPermits(num.intValue()).m347build();
            }, getEventsRequest -> {
                return false;
            });
            flowControllingStreamObserver.onNext(GetEventsRequest.newBuilder().setTrackingToken(globalIndex).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).m347build());
            eventBuffer.registerCloseListener(eventBuffer2 -> {
                flowControllingStreamObserver.onCompleted();
            });
            flowControllingStreamObserver.getClass();
            eventBuffer.registerConsumeListener(flowControllingStreamObserver::markConsumed);
            return eventBuffer;
        }

        public QueryResultStream query(String str, boolean z) {
            final QueryResultBuffer queryResultBuffer = new QueryResultBuffer();
            AxonDBEventStore.logger.debug("query: {}", str);
            FlowControllingStreamObserver flowControllingStreamObserver = new FlowControllingStreamObserver(this.eventStoreClient.query(new StreamObserver<QueryEventsResponse>() { // from class: io.axoniq.axondb.client.axon.AxonDBEventStore.AxonIQEventStorageEngine.2
                public void onNext(QueryEventsResponse queryEventsResponse) {
                    queryResultBuffer.push(queryEventsResponse);
                }

                public void onError(Throwable th) {
                    AxonDBEventStore.logger.info("Failed to receive events - {}", th.getMessage());
                    queryResultBuffer.fail(new EventStoreException("Error while reading query results from the server", th));
                }

                public void onCompleted() {
                    queryResultBuffer.close();
                }
            }), this.configuration, num -> {
                return QueryEventsRequest.newBuilder().setNumberOfPermits(num.intValue()).m535build();
            }, queryEventsRequest -> {
                return false;
            });
            flowControllingStreamObserver.onNext(QueryEventsRequest.newBuilder().setQuery(str).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).setLiveEvents(z).m535build());
            queryResultBuffer.registerCloseListener(queryResultBuffer2 -> {
                flowControllingStreamObserver.onCompleted();
            });
            flowControllingStreamObserver.getClass();
            queryResultBuffer.registerConsumeListener(flowControllingStreamObserver::markConsumed);
            return queryResultBuffer;
        }

        public DomainEventStream readEvents(String str) {
            return DomainEventStream.of(readEventData(str, -42L).map(this::upcastAndDeserializeDomainEvent).filter((v0) -> {
                return Objects.nonNull(v0);
            }));
        }

        private DomainEventMessage<?> upcastAndDeserializeDomainEvent(DomainEventData<?> domainEventData) {
            DomainEventStream upcastAndDeserializeDomainEvents = EventUtils.upcastAndDeserializeDomainEvents(Stream.of(domainEventData), new GrpcMetaDataAwareSerializer(isSnapshot(domainEventData) ? getSerializer() : getEventSerializer()), this.upcasterChain, false);
            if (upcastAndDeserializeDomainEvents.hasNext()) {
                return upcastAndDeserializeDomainEvents.next();
            }
            return null;
        }

        private boolean isSnapshot(DomainEventData<?> domainEventData) {
            if (domainEventData instanceof GrpcBackedDomainEventData) {
                return ((GrpcBackedDomainEventData) domainEventData).isSnapshot();
            }
            return false;
        }

        public Optional<Long> lastSequenceNumberFor(String str) {
            try {
                ReadHighestSequenceNrResponse readHighestSequenceNrResponse = this.eventStoreClient.lastSequenceNumberFor(str).get();
                return readHighestSequenceNrResponse.getToSequenceNr() < 0 ? Optional.empty() : Optional.of(Long.valueOf(readHighestSequenceNrResponse.getToSequenceNr()));
            } catch (Throwable th) {
                throw AxonErrorMapping.convert(th);
            }
        }

        public TrackingToken createTailToken() {
            try {
                io.axoniq.axondb.grpc.TrackingToken trackingToken = this.eventStoreClient.getFirstToken().get();
                if (trackingToken.getToken() < 0) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(trackingToken.getToken() - 1);
            } catch (Throwable th) {
                throw AxonErrorMapping.convert(th);
            }
        }

        public TrackingToken createHeadToken() {
            try {
                return new GlobalSequenceTrackingToken(this.eventStoreClient.getLastToken().get().getToken());
            } catch (Throwable th) {
                throw AxonErrorMapping.convert(th);
            }
        }

        public TrackingToken createTokenAt(Instant instant) {
            try {
                io.axoniq.axondb.grpc.TrackingToken trackingToken = this.eventStoreClient.getTokenAt(instant).get();
                if (trackingToken.getToken() < 0) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(trackingToken.getToken() - 1);
            } catch (Throwable th) {
                throw AxonErrorMapping.convert(th);
            }
        }

        protected Stream<? extends TrackedEventData<?>> readEventData(TrackingToken trackingToken, boolean z) {
            throw new UnsupportedOperationException("This method is not optimized for the AxonIQ Event Store and should not be used");
        }

        protected Optional<? extends DomainEventData<?>> readSnapshotData(String str) {
            return Optional.empty();
        }
    }

    public AxonDBEventStore(AxonDBConfiguration axonDBConfiguration, Serializer serializer) {
        this(axonDBConfiguration, serializer, NoOpEventUpcaster.INSTANCE);
    }

    public AxonDBEventStore(AxonDBConfiguration axonDBConfiguration, Serializer serializer, EventUpcaster eventUpcaster) {
        super(new AxonIQEventStorageEngine(serializer, eventUpcaster, axonDBConfiguration, new AxonDBClient(axonDBConfiguration)));
    }

    public AxonDBEventStore(AxonDBConfiguration axonDBConfiguration, Serializer serializer, Serializer serializer2, EventUpcaster eventUpcaster) {
        super(new AxonIQEventStorageEngine(serializer, serializer2, eventUpcaster, axonDBConfiguration, new AxonDBClient(axonDBConfiguration)));
    }

    /* renamed from: openStream, reason: merged with bridge method [inline-methods] */
    public TrackingEventStream m103openStream(TrackingToken trackingToken) {
        return m102storageEngine().openStream(trackingToken);
    }

    public QueryResultStream query(String str, boolean z) {
        return m102storageEngine().query(str, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: storageEngine, reason: merged with bridge method [inline-methods] */
    public AxonIQEventStorageEngine m102storageEngine() {
        return super.storageEngine();
    }
}
