package org.elasticsoftware.akces.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.SerializationException;
import org.elasticsoftware.akces.aggregate.Aggregate;
import org.elasticsoftware.akces.aggregate.AggregateRuntime;
import org.elasticsoftware.akces.aggregate.AggregateState;
import org.elasticsoftware.akces.aggregate.AggregateStateType;
import org.elasticsoftware.akces.aggregate.CommandHandlerFunction;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.EventBridgeHandlerFunction;
import org.elasticsoftware.akces.aggregate.EventHandlerFunction;
import org.elasticsoftware.akces.aggregate.EventSourcingHandlerFunction;
import org.elasticsoftware.akces.aggregate.IndexParams;
import org.elasticsoftware.akces.aggregate.UpcastingHandlerFunction;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.commands.CommandBus;
import org.elasticsoftware.akces.errors.AggregateAlreadyExistsErrorEvent;
import org.elasticsoftware.akces.errors.CommandExecutionErrorEvent;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.events.ErrorEvent;
import org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
import org.elasticsoftware.akces.protocol.CommandRecord;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.PayloadEncoding;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.schemas.KafkaSchemaRegistry;
import org.elasticsoftware.akces.schemas.SchemaException;
import org.everit.json.schema.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/kafka/KafkaAggregateRuntime.class */
public class KafkaAggregateRuntime implements AggregateRuntime {
    private static final Logger log = LoggerFactory.getLogger(KafkaAggregateRuntime.class);
    /** State type of the aggregate; supplies the type name, version and index settings used below. */
    private final AggregateStateType<?> stateType;
    /** Aggregate class exposed through {@link #getAggregateClass()}. */
    private final Class<? extends Aggregate<?>> aggregateClass;
    /** Handler for the create command; invoked with a {@code null} state. */
    private final CommandHandlerFunction<AggregateState, Command, DomainEvent> commandCreateHandler;
    /** Handler for an external event that creates the aggregate; may be {@code null}. */
    private final EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> eventCreateHandler;
    /** Event sourcing handler that turns the first (create) event into the initial state. */
    private final EventSourcingHandlerFunction<AggregateState, DomainEvent> createStateHandler;
    /** Registered domain event types, keyed by event class. */
    private final Map<Class<?>, DomainEventType<?>> domainEvents;
    /** Registered command types, grouped by type name (one entry per version). */
    private final Map<String, List<CommandType<?>>> commandTypes;
    /** Handlers for non-create commands. */
    private final Map<CommandType<?>, CommandHandlerFunction<AggregateState, Command, DomainEvent>> commandHandlers;
    /** Handlers for external domain events. */
    private final Map<DomainEventType<?>, EventHandlerFunction<AggregateState, DomainEvent, DomainEvent>> eventHandlers;
    /** Handlers that apply a domain event to the current state. */
    private final Map<DomainEventType<?>, EventSourcingHandlerFunction<AggregateState, DomainEvent>> eventSourcingHandlers;
    /** Handlers that receive an external event together with a {@link CommandBus}. */
    private final Map<DomainEventType<?>, EventBridgeHandlerFunction<AggregateState, DomainEvent>> eventBridgeHandlers;
    /** Upcasting handlers keyed by the (older) state type they accept. */
    private final Map<AggregateStateType<?>, UpcastingHandlerFunction<AggregateState, AggregateState, AggregateStateType<AggregateState>, AggregateStateType<AggregateState>>> stateUpcastingHandlers;
    /** Upcasting handlers keyed by the (older) event type they accept. */
    private final Map<DomainEventType<?>, UpcastingHandlerFunction<DomainEvent, DomainEvent, DomainEventType<DomainEvent>, DomainEventType<DomainEvent>>> eventUpcastingHandlers;
    private final boolean generateGDPRKeyOnCreate;
    /** True when any registered event or command class carries a PII data annotation (computed by the builder). */
    private final boolean shouldHandlePIIData;
    private final KafkaSchemaRegistry schemaRegistry;
    private final ObjectMapper objectMapper;
    // NOTE(review): presumably per-class schema caches; they are populated outside the code visible here.
    private final Map<Class<? extends DomainEvent>, JsonSchema> domainEventSchemas = new HashMap<>();
    private final Map<Class<? extends Command>, JsonSchema> commandSchemas = new HashMap<>();

    /* loaded from: input_file:org/elasticsoftware/akces/kafka/KafkaAggregateRuntime$Builder.class */
    /**
     * Fluent builder for {@link KafkaAggregateRuntime}.
     *
     * <p>The setters and {@code add*} methods only record what they are given and return the
     * builder. {@link #build()} assembles the runtime without any checks, while
     * {@link #validateAndBuild()} first verifies that the registered handlers and type versions
     * are consistent.
     *
     * <p>The maps collected here are handed to the runtime as-is (they are not copied), so the
     * builder should not be modified once a runtime has been built from it.
     */
    public static class Builder {
        private KafkaSchemaRegistry schemaRegistry;
        private ObjectMapper objectMapper;
        private AggregateStateType<?> stateType;
        private Class<? extends Aggregate<?>> aggregateClass;
        private CommandHandlerFunction<AggregateState, Command, DomainEvent> commandCreateHandler;
        private EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> eventCreateHandler;
        private EventSourcingHandlerFunction<AggregateState, DomainEvent> createStateHandler;
        private final Map<Class<?>, DomainEventType<?>> domainEvents = new HashMap<>();
        private final Map<String, List<CommandType<?>>> commandTypes = new HashMap<>();
        private final Map<CommandType<?>, CommandHandlerFunction<AggregateState, Command, DomainEvent>> commandHandlers = new HashMap<>();
        private final Map<DomainEventType<?>, EventHandlerFunction<AggregateState, DomainEvent, DomainEvent>> eventHandlers = new HashMap<>();
        private final Map<DomainEventType<?>, EventSourcingHandlerFunction<AggregateState, DomainEvent>> eventSourcingHandlers = new HashMap<>();
        private final Map<DomainEventType<?>, EventBridgeHandlerFunction<AggregateState, DomainEvent>> eventBridgeHandlers = new HashMap<>();
        private final Map<AggregateStateType<?>, UpcastingHandlerFunction<AggregateState, AggregateState, AggregateStateType<AggregateState>, AggregateStateType<AggregateState>>> stateUpcastingHandlers = new HashMap<>();
        private final Map<DomainEventType<?>, UpcastingHandlerFunction<DomainEvent, DomainEvent, DomainEventType<DomainEvent>, DomainEventType<DomainEvent>>> eventUpcastingHandlers = new HashMap<>();
        private boolean generateGDPRKeyOnCreate = false;

        public Builder setSchemaRegistry(KafkaSchemaRegistry kafkaSchemaRegistry) {
            this.schemaRegistry = kafkaSchemaRegistry;
            return this;
        }

