package org.elasticsoftware.akces.query.database;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.annotations.DatabaseModelInfo;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils;
import org.elasticsoftware.akces.gdpr.GDPRContext;
import org.elasticsoftware.akces.gdpr.GDPRContextHolder;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.query.DatabaseModel;
import org.elasticsoftware.akces.query.DatabaseModelEventHandlerFunction;
import org.elasticsoftware.akces.schemas.KafkaSchemaRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/query/database/KafkaDatabaseModelRuntime.class */
/**
 * {@link DatabaseModelRuntime} that feeds domain events read from Kafka into a
 * {@link DatabaseModel}.
 *
 * <p>For every batch passed to {@link #apply(Map, Function)} the runtime starts a transaction on
 * the database model, dispatches each record to the handler registered for its event type and
 * finally commits the transaction together with the highest offset seen per partition.
 *
 * <p>Instances are created through {@link Builder}.
 */
public class KafkaDatabaseModelRuntime implements DatabaseModelRuntime {
    private static final Logger logger = LoggerFactory.getLogger(KafkaDatabaseModelRuntime.class);
    private final KafkaSchemaRegistry schemaRegistry;
    private final ObjectMapper objectMapper;
    private final Map<Class<?>, DomainEventType<?>> domainEvents;
    private final Map<DomainEventType<?>, DatabaseModelEventHandlerFunction<DomainEvent>> databaseModelEventHandlers;
    private final DatabaseModel databaseModel;
    private final String name;
    private final int version;
    private final boolean shouldHandlePIIData;

    /**
     * Collects the collaborators, event types and handlers needed to create a
     * {@link KafkaDatabaseModelRuntime}.
     */
    public static class Builder {
        private final Map<Class<?>, DomainEventType<?>> domainEvents = new HashMap<>();
        private final Map<DomainEventType<?>, DatabaseModelEventHandlerFunction<DomainEvent>> databaseModelEventHandlers = new HashMap<>();
        private KafkaSchemaRegistry schemaRegistry;
        private ObjectMapper objectMapper;
        private DatabaseModelInfo databaseModelInfo;
        private DatabaseModel databaseModel;

        /**
         * Sets the registry used by {@link KafkaDatabaseModelRuntime#validateDomainEventSchemas()}.
         *
         * @param kafkaSchemaRegistry the schema registry
         * @return this builder
         */
        public Builder setSchemaRegistry(KafkaSchemaRegistry kafkaSchemaRegistry) {
            this.schemaRegistry = kafkaSchemaRegistry;
            return this;
        }

        /**
         * Sets the mapper used to deserialize event payloads.
         *
         * @param objectMapper the Jackson object mapper
         * @return this builder
         */
        public Builder setObjectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * Sets the annotation that supplies the runtime's name and version.
         *
         * @param databaseModelInfo the annotation instance; must be set before {@link #build()}
         * @return this builder
         */
        public Builder setDatabaseModelInfo(DatabaseModelInfo databaseModelInfo) {
            this.databaseModelInfo = databaseModelInfo;
            return this;
        }

        /**
         * Sets the database model that receives transactions and offsets.
         *
         * @param databaseModel the target database model
         * @return this builder
         */
        public Builder setDatabaseModel(DatabaseModel databaseModel) {
            this.databaseModel = databaseModel;
            return this;
        }

        /**
         * Registers a domain event type, keyed by its {@code typeClass()}. Registering a second
         * type with the same class replaces the first.
         *
         * @param domainEventType the event type to register
         * @return this builder
         */
        public Builder addDomainEvent(DomainEventType<?> domainEventType) {
            this.domainEvents.put(domainEventType.typeClass(), domainEventType);
            return this;
        }

        /**
         * Registers the handler invoked for events resolved to {@code domainEventType}.
         *
         * @param domainEventType the event type the handler is bound to
         * @param databaseModelEventHandlerFunction the handler
         * @return this builder
         */
        public Builder addDatabaseModelEventHandler(DomainEventType<?> domainEventType, DatabaseModelEventHandlerFunction<DomainEvent> databaseModelEventHandlerFunction) {
            this.databaseModelEventHandlers.put(domainEventType, databaseModelEventHandlerFunction);
            return this;
        }

        /**
         * Creates the runtime from the current builder state. The runtime takes a snapshot of the
         * registered event types and handlers, so later changes to this builder do not affect it.
         *
         * @return a new runtime
         * @throws NullPointerException if no {@link DatabaseModelInfo} was set
         */
        public KafkaDatabaseModelRuntime build() {
            Objects.requireNonNull(this.databaseModelInfo, "databaseModelInfo must be set before build()");
            // PII handling is required as soon as one registered event class carries the annotation.
            boolean handlesPIIData = this.domainEvents.values().stream()
                    .map(DomainEventType::typeClass)
                    .anyMatch(GDPRAnnotationUtils::hasPIIDataAnnotation);
            return new KafkaDatabaseModelRuntime(
                    this.schemaRegistry,
                    this.objectMapper,
                    this.domainEvents,
                    this.databaseModelEventHandlers,
                    this.databaseModel,
                    this.databaseModelInfo.value(),
                    this.databaseModelInfo.version(),
                    handlesPIIData);
        }
    }