        public Builder setObjectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        public Builder setStateType(AggregateStateType<?> aggregateStateType) {
            this.stateType = aggregateStateType;
            return this;
        }

        public Builder setAggregateClass(Class<? extends Aggregate<?>> cls) {
            this.aggregateClass = cls;
            return this;
        }

        /** Sets the handler that processes the command creating the aggregate. */
        public Builder setCommandCreateHandler(CommandHandlerFunction<AggregateState, Command, DomainEvent> commandHandlerFunction) {
            this.commandCreateHandler = commandHandlerFunction;
            return this;
        }

        /** Sets the handler that processes an external event creating the aggregate. */
        public Builder setEventCreateHandler(EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> eventHandlerFunction) {
            this.eventCreateHandler = eventHandlerFunction;
            return this;
        }

        /** Sets the event sourcing handler that builds the initial state from the create event. */
        public Builder setEventSourcingCreateHandler(EventSourcingHandlerFunction<AggregateState, DomainEvent> eventSourcingHandlerFunction) {
            this.createStateHandler = eventSourcingHandlerFunction;
            return this;
        }

        /** Registers a domain event type under its event class; a later registration for the same class replaces the earlier one. */
        public Builder addDomainEvent(DomainEventType<?> domainEventType) {
            this.domainEvents.put(domainEventType.typeClass(), domainEventType);
            return this;
        }

        /** Registers a command type; all versions sharing a type name are kept in one list. */
        public Builder addCommand(CommandType<?> commandType) {
            this.commandTypes.computeIfAbsent(commandType.typeName(), name -> new ArrayList<>()).add(commandType);
            return this;
        }

        public Builder addCommandHandler(CommandType<?> commandType, CommandHandlerFunction<AggregateState, Command, DomainEvent> commandHandlerFunction) {
            this.commandHandlers.put(commandType, commandHandlerFunction);
            return this;
        }

        public Builder addExternalEventHandler(DomainEventType<?> domainEventType, EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> eventHandlerFunction) {
            this.eventHandlers.put(domainEventType, eventHandlerFunction);
            return this;
        }

        public Builder addEventSourcingHandler(DomainEventType<?> domainEventType, EventSourcingHandlerFunction<AggregateState, DomainEvent> eventSourcingHandlerFunction) {
            this.eventSourcingHandlers.put(domainEventType, eventSourcingHandlerFunction);
            return this;
        }

        public Builder addEventBridgeHandler(DomainEventType<?> domainEventType, EventBridgeHandlerFunction<AggregateState, DomainEvent> eventBridgeHandlerFunction) {
            this.eventBridgeHandlers.put(domainEventType, eventBridgeHandlerFunction);
            return this;
        }

        public Builder addStateUpcastingHandler(AggregateStateType<?> aggregateStateType, UpcastingHandlerFunction<AggregateState, AggregateState, AggregateStateType<AggregateState>, AggregateStateType<AggregateState>> upcastingHandlerFunction) {
            this.stateUpcastingHandlers.put(aggregateStateType, upcastingHandlerFunction);
            return this;
        }

        public Builder addEventUpcastingHandler(DomainEventType<?> domainEventType, UpcastingHandlerFunction<DomainEvent, DomainEvent, DomainEventType<DomainEvent>, DomainEventType<DomainEvent>> upcastingHandlerFunction) {
            this.eventUpcastingHandlers.put(domainEventType, upcastingHandlerFunction);
            return this;
        }

        public Builder setGenerateGDPRKeyOnCreate(boolean z) {
            this.generateGDPRKeyOnCreate = z;
            return this;
        }

        /**
         * Checks the collected configuration for consistency.
         *
         * @return this builder, so that {@link #build()} can be chained
         * @throws IllegalStateException if no create handler is configured, a produced event lacks
         *         a handler, event or state versions do not start at 1 or have gaps or missing
         *         upcasters, or an event handler is registered for a non-external event
         */
        private Builder validate() {
            // The aggregate must be creatable from a command or from an external event. The
            // previous check looked at createStateHandler, which contradicted the error message
            // and let aggregates without any create entry point pass validation.
            if (this.commandCreateHandler == null && this.eventCreateHandler == null) {
                throw new IllegalStateException("No create handler (either from command or event) configured");
            }
            requireHandlersForProducedEvents();
            requireContiguousEventVersions();
            requireStateUpcasters();
            rejectHandlersForNonExternalEvents();
            return this;
        }

        /** Every produced, non-error event needs an event sourcing handler, the create-state handler, or an upcaster. */
        private void requireHandlersForProducedEvents() {
            List<DomainEventType<?>> unhandled = this.domainEvents.values().stream()
                    .filter(type -> !type.external())
                    .filter(type -> !isSourcedOrUpcast(type))
                    .filter(type -> !ErrorEvent.class.isAssignableFrom(type.typeClass()))
                    .toList();
            if (!unhandled.isEmpty()) {
                throw new IllegalStateException("The following domain events are missing EventSourcingHandlers: "
                        + unhandled.stream().map(type -> type.typeName() + " v" + type.version()).collect(Collectors.joining(", ")));
            }
        }

        /** True when the event type is covered by an event sourcing handler, the create-state handler, or an upcaster. */
        private boolean isSourcedOrUpcast(DomainEventType<?> type) {
            if (this.eventSourcingHandlers.containsKey(type)) {
                return true;
            }
            // Null-safe: a missing create-state handler is then reported as a missing handler
            // for the create event instead of surfacing as a NullPointerException.
            if (this.createStateHandler != null && this.createStateHandler.getEventType().equals(type)) {
                return true;
            }
            return this.eventUpcastingHandlers.containsKey(type);
        }

        /** Versions of each produced event must start at 1, have no gaps, and every non-latest version needs a handler. */
        private void requireContiguousEventVersions() {
            Map<String, List<DomainEventType<?>>> producedByName = this.domainEvents.values().stream()
                    .filter(type -> !type.external())
                    .collect(Collectors.groupingBy(type -> type.typeName()));
            for (List<DomainEventType<?>> versions : producedByName.values()) {
                versions.sort(Comparator.comparingInt((DomainEventType<?> type) -> type.version()));
                DomainEventType<?> oldest = versions.getFirst();
                if (oldest.version() != 1) {
                    throw new IllegalStateException("Event type " + oldest.typeName() + " is missing version 1. Event versions must start at 1.");
                }
                // Gap check runs over all pairs first so that a gap is reported before a missing handler.
                for (int i = 0; i < versions.size() - 1; i++) {
                    DomainEventType<?> lower = versions.get(i);
                    DomainEventType<?> higher = versions.get(i + 1);
                    if (higher.version() - lower.version() > 1) {
                        throw new IllegalStateException("Gap detected in versions for event type " + lower.typeName() + ": missing version(s) between v" + lower.version() + " and v" + higher.version());
                    }
                }
                for (int i = 0; i < versions.size() - 1; i++) {
                    DomainEventType<?> older = versions.get(i);
                    if (!this.eventUpcastingHandlers.containsKey(older) && !this.eventSourcingHandlers.containsKey(older)) {
                        throw new IllegalStateException("Missing handler for " + older.typeName() + " v" + older.version() + ". All older versions of domain events must have either upcasting handlers or event sourcing handlers.");
                    }
                }
            }
        }

        /** State versions must start at 1 and every version older than the current state type needs an upcaster. */
        private void requireStateUpcasters() {
            List<AggregateStateType<?>> knownStates = new ArrayList<>(this.stateUpcastingHandlers.keySet());
            knownStates.add(this.stateType);
            Map<String, List<AggregateStateType<?>>> statesByName = knownStates.stream()
                    .collect(Collectors.groupingBy(type -> type.typeName()));
            for (List<AggregateStateType<?>> versions : statesByName.values()) {
                versions.sort(Comparator.comparingInt((AggregateStateType<?> type) -> type.version()));
                AggregateStateType<?> oldest = versions.getFirst();
                if (oldest.version() != 1) {
                    throw new IllegalStateException("State type " + oldest.typeName() + " is missing version 1. State versions must start at 1.");
                }
                for (AggregateStateType<?> candidate : versions) {
                    if (candidate.version() < this.stateType.version() && !this.stateUpcastingHandlers.containsKey(candidate)) {
                        throw new IllegalStateException("Missing upcasting handler for " + candidate.typeName() + " v" + candidate.version() + ". All older versions of state must have upcasting handlers.");
                    }
                }
            }
        }

        /** Handlers registered via {@link #addExternalEventHandler} may only target external events. */
        private void rejectHandlersForNonExternalEvents() {
            List<DomainEventType<?>> offending = this.eventHandlers.keySet().stream()
                    .filter(type -> !type.external())
                    .toList();
            if (!offending.isEmpty()) {
                throw new IllegalStateException("Event handlers found for non-external events: "
                        + offending.stream().map(type -> type.typeName() + " v" + type.version()).collect(Collectors.joining(", ")));
            }
        }

        /**
         * Assembles the runtime from the collected configuration without validating it.
         *
         * @return a new runtime sharing this builder's handler and type maps
         */
        public KafkaAggregateRuntime build() {
            // PII handling is needed as soon as any registered event or command class is annotated for it.
            boolean shouldHandlePIIData = this.domainEvents.values().stream()
                    .map(DomainEventType::typeClass)
                    .anyMatch(GDPRAnnotationUtils::hasPIIDataAnnotation)
                    || this.commandTypes.values().stream()
                    .flatMap(List::stream)
                    .map(CommandType::typeClass)
                    .anyMatch(GDPRAnnotationUtils::hasPIIDataAnnotation);
            return new KafkaAggregateRuntime(this.schemaRegistry, this.objectMapper, this.stateType, this.aggregateClass, this.commandCreateHandler, this.eventCreateHandler, this.createStateHandler, this.domainEvents, this.commandTypes, this.commandHandlers, this.eventHandlers, this.eventSourcingHandlers, this.eventBridgeHandlers, this.stateUpcastingHandlers, this.eventUpcastingHandlers, this.generateGDPRKeyOnCreate, shouldHandlePIIData);
        }

        /**
         * Validates the configuration and, if it is consistent, builds the runtime.
         *
         * @throws IllegalStateException if validation fails
         */
        public KafkaAggregateRuntime validateAndBuild() {
            return validate().build();
        }
    }

    /**
     * Creates a runtime from the values collected by {@link Builder}. The maps are stored as
     * passed in; no copies are made.
     */
    private KafkaAggregateRuntime(KafkaSchemaRegistry schemaRegistry, ObjectMapper objectMapper, AggregateStateType<?> stateType, Class<? extends Aggregate<?>> aggregateClass, CommandHandlerFunction<AggregateState, Command, DomainEvent> commandCreateHandler, EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> eventCreateHandler, EventSourcingHandlerFunction<AggregateState, DomainEvent> createStateHandler, Map<Class<?>, DomainEventType<?>> domainEvents, Map<String, List<CommandType<?>>> commandTypes, Map<CommandType<?>, CommandHandlerFunction<AggregateState, Command, DomainEvent>> commandHandlers, Map<DomainEventType<?>, EventHandlerFunction<AggregateState, DomainEvent, DomainEvent>> eventHandlers, Map<DomainEventType<?>, EventSourcingHandlerFunction<AggregateState, DomainEvent>> eventSourcingHandlers, Map<DomainEventType<?>, EventBridgeHandlerFunction<AggregateState, DomainEvent>> eventBridgeHandlers, Map<AggregateStateType<?>, UpcastingHandlerFunction<AggregateState, AggregateState, AggregateStateType<AggregateState>, AggregateStateType<AggregateState>>> stateUpcastingHandlers, Map<DomainEventType<?>, UpcastingHandlerFunction<DomainEvent, DomainEvent, DomainEventType<DomainEvent>, DomainEventType<DomainEvent>>> eventUpcastingHandlers, boolean generateGDPRKeyOnCreate, boolean shouldHandlePIIData) {
        // infrastructure
        this.schemaRegistry = schemaRegistry;
        this.objectMapper = objectMapper;
        // aggregate identity
        this.stateType = stateType;
        this.aggregateClass = aggregateClass;
        // create handlers
        this.commandCreateHandler = commandCreateHandler;
        this.eventCreateHandler = eventCreateHandler;
        this.createStateHandler = createStateHandler;
        // registered types
        this.domainEvents = domainEvents;
        this.commandTypes = commandTypes;
        // handler tables
        this.commandHandlers = commandHandlers;
        this.eventHandlers = eventHandlers;
        this.eventSourcingHandlers = eventSourcingHandlers;
        this.eventBridgeHandlers = eventBridgeHandlers;
        this.stateUpcastingHandlers = stateUpcastingHandlers;
        this.eventUpcastingHandlers = eventUpcastingHandlers;
        // GDPR flags
        this.generateGDPRKeyOnCreate = generateGDPRKeyOnCreate;
        this.shouldHandlePIIData = shouldHandlePIIData;
    }

    /** Returns the type name of the aggregate's state type, which doubles as the aggregate name. */
    @Override
    public String getName() {
        return stateType.typeName();
    }

    /** Returns the aggregate class this runtime was built for. */
    @Override
    public Class<? extends Aggregate<?>> getAggregateClass() {
        return aggregateClass;
    }