    private KafkaDatabaseModelRuntime(KafkaSchemaRegistry schemaRegistry,
                                      ObjectMapper objectMapper,
                                      Map<Class<?>, DomainEventType<?>> domainEvents,
                                      Map<DomainEventType<?>, DatabaseModelEventHandlerFunction<DomainEvent>> databaseModelEventHandlers,
                                      DatabaseModel databaseModel,
                                      String name,
                                      int version,
                                      boolean shouldHandlePIIData) {
        this.schemaRegistry = schemaRegistry;
        this.objectMapper = objectMapper;
        // Defensive copies: the builder keeps its own mutable maps and may be reused after build().
        this.domainEvents = Collections.unmodifiableMap(new HashMap<>(domainEvents));
        this.databaseModelEventHandlers = Collections.unmodifiableMap(new HashMap<>(databaseModelEventHandlers));
        this.databaseModel = databaseModel;
        this.name = name;
        this.version = version;
        this.shouldHandlePIIData = shouldHandlePIIData;
    }

    /**
     * Returns the runtime name in the form {@code <name>-v<version>}.
     */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public String getName() {
        return this.name + "-v" + this.version;
    }

    /**
     * Looks up the offsets the database model has stored for the given partitions.
     *
     * @param collection the partitions to resolve; partitions are identified towards the database
     *                   model by their {@code toString()} form
     * @return one entry per partition, holding the stored offset or {@code 0} when none is stored
     */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public Map<TopicPartition, Long> initializeOffsets(Collection<TopicPartition> collection) {
        Set<String> partitionIds = collection.stream()
                .map(TopicPartition::toString)
                .collect(Collectors.toSet());
        Map<String, Long> storedOffsets = this.databaseModel.getOffsets(partitionIds);
        return collection.stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        partition -> storedOffsets.getOrDefault(partition.toString(), 0L)));
    }

    /**
     * Applies a batch of records to the database model inside a single transaction.
     *
     * <p>Every record value is cast to {@link DomainEventRecord}. Records whose event type is not
     * registered, or for which no handler is registered, are skipped. After all records have been
     * handled the transaction is committed with the highest offset seen for each partition that
     * contributed at least one record; partitions with an empty record list are left out so that a
     * previously stored offset is not overwritten with {@code 0}.
     *
     * <p>NOTE(review): when a handler throws, this method propagates the exception without
     * committing; no rollback call is made here. Confirm that {@link DatabaseModel} needs no
     * explicit cleanup for an abandoned transaction marker.
     *
     * @param map      the records to apply, grouped by partition
     * @param function resolves the {@link GDPRContext} for an aggregate id; only consulted for
     *                 event types flagged as carrying PII data
     * @throws IOException if an event payload cannot be deserialized
     */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public void apply(Map<TopicPartition, List<ConsumerRecord<String, ProtocolRecord>>> map, Function<String, GDPRContext> function) throws IOException {
        Object transactionMarker = this.databaseModel.startTransaction();
        for (List<ConsumerRecord<String, ProtocolRecord>> partitionRecords : map.values()) {
            // Cast the whole partition up front so an unexpected record type fails before any
            // handler of that partition has run.
            List<DomainEventRecord> eventRecords = partitionRecords.stream()
                    .map(consumerRecord -> (DomainEventRecord) consumerRecord.value())
                    .toList();
            for (DomainEventRecord eventRecord : eventRecords) {
                dispatch(eventRecord, function);
            }
        }
        this.databaseModel.commitTransaction(transactionMarker, highestOffsets(map));
    }

    /** Validates every registered domain event type against the schema registry. */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public void validateDomainEventSchemas() {
        for (DomainEventType<?> domainEventType : this.domainEvents.values()) {
            this.schemaRegistry.validate(domainEventType);
        }
    }

    /**
     * Returns the registered domain event types as an unmodifiable collection.
     */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public Collection<DomainEventType<?>> getDomainEventTypes() {
        return this.domainEvents.values();
    }

    /**
     * Reports whether at least one registered event class carried a PII data annotation when the
     * runtime was built.
     */
    @Override // org.elasticsoftware.akces.query.database.DatabaseModelRuntime
    public boolean shouldHandlePIIData() {
        return this.shouldHandlePIIData;
    }

    /** Resolves the type and handler for one record and invokes the handler, if both exist. */
    private void dispatch(DomainEventRecord eventRecord, Function<String, GDPRContext> gdprContextSupplier) throws IOException {
        DomainEventType<?> domainEventType = getDomainEventType(eventRecord);
        if (domainEventType == null) {
            return;
        }
        DatabaseModelEventHandlerFunction<DomainEvent> handler = this.databaseModelEventHandlers.get(domainEventType);
        if (handler == null) {
            return;
        }
        try {
            if (domainEventType.piiData()) {
                GDPRContextHolder.setCurrentGDPRContext(gdprContextSupplier.apply(eventRecord.aggregateId()));
            }
            handler.accept(materialize(domainEventType, eventRecord));
        } finally {
            // Always clear the holder so a context never leaks into the next record.
            GDPRContextHolder.resetCurrentGDPRContext();
        }
    }

    /** Collects, per non-empty partition, the highest record offset keyed by the partition's string form. */
    private static Map<String, Long> highestOffsets(Map<TopicPartition, List<ConsumerRecord<String, ProtocolRecord>>> records) {
        Map<String, Long> offsets = new HashMap<>();
        records.forEach((partition, partitionRecords) ->
                partitionRecords.stream()
                        .mapToLong(ConsumerRecord::offset)
                        .max()
                        .ifPresent(maxOffset -> offsets.put(partition.toString(), maxOffset)));
        return offsets;
    }

    /** Deserializes the record payload into the Java class of the given event type. */
    private DomainEvent materialize(DomainEventType<?> domainEventType, DomainEventRecord domainEventRecord) throws IOException {
        return (DomainEvent) this.objectMapper.readValue(domainEventRecord.payload(), domainEventType.typeClass());
    }

    /**
     * Finds the registered external event type matching the record's name whose version is the
     * highest one not exceeding the record's version.
     *
     * @return the matching type, or {@code null} when none is registered
     */
    private DomainEventType<?> getDomainEventType(DomainEventRecord domainEventRecord) {
        return this.domainEvents.values().stream()
                .filter(type -> type.external())
                .filter(type -> type.typeName().equals(domainEventRecord.name()))
                .filter(type -> type.version() <= domainEventRecord.version())
                .max(Comparator.comparingInt(DomainEventType::version))
                .orElse(null);
    }

    @Override
    public String toString() {
        return "KafkaDatabaseModelRuntime{" + getName() + "}";
    }
}