    /**
     * Dispatches an incoming command record.
     *
     * <p>A create command is rejected with an already-exists error event when {@code supplier}
     * yields a state record, and otherwise goes to the create handler. Any other command goes to
     * its registered handler, or produces a command execution error event when none is
     * registered. Anything thrown while handling is logged and converted into a command
     * execution error event. Resolving the command type happens before that safety net, so an
     * unknown command type propagates as an exception.
     *
     * @param commandRecord the command to handle
     * @param consumer receives every produced state and event record
     * @param biConsumer receives event records that have to be indexed
     * @param supplier yields the current state record, or {@code null} when there is none
     */
    @Override
    public void handleCommandRecord(CommandRecord commandRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer, Supplier<AggregateStateRecord> supplier) throws IOException {
        final CommandType<?> resolvedType = getCommandType(commandRecord);
        try {
            if (!resolvedType.create()) {
                if (commandHandlers.containsKey(resolvedType)) {
                    handleCommand(resolvedType, commandRecord, consumer, biConsumer, supplier);
                } else {
                    commandExecutionError(commandRecord, consumer, "No handler found for command " + commandRecord.name());
                }
                return;
            }
            if (supplier.get() == null) {
                handleCreateCommand(resolvedType, commandRecord, consumer, biConsumer);
                return;
            }
            log.warn("Command {} wants to create a {} Aggregate with id {}, but the state already exists. Generating a AggregateAlreadyExistsError", commandRecord.name(), getName(), commandRecord.aggregateId());
            aggregateAlreadyExists(commandRecord, consumer);
        } catch (Throwable failure) {
            log.error("Exception while handling command, sending CommandExecutionError", failure);
            commandExecutionError(commandRecord, consumer, failure);
        }
    }

    /**
     * Emits an {@link AggregateAlreadyExistsErrorEvent} for the aggregate addressed by the given
     * record. The error record carries generation {@code -1}.
     *
     * @param protocolRecord the record (command or event) that attempted the create
     * @param consumer receives the error event record
     */
    private void aggregateAlreadyExists(ProtocolRecord protocolRecord, Consumer<ProtocolRecord> consumer) {
        DomainEvent alreadyExists = new AggregateAlreadyExistsErrorEvent(protocolRecord.aggregateId(), getName());
        DomainEventType<?> errorType = getDomainEventType(AggregateAlreadyExistsErrorEvent.class);
        DomainEventRecord errorRecord = new DomainEventRecord(
                protocolRecord.tenantId(),
                errorType.typeName(),
                errorType.version(),
                serialize(alreadyExists),
                getEncoding(errorType),
                alreadyExists.getAggregateId(),
                protocolRecord.correlationId(),
                -1L);
        consumer.accept(errorRecord);
    }

    /**
     * Emits a command execution error whose description is the message of the given throwable
     * (which may be {@code null}).
     */
    private void commandExecutionError(CommandRecord commandRecord, Consumer<ProtocolRecord> consumer, Throwable cause) {
        String reason = cause.getMessage();
        commandExecutionError(commandRecord, consumer, reason);
    }

    /**
     * Emits a {@link CommandExecutionErrorEvent} describing why the command could not be
     * executed. The error record carries generation {@code -1}.
     *
     * @param commandRecord the command that failed
     * @param consumer receives the error event record
     * @param str human-readable failure description; may be {@code null}
     */
    private void commandExecutionError(CommandRecord commandRecord, Consumer<ProtocolRecord> consumer, String str) {
        CommandExecutionErrorEvent commandExecutionErrorEvent = new CommandExecutionErrorEvent(commandRecord.aggregateId(), getName(), commandRecord.name(), str);
        // Look up the type of the event that is actually serialized. The lookup used to be done
        // with AggregateAlreadyExistsErrorEvent, which labelled the record with the wrong type
        // name, version and encoding.
        DomainEventType<?> domainEventType = getDomainEventType(CommandExecutionErrorEvent.class);
        consumer.accept(new DomainEventRecord(commandRecord.tenantId(), domainEventType.typeName(), domainEventType.version(), serialize((DomainEvent) commandExecutionErrorEvent), getEncoding(domainEventType), commandExecutionErrorEvent.getAggregateId(), commandRecord.correlationId(), -1L));
    }

    /**
     * Resolves the registered command type matching the record's name and exact version.
     *
     * @param commandRecord the incoming command record
     * @return the first registered type with the same name and version
     * @throws RuntimeException if no such command type is registered
     */
    private CommandType<?> getCommandType(CommandRecord commandRecord) {
        return this.commandTypes.getOrDefault(commandRecord.name(), Collections.emptyList()).stream()
                .filter(commandType -> commandType.version() == commandRecord.version())
                .findFirst()
                // Same exception type as before, but now it says which command could not be resolved.
                .orElseThrow(() -> new RuntimeException("No CommandType registered for command " + commandRecord.name() + " v" + commandRecord.version()));
    }

    /**
     * Forwards the event record to the index consumer when the state type is indexed; does
     * nothing otherwise.
     *
     * @param domainEventRecord the event record to index
     * @param aggregateState state after the event was applied; supplies the index key
     * @param biConsumer receives the record together with its {@link IndexParams}
     * @param createFlag passed through to {@link IndexParams}; the create paths pass {@code true},
     *        {@code processDomainEvent} passes {@code false}
     */
    private void indexDomainEventIfRequired(DomainEventRecord domainEventRecord, AggregateState aggregateState, BiConsumer<DomainEventRecord, IndexParams> biConsumer, boolean createFlag) {
        if (!stateType.indexed()) {
            return;
        }
        IndexParams indexParams = new IndexParams(stateType.indexName(), aggregateState.getIndexKey(), createFlag);
        biConsumer.accept(domainEventRecord, indexParams);
    }

    /**
     * Runs the create command handler and bootstraps the aggregate from its first event.
     *
     * <p>The first event returned by the handler is treated as the create event: it is turned
     * into the initial state (after upcasting when the create-state handler is registered for a
     * different event type), and the state record (generation 1) and event record are emitted
     * and indexed. Every further event goes through {@code processDomainEvent}.
     */
    private void handleCreateCommand(CommandType<?> commandType, CommandRecord commandRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer) throws IOException {
        Command command = materialize(commandType, commandRecord);
        Iterator<? extends DomainEvent> produced = commandCreateHandler.apply(command, (AggregateState) null).iterator();
        // No hasNext() check: the handler is expected to produce at least the create event.
        DomainEvent createEvent = produced.next();
        DomainEventType<?> createEventType = getDomainEventType(createEvent.getClass());

        final AggregateState initialState;
        if (createStateHandler.getEventType().equals(createEventType)) {
            initialState = createStateHandler.apply(createEvent, (AggregateState) null);
        } else {
            initialState = createStateHandler.apply(upcast(createEvent, createEventType), (AggregateState) null);
        }

        AggregateStateRecord initialRecord = new AggregateStateRecord(
                commandRecord.tenantId(),
                stateType.typeName(),
                stateType.version(),
                serialize(initialState),
                getEncoding(stateType),
                initialState.getAggregateId(),
                commandRecord.correlationId(),
                1L);
        consumer.accept(initialRecord);

        DomainEventRecord createRecord = new DomainEventRecord(
                commandRecord.tenantId(),
                createEventType.typeName(),
                createEventType.version(),
                serialize(createEvent),
                getEncoding(createEventType),
                createEvent.getAggregateId(),
                commandRecord.correlationId(),
                initialRecord.generation());
        consumer.accept(createRecord);
        indexDomainEventIfRequired(createRecord, initialState, biConsumer, true);

        AggregateStateRecord latest = initialRecord;
        while (produced.hasNext()) {
            latest = processDomainEvent(commandRecord.correlationId(), consumer, biConsumer, latest, produced.next());
        }
    }

    /**
     * Runs the registered handler for a non-create command against the current state and
     * processes every resulting event in order, threading the updated state record through.
     */
    private void handleCommand(CommandType<?> commandType, CommandRecord commandRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer, Supplier<AggregateStateRecord> supplier) throws IOException {
        Command command = materialize(commandType, commandRecord);
        AggregateStateRecord latest = supplier.get();
        CommandHandlerFunction<AggregateState, Command, DomainEvent> handler = commandHandlers.get(commandType);
        // All events are collected up front (toList) before any of them is applied.
        for (DomainEvent event : handler.apply(command, materialize(latest)).toList()) {
            latest = processDomainEvent(commandRecord.correlationId(), consumer, biConsumer, latest, event);
        }
    }

    /**
     * Runs the event create handler for an external event and bootstraps the aggregate from the
     * first event it returns.
     *
     * <p>The first returned event is applied through the create-state handler; the state record
     * (generation 1) and the event record are emitted and indexed. Every further event goes
     * through {@code processDomainEvent}.
     */
    private void handleCreateEvent(DomainEventType<?> domainEventType, DomainEventRecord domainEventRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer) throws IOException {
        DomainEvent externalEvent = materialize(domainEventType, domainEventRecord);
        Iterator<? extends DomainEvent> produced = eventCreateHandler.apply(externalEvent, (AggregateState) null).iterator();
        // No hasNext() check: the handler is expected to produce at least the create event.
        DomainEvent createEvent = produced.next();
        AggregateState initialState = createStateHandler.apply(createEvent, (AggregateState) null);

        AggregateStateRecord initialRecord = new AggregateStateRecord(
                domainEventRecord.tenantId(),
                stateType.typeName(),
                stateType.version(),
                serialize(initialState),
                getEncoding(stateType),
                initialState.getAggregateId(),
                domainEventRecord.correlationId(),
                1L);
        consumer.accept(initialRecord);

        DomainEventType<?> createEventType = getDomainEventType(createEvent.getClass());
        DomainEventRecord createRecord = new DomainEventRecord(
                domainEventRecord.tenantId(),
                createEventType.typeName(),
                createEventType.version(),
                serialize(createEvent),
                getEncoding(createEventType),
                createEvent.getAggregateId(),
                domainEventRecord.correlationId(),
                initialRecord.generation());
        consumer.accept(createRecord);
        indexDomainEventIfRequired(createRecord, initialState, biConsumer, true);

        AggregateStateRecord latest = initialRecord;
        while (produced.hasNext()) {
            latest = processDomainEvent(domainEventRecord.correlationId(), consumer, biConsumer, latest, produced.next());
        }
    }

    /**
     * Runs the registered external event handler against the current state and processes every
     * resulting event in order, threading the updated state record through.
     */
    private void handleEvent(DomainEventType<?> domainEventType, DomainEventRecord domainEventRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer, Supplier<AggregateStateRecord> supplier) throws IOException {
        DomainEvent externalEvent = materialize(domainEventType, domainEventRecord);
        AggregateStateRecord latest = supplier.get();
        EventHandlerFunction<AggregateState, DomainEvent, DomainEvent> handler = eventHandlers.get(domainEventType);
        // All events are collected up front (toList) before any of them is applied.
        for (DomainEvent event : handler.apply(externalEvent, materialize(latest)).toList()) {
            latest = processDomainEvent(domainEventRecord.correlationId(), consumer, biConsumer, latest, event);
        }
    }

    /**
     * Hands the materialized external event to its event bridge handler together with the
     * command bus. No aggregate state is read or written here.
     */
    private void handleBridgedEvent(DomainEventType<?> domainEventType, DomainEventRecord domainEventRecord, CommandBus commandBus) throws IOException {
        EventBridgeHandlerFunction<AggregateState, DomainEvent> bridge = eventBridgeHandlers.get(domainEventType);
        DomainEvent externalEvent = materialize(domainEventType, domainEventRecord);
        bridge.apply(externalEvent, commandBus);
    }

    /**
     * Applies one domain event to the aggregate and emits the resulting records.
     *
     * <p>An {@link ErrorEvent} does not change state: its record is emitted with generation
     * {@code -1} and the incoming state record is returned unchanged. Any other event is applied
     * through its event sourcing handler, or, when none is registered for its type, upcast and
     * applied through the handler registered for the upcast event's type. The new state record
     * (generation + 1) and the event record are then emitted and the event is indexed if
     * required. The event record always carries the original event, not the upcast one.
     *
     * @param correlationId correlation id copied onto every emitted record
     * @param consumer receives the emitted state and event records
     * @param biConsumer receives event records that have to be indexed
     * @param aggregateStateRecord state record the event is applied to
     * @param domainEvent the event to apply
     * @return the state record that subsequent events must be applied to
     */
    private AggregateStateRecord processDomainEvent(String correlationId, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer, AggregateStateRecord aggregateStateRecord, DomainEvent domainEvent) throws IOException {
        AggregateState currentState = materialize(aggregateStateRecord);
        DomainEventType<?> domainEventType = getDomainEventType(domainEvent.getClass());
        if (domainEvent instanceof ErrorEvent) {
            consumer.accept(new DomainEventRecord(aggregateStateRecord.tenantId(), domainEventType.typeName(), domainEventType.version(), serialize(domainEvent), getEncoding(domainEventType), domainEvent.getAggregateId(), correlationId, -1L));
            return aggregateStateRecord;
        }
        final AggregateState nextState;
        if (this.eventSourcingHandlers.containsKey(domainEventType)) {
            nextState = this.eventSourcingHandlers.get(domainEventType).apply(domainEvent, currentState);
        } else {
            // Upcast once and reuse the result for both the handler lookup and the handler
            // argument; the event used to be upcast twice.
            DomainEvent upcastEvent = upcast(domainEvent, domainEventType);
            nextState = this.eventSourcingHandlers.get(getDomainEventType(upcastEvent.getClass())).apply(upcastEvent, currentState);
        }
        AggregateStateRecord nextStateRecord = new AggregateStateRecord(aggregateStateRecord.tenantId(), this.stateType.typeName(), this.stateType.version(), serialize(nextState), getEncoding(this.stateType), aggregateStateRecord.aggregateId(), correlationId, aggregateStateRecord.generation() + 1);
        consumer.accept(nextStateRecord);
        DomainEventRecord domainEventRecord = new DomainEventRecord(aggregateStateRecord.tenantId(), domainEventType.typeName(), domainEventType.version(), serialize(domainEvent), getEncoding(domainEventType), domainEvent.getAggregateId(), correlationId, nextStateRecord.generation());
        consumer.accept(domainEventRecord);
        indexDomainEventIfRequired(domainEventRecord, nextState, biConsumer, false);
        return nextStateRecord;
    }

    /**
     * Dispatches an external domain event record.
     *
     * <p>Records whose type is not registered as an external event are ignored. When the event
     * matches the event create handler, the aggregate is created, or an already-exists error is
     * emitted if {@code supplier} yields a state record. Otherwise the event goes to its
     * external event handler or, failing that, to its event bridge handler; with neither
     * registered the record is dropped silently.
     *
     * @param domainEventRecord the external event record
     * @param consumer receives every produced state and event record
     * @param biConsumer receives event records that have to be indexed
     * @param supplier yields the current state record, or {@code null} when there is none
     * @param commandBus passed to event bridge handlers
     */
    @Override
    public void handleExternalDomainEventRecord(DomainEventRecord domainEventRecord, Consumer<ProtocolRecord> consumer, BiConsumer<DomainEventRecord, IndexParams> biConsumer, Supplier<AggregateStateRecord> supplier, CommandBus commandBus) throws IOException {
        DomainEventType<?> recordType = getDomainEventType(domainEventRecord);
        if (recordType == null) {
            return;
        }
        DomainEventType<?> handlerType = upcastedType(recordType);
        // NOTE(review): the create check applies upcastedType a second time while the lookups
        // below use the single result; confirm this asymmetry is intended.
        boolean createsAggregate = eventCreateHandler != null
                && eventCreateHandler.getEventType().equals(upcastedType(handlerType));
        if (createsAggregate) {
            if (supplier.get() != null) {
                log.warn("External DomainEvent {} wants to create a {} Aggregate with id {}, but the state already exists. Generate a AggregateAlreadyExistsError", domainEventRecord.name(), getName(), domainEventRecord.aggregateId());
                aggregateAlreadyExists(domainEventRecord, consumer);
            } else {
                handleCreateEvent(recordType, domainEventRecord, consumer, biConsumer);
            }
            return;
        }
        if (eventHandlers.containsKey(handlerType)) {
            handleEvent(recordType, domainEventRecord, consumer, biConsumer, supplier);
            return;
        }
        if (eventBridgeHandlers.containsKey(handlerType)) {
            handleBridgedEvent(recordType, domainEventRecord, commandBus);
        }
    }

    /**
     * Finds the registered external event type for a record: same type name and the highest
     * registered version that does not exceed the record's version.
     *
     * @param domainEventRecord the incoming external event record
     * @return the matching type, or {@code null} when none is registered
     */
    @Nullable
    private DomainEventType<?> getDomainEventType(DomainEventRecord domainEventRecord) {
        DomainEventType<?> best = null;
        for (DomainEventType<?> candidate : domainEvents.values()) {
            if (!candidate.external()) {
                continue;
            }
            if (!candidate.typeName().equals(domainEventRecord.name())) {
                continue;
            }
            if (candidate.version() > domainEventRecord.version()) {
                continue;
            }
            // Strictly greater: on equal versions the candidate seen first is kept.
            if (best == null || candidate.version() > best.version()) {
                best = candidate;
            }
        }
        return best;
    }

    /**
     * Returns every domain event type held in {@code domainEvents}, external or not.
     *
     * <p>The result is the map's own values view, not a copy.
     *
     * @return all registered domain event types
     */
    @Override // org.elasticsoftware.akces.aggregate.AggregateRuntime
    public Collection<DomainEventType<?>> getAllDomainEventTypes() {
        return this.domainEvents.values();
    }

    /**
     * Returns the registered domain event types that are not flagged as external.
     *
     * @return a newly collected set of the non-external domain event types
     */
    @Override
    public Collection<DomainEventType<?>> getProducedDomainEventTypes() {
        return this.domainEvents.values().stream()
                .filter(eventType -> !eventType.external())
                .collect(Collectors.toSet());
    }

    /**
     * Returns the registered domain event types that are flagged as external.
     *
     * @return a newly collected set of the external domain event types
     */
    @Override
    public Collection<DomainEventType<?>> getExternalDomainEventTypes() {
        return this.domainEvents.values().stream()
                .filter(DomainEventType::external)
                .collect(Collectors.toSet());
    }

    /**
     * Returns all registered command types, flattened across every type name.
     *
     * @return a newly collected set containing every command type in {@code commandTypes}
     */
    @Override
    public Collection<CommandType<?>> getAllCommandTypes() {
        return this.commandTypes.values().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Returns the registered command types that are not flagged as external.
     *
     * @return a newly collected set of the non-external command types
     */
    @Override
    public Collection<CommandType<?>> getLocalCommandTypes() {
        return this.commandTypes.values().stream()
                .flatMap(Collection::stream)
                .filter(type -> !type.external())
                .collect(Collectors.toSet());
    }

    /**
     * Returns the registered command types that are flagged as external.
     *
     * @return a newly collected set of the external command types
     */
    @Override
    public Collection<CommandType<?>> getExternalCommandTypes() {
        return this.commandTypes.values().stream()
                .flatMap(Collection::stream)
                .filter(CommandType::external)
                .collect(Collectors.toSet());
    }

    /**
     * Looks up the domain event type stored in {@code domainEvents} under the given class.
     *
     * @param cls the key to look up
     * @return the mapped domain event type, or {@code null} when the map has no entry for {@code cls}
     */
    private DomainEventType<?> getDomainEventType(Class<?> cls) {
        return this.domainEvents.get(cls);
    }

    /**
     * Finds the command type registered under the given type name with exactly the given version.
     *
     * @param str the command type name
     * @param i   the required version
     * @return the first registered command type for {@code str} whose version equals {@code i}
     * @throws IllegalArgumentException when no command type with that name and version is registered
     */
    @Override
    public CommandType<?> getLocalCommandType(String str, int i) {
        for (CommandType<?> candidate : this.commandTypes.getOrDefault(str, Collections.emptyList())) {
            if (candidate.version() == i) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No CommandType found for type " + str + " and version " + i);
    }

    /**
     * Tells whether a GDPR key should be generated for the given command.
     *
     * @param commandRecord the command record being processed
     * @return {@code true} when the command's type is a create command and
     *         {@code generateGDPRKeyOnCreate} is set
     */
    @Override
    public boolean shouldGenerateGDPRKey(CommandRecord commandRecord) {
        // resolve the command type first, exactly as the combined expression would
        if (!getCommandType(commandRecord).create()) {
            return false;
        }
        return this.generateGDPRKeyOnCreate;
    }

    /**
     * Tells whether a GDPR key should be generated for the given external domain event.
     *
     * @param domainEventRecord the domain event record being processed
     * @return {@code true} when a matching external event type exists, that type is a create
     *         event and {@code generateGDPRKeyOnCreate} is set; {@code false} otherwise
     */
    @Override
    public boolean shouldGenerateGDPRKey(DomainEventRecord domainEventRecord) {
        final DomainEventType<?> eventType = getDomainEventType(domainEventRecord);
        if (eventType == null) {
            return false;
        }
        return eventType.create() && this.generateGDPRKeyOnCreate;
    }

    /**
     * Tells whether handling the given external domain event needs a GDPR context.
     *
     * <p>A context is required when the matched event type carries PII data, or when the state
     * type carries PII data and the event type has no entry in {@code eventBridgeHandlers}.
     * Records without a matching external event type never require one.
     *
     * @param domainEventRecord the domain event record being processed
     * @return whether a GDPR context is required
     */
    @Override
    public boolean requiresGDPRContext(DomainEventRecord domainEventRecord) {
        final DomainEventType<?> eventType = getDomainEventType(domainEventRecord);
        if (eventType == null) {
            return false;
        }
        if (eventType.piiData()) {
            return true;
        }
        // NOTE(review): the lookup uses the matched type, not its upcasted form — confirm intended
        if (this.eventBridgeHandlers.containsKey(eventType)) {
            return false;
        }
        return this.stateType.piiData();
    }

    /**
     * Tells whether handling the given command needs a GDPR context.
     *
     * @param commandRecord the command record being processed
     * @return {@code true} when the state type or the command's type carries PII data
     */
    @Override
    public boolean requiresGDPRContext(CommandRecord commandRecord) {
        // the command type is only resolved when the state type itself has no PII data
        if (this.stateType.piiData()) {
            return true;
        }
        return getCommandType(commandRecord).piiData();
    }

    /**
     * Tells whether this runtime handles PII data.
     *
     * @return {@code true} when either {@code shouldHandlePIIData} or
     *         {@code generateGDPRKeyOnCreate} is set
     */
    @Override
    public boolean shouldHandlePIIData() {
        if (this.shouldHandlePIIData) {
            return true;
        }
        return this.generateGDPRKeyOnCreate;
    }

    /**
     * Passes the domain event type to the schema registry's {@code registerAndValidate} and
     * stores the returned schema in {@code domainEventSchemas}, keyed by the type's class.
     * An existing entry for the same class is replaced.
     *
     * @param domainEventType the domain event type to register
     * @param z               forwarded unchanged to the schema registry; its meaning is not
     *                        visible in this method
     * @throws SchemaException declared by this method; presumably raised by the schema registry
     */
    @Override // org.elasticsoftware.akces.aggregate.AggregateRuntime
    public void registerAndValidate(DomainEventType<?> domainEventType, boolean z) throws SchemaException {
        this.domainEventSchemas.put(domainEventType.typeClass(), this.schemaRegistry.registerAndValidate(domainEventType, z));
    }

    /**
     * Registers the command type's schema once and remembers external command types.
     *
     * <p>When {@code commandSchemas} already holds an entry for the type's class nothing happens.
     * Otherwise the schema returned by the schema registry is stored, and an external command
     * type is additionally added via {@code addCommand}.
     *
     * @param commandType the command type to register
     * @param z           forwarded unchanged to the schema registry
     * @throws SchemaException declared by this method; presumably raised by the schema registry
     */
    @Override
    public void registerAndValidate(CommandType<?> commandType, boolean z) throws SchemaException {
        if (!this.commandSchemas.containsKey(commandType.typeClass())) {
            this.commandSchemas.put(commandType.typeClass(), this.schemaRegistry.registerAndValidate(commandType, z));
            if (commandType.external()) {
                addCommand(commandType);
            }
        }
    }

    /**
     * Deserializes a command record's payload into a command instance.
     *
     * @param commandType   supplies the class the payload is read into
     * @param commandRecord supplies the payload
     * @return the command read by the object mapper
     * @throws IOException when the object mapper fails to read the payload
     */
    @Override // org.elasticsoftware.akces.aggregate.AggregateRuntime
    public Command materialize(CommandType<?> commandType, CommandRecord commandRecord) throws IOException {
        return (Command) this.objectMapper.readValue(commandRecord.payload(), commandType.typeClass());
    }

    /**
     * Deserializes a domain event record's payload and upcasts the result when an upcasting
     * handler is registered for the given type.
     *
     * @param domainEventType   supplies the class the payload is read into
     * @param domainEventRecord supplies the payload
     * @return the deserialized event, upcasted if {@code eventUpcastingHandlers} has an entry
     *         for {@code domainEventType}
     * @throws IOException when the object mapper fails to read the payload
     */
    private DomainEvent materialize(DomainEventType<?> domainEventType, DomainEventRecord domainEventRecord) throws IOException {
        final DomainEvent deserialized = (DomainEvent) this.objectMapper.readValue(domainEventRecord.payload(), domainEventType.typeClass());
        if (this.eventUpcastingHandlers.containsKey(domainEventType)) {
            return upcast(deserialized, domainEventType);
        }
        return deserialized;
    }

    /**
     * Applies the upcasting handler registered for the given type and keeps upcasting for as
     * long as the handler's output type has a registered upcasting handler itself.
     *
     * @param domainEvent     the event to upcast
     * @param domainEventType the type of {@code domainEvent}; a handler must be registered for it
     * @return the event after the last applicable upcasting step
     */
    private DomainEvent upcast(DomainEvent domainEvent, DomainEventType<?> domainEventType) {
        final UpcastingHandlerFunction<DomainEvent, DomainEvent, DomainEventType<DomainEvent>, DomainEventType<DomainEvent>> handler = this.eventUpcastingHandlers.get(domainEventType);
        final DomainEvent upcasted = (DomainEvent) handler.apply(domainEvent);
        if (this.eventUpcastingHandlers.containsKey(handler.getOutputType())) {
            // another handler continues the chain from the output type
            return upcast(upcasted, (DomainEventType<?>) handler.getOutputType());
        }
        return upcasted;
    }

    /**
     * Follows the chain of registered upcasting handlers starting at the given type.
     *
     * @param domainEventType the type to start from
     * @return the first type in the chain without an entry in {@code eventUpcastingHandlers};
     *         this is {@code domainEventType} itself when it has no handler
     */
    private DomainEventType<?> upcastedType(DomainEventType<?> domainEventType) {
        if (!this.eventUpcastingHandlers.containsKey(domainEventType)) {
            return domainEventType;
        }
        return upcastedType((DomainEventType) this.eventUpcastingHandlers.get(domainEventType).getOutputType());
    }

    /**
     * Deserializes an aggregate state record's payload and upcasts the result when the record
     * was stored with a state type other than the current {@code stateType}.
     *
     * @param aggregateStateRecord the stored state record
     * @return the deserialized state, upcasted when its recorded type differs from {@code stateType}
     * @throws IOException           when the object mapper fails to read the payload
     * @throws IllegalStateException when no state type matches the record (raised by
     *                               {@code getAggregateStateType})
     */
    private AggregateState materialize(AggregateStateRecord aggregateStateRecord) throws IOException {
        // resolve the recorded type once and reuse it for both deserialization and the upcast check
        final AggregateStateType<?> recordedType = getAggregateStateType(aggregateStateRecord);
        final AggregateState state = (AggregateState) this.objectMapper.readValue(aggregateStateRecord.payload(), recordedType.typeClass());
        if (recordedType.equals(this.stateType)) {
            return state;
        }
        return upcast(state, recordedType);
    }

    /**
     * Applies the state upcasting handler registered for the given type and keeps upcasting
     * until a handler's output type equals the current {@code stateType}.
     *
     * @param aggregateState     the state to upcast
     * @param aggregateStateType the type of {@code aggregateState}; a handler must be registered for it
     * @return the state after the upcasting step whose output type equals {@code stateType}
     */
    private AggregateState upcast(AggregateState aggregateState, AggregateStateType<?> aggregateStateType) {
        final UpcastingHandlerFunction<AggregateState, AggregateState, AggregateStateType<AggregateState>, AggregateStateType<AggregateState>> handler = this.stateUpcastingHandlers.get(aggregateStateType);
        final AggregateState upcasted = (AggregateState) handler.apply(aggregateState);
        if (handler.getOutputType().equals(this.stateType)) {
            return upcasted;
        }
        // not yet at the current state type: continue from the handler's output type
        return upcast(upcasted, (AggregateStateType<?>) handler.getOutputType());
    }

    /**
     * Serializes an aggregate state to bytes with the object mapper. No schema validation is
     * performed in this method.
     *
     * @param aggregateState the state to serialize
     * @return the bytes written by the object mapper
     * @throws IOException when the object mapper fails to write the state
     */
    private byte[] serialize(AggregateState aggregateState) throws IOException {
        return this.objectMapper.writeValueAsBytes(aggregateState);
    }

    /**
     * Converts a domain event to a JSON tree, validates the tree against the schema stored for
     * the event's class and serializes the tree to bytes.
     *
     * @param domainEvent the event to serialize
     * @return the bytes of the validated JSON tree
     * @throws SerializationException when validation or writing the JSON fails; the original
     *                                exception is kept as the cause
     */
    private byte[] serialize(DomainEvent domainEvent) throws SerializationException {
        final JsonNode eventTree = this.objectMapper.convertValue(domainEvent, JsonNode.class);
        try {
            this.domainEventSchemas.get(domainEvent.getClass()).validate(eventTree);
            // the already converted tree is what gets written, not the event object
            return this.objectMapper.writeValueAsBytes(eventTree);
        } catch (JsonProcessingException processingFailure) {
            throw new SerializationException("Serialization Failed while Serializing DomainEventClass " + domainEvent.getClass().getName(), processingFailure);
        } catch (ValidationException validationFailure) {
            throw new SerializationException("Validation Failed while Serializing DomainEventClass " + domainEvent.getClass().getName(), validationFailure);
        }
    }

    /**
     * Validates a command against the schema stored for its class and serializes it to bytes.
     *
     * <p>The command is converted to a JSON tree for validation only; the bytes are written
     * from the command object itself.
     *
     * @param command the command to serialize
     * @return the bytes written by the object mapper
     * @throws SerializationException when validation or writing the JSON fails; the original
     *                                exception is kept as the cause
     */
    @Override
    public byte[] serialize(Command command) throws SerializationException {
        try {
            final JsonNode commandTree = this.objectMapper.convertValue(command, JsonNode.class);
            this.commandSchemas.get(command.getClass()).validate(commandTree);
            return this.objectMapper.writeValueAsBytes(command);
        } catch (JsonProcessingException processingFailure) {
            throw new SerializationException("Serialization Failed while Serializing CommandClass " + command.getClass().getName(), processingFailure);
        } catch (ValidationException validationFailure) {
            throw new SerializationException("Validation Failed while Serializing CommandClass " + command.getClass().getName(), validationFailure);
        }
    }

    /**
     * Returns the payload encoding used for commands of the given type.
     *
     * @param commandType not inspected
     * @return always {@code PayloadEncoding.JSON}
     */
    private PayloadEncoding getEncoding(CommandType<?> commandType) {
        return PayloadEncoding.JSON;
    }

    /**
     * Returns the payload encoding used for domain events of the given type.
     *
     * @param domainEventType not inspected
     * @return always {@code PayloadEncoding.JSON}
     */
    private PayloadEncoding getEncoding(DomainEventType<?> domainEventType) {
        return PayloadEncoding.JSON;
    }

    /**
     * Returns the payload encoding used for aggregate state of the given type.
     *
     * @param aggregateStateType not inspected
     * @return always {@code PayloadEncoding.JSON}
     */
    private PayloadEncoding getEncoding(AggregateStateType<?> aggregateStateType) {
        return PayloadEncoding.JSON;
    }

    /**
     * Resolves the state type a stored state record was written with.
     *
     * <p>The current {@code stateType} is returned when both its type name and version equal
     * the record's. Otherwise the keys of {@code stateUpcastingHandlers} are searched for a
     * type with the record's version; that search compares versions only.
     *
     * @param aggregateStateRecord the stored state record
     * @return the matching state type
     * @throws IllegalStateException when neither the current type nor any upcastable type matches
     */
    private AggregateStateType<?> getAggregateStateType(AggregateStateRecord aggregateStateRecord) {
        final boolean writtenWithCurrentType = this.stateType.typeName().equals(aggregateStateRecord.name())
                && this.stateType.version() == aggregateStateRecord.version();
        if (writtenWithCurrentType) {
            return this.stateType;
        }
        return this.stateUpcastingHandlers.keySet().stream()
                .filter(candidate -> candidate.version() == aggregateStateRecord.version())
                .findAny()
                .orElseThrow(() -> new IllegalStateException("Aggregate state type for " + aggregateStateRecord.name() + " with version " + aggregateStateRecord.version() + " does not exist"));
    }

    /**
     * Appends a command type to the list kept for its type name in {@code commandTypes},
     * creating that list on first use.
     *
     * @param commandType the command type to add
     */
    private void addCommand(CommandType<?> commandType) {
        this.commandTypes
                .computeIfAbsent(commandType.typeName(), typeName -> new ArrayList<>())
                .add(commandType);
    }
}
