package org.elasticsoftware.akces.query.database;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.control.AkcesRegistry;
import org.elasticsoftware.akces.gdpr.GDPRContextHolder;
import org.elasticsoftware.akces.gdpr.GDPRContextRepository;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.util.HostUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.ConsumerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/query/database/DatabaseModelPartition.class */
/**
 * Runs one partition of a database model on a dedicated Kafka consumer.
 *
 * <p>When executed as a {@link Runnable} the partition creates its own consumer, manually assigns
 * partition {@code id} of every topic resolved from the external domain event types (plus the GDPR
 * key partition when the runtime reports that it handles PII data) and then loops over
 * {@link #process()} until {@link #close()} requests a shutdown or a {@link KafkaException}
 * forces one.
 *
 * <p>The state machine driven by {@link #process()} is:
 * {@code INITIALIZING -> [LOADING_GDPR_KEYS ->] PROCESSING -> SHUTTING_DOWN}.
 *
 * <p>Threading: {@link #run()} is expected to execute on a single thread that owns the consumer;
 * {@link #close()} and {@link #isProcessing()} only touch the volatile state field and the
 * shutdown latch and may be called from other threads.
 */
public class DatabaseModelPartition implements Runnable, AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(DatabaseModelPartition.class);
    /** Maximum time a single {@code poll} blocks, so state changes are noticed quickly. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10L);
    /** Maximum time granted to the consumer to close. */
    private static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(5L);
    /** Maximum time {@link #close()} waits for the run loop to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final ConsumerFactory<String, ProtocolRecord> consumerFactory;
    private final DatabaseModelRuntime runtime;
    private final GDPRContextRepository gdprContextRepository;
    private final Integer id;
    private final Collection<DomainEventType<?>> externalDomainEventTypes;
    /** Created lazily in {@link #run()}; stays {@code null} when consumer creation fails. */
    private Consumer<String, ProtocolRecord> consumer;
    private final AkcesRegistry akcesRegistry;
    private final TopicPartition gdprKeyPartition;
    private final Set<TopicPartition> externalEventPartitions = new HashSet<>();
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    /** End offsets captured during initialization; used to detect when GDPR key loading caught up. */
    private Map<TopicPartition, Long> initializedEndOffsets = Collections.emptyMap();
    private volatile DatabaseModelPartitionState processState = DatabaseModelPartitionState.INITIALIZING;

    /**
     * Creates a partition; no Kafka resources are acquired until {@link #run()} is invoked.
     *
     * @param consumerFactory factory used to create the consumer inside {@link #run()}
     * @param databaseModelRuntime runtime that receives the consumed records
     * @param gDPRContextRepositoryFactory factory for the GDPR context repository, invoked with the
     *     runtime name and the partition id
     * @param num the partition id
     * @param topicPartition the GDPR key partition to read when PII data is handled
     * @param collection external domain event types whose topics are consumed
     * @param akcesRegistry registry used to resolve a domain event type to its topic name
     */
    public DatabaseModelPartition(ConsumerFactory<String, ProtocolRecord> consumerFactory, DatabaseModelRuntime databaseModelRuntime, GDPRContextRepositoryFactory gDPRContextRepositoryFactory, Integer num, TopicPartition topicPartition, Collection<DomainEventType<?>> collection, AkcesRegistry akcesRegistry) {
        this.akcesRegistry = akcesRegistry;
        this.consumerFactory = consumerFactory;
        this.runtime = databaseModelRuntime;
        this.id = num;
        this.externalDomainEventTypes = collection;
        this.gdprContextRepository = gDPRContextRepositoryFactory.create(databaseModelRuntime.getName(), num);
        this.gdprKeyPartition = topicPartition;
    }

    /**
     * Returns the partition id this instance was created with.
     *
     * @return the partition id
     */
    public Integer getId() {
        return this.id;
    }

    /**
     * Creates and assigns the consumer, then processes until a shutdown is requested.
     *
     * <p>Any failure is logged rather than propagated. Resources are released exactly once and the
     * shutdown latch is always released, even when the consumer could not be created.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            logger.info("Starting DatabaseModelPartition {} of {}DatabaseModel", this.id, this.runtime.getName());
            String groupId = this.runtime.getName() + "DatabaseModel-partition-" + this.id;
            this.consumer = this.consumerFactory.createConsumer(groupId, groupId + "-" + HostUtils.getHostName(), null);
            // Several event types may resolve to the same topic; the Set collapses duplicates.
            Set<TopicPartition> resolvedPartitions = this.externalDomainEventTypes.stream()
                    .map(this.akcesRegistry::resolveTopic)
                    .map(topic -> new TopicPartition(topic, this.id))
                    .collect(Collectors.toSet());
            this.externalEventPartitions.addAll(resolvedPartitions);
            this.consumer.assign(Stream.concat(
                    this.runtime.shouldHandlePIIData() ? Stream.of(this.gdprKeyPartition) : Stream.<TopicPartition>empty(),
                    this.externalEventPartitions.stream()).toList());
            logger.info("Assigned partitions {} for DatabaseModelPartition {} of {}DatabaseModel", this.consumer.assignment(), this.id, this.runtime.getName());
            while (this.processState != DatabaseModelPartitionState.SHUTTING_DOWN) {
                process();
            }
            logger.info("Shutting down DatabaseModelPartition {} of {}DatabaseModel", this.id, this.runtime.getName());
        } catch (Throwable th) {
            // Thread boundary: nothing above us can handle the failure, so log it and shut down.
            logger.error("Unexpected error in DatabaseModelPartition {} of {}DatabaseModel", this.id, this.runtime.getName(), th);
        } finally {
            try {
                closeResources();
                logger.info("Finished Shutting down DatabaseModelPartition {} of {}DatabaseModel", this.id, this.runtime.getName());
            } finally {
                // Must always fire, otherwise close() blocks for its full timeout.
                this.shutdownLatch.countDown();
            }
        }
    }

    /** Closes the consumer (if one was created) and the GDPR context repository, logging failures. */
    private void closeResources() {
        if (this.consumer != null) {
            try {
                this.consumer.close(CONSUMER_CLOSE_TIMEOUT);
            } catch (InterruptException ignored) {
                // Interrupted while closing; we are shutting down anyway.
            } catch (KafkaException e) {
                logger.error("Error closing consumer/producer", e);
            }
        }
        try {
            this.gdprContextRepository.close();
        } catch (IOException e) {
            logger.error("Error closing gdpr context repository", e);
        }
    }

    /**
     * Requests a shutdown and waits up to 10 seconds for the run loop to finish.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored and the method
     * returns without waiting any longer.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.processState = DatabaseModelPartitionState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.info("DatabaseModelPartition={} has been shutdown", this.id);
            } else {
                logger.warn("DatabaseModelPartition={} did not shutdown within 10 seconds", this.id);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Installs the GDPR context looked up for {@code str2} as the current context.
     *
     * @param str unused by this method
     * @param str2 key passed to the GDPR context repository lookup
     */
    private void setupGDPRContext(String str, String str2) {
        GDPRContextHolder.setCurrentGDPRContext(this.gdprContextRepository.get(str2));
    }

    /** Resets the current GDPR context on the {@link GDPRContextHolder}. */
    private void tearDownGDPRContext() {
        GDPRContextHolder.resetCurrentGDPRContext();
    }

    /**
     * Executes one step of the state machine for the current {@code processState}.
     *
     * <p>A {@link KafkaException} other than wakeup/interrupt is treated as fatal and moves the
     * partition to {@code SHUTTING_DOWN}.
     */
    private void process() {
        try {
            switch (this.processState) {
                case PROCESSING -> pollAndProcess();
                case LOADING_GDPR_KEYS -> loadGDPRKeys();
                case INITIALIZING -> initialize();
                default -> {
                    // SHUTTING_DOWN (or any other state): nothing to do, the run loop will exit.
                }
            }
        } catch (WakeupException | InterruptException ignored) {
            // Deliberately ignored so the run loop re-evaluates the process state.
            // These must be caught before KafkaException because both are subclasses of it.
        } catch (KafkaException e) {
            logger.error("Fatal error during {} phase, shutting down DatabaseModelPartition {} of {}DatabaseModel", this.processState, this.id, this.runtime.getName(), e);
            this.processState = DatabaseModelPartitionState.SHUTTING_DOWN;
        }
    }

    /** PROCESSING step: polls once and hands any non-empty batch to {@link #processRecords}. */
    private void pollAndProcess() {
        ConsumerRecords<String, ProtocolRecord> batch = this.consumer.poll(POLL_TIMEOUT);
        if (!batch.isEmpty()) {
            processRecords(batch);
        }
    }

    /**
     * LOADING_GDPR_KEYS step: feeds polled key records to the repository and switches to PROCESSING
     * once a poll comes back empty and the consumer position reached the recorded end offset.
     */
    private void loadGDPRKeys() {
        ConsumerRecords<String, ProtocolRecord> keyRecords = this.consumer.poll(POLL_TIMEOUT);
        this.gdprContextRepository.process(keyRecords.records(this.gdprKeyPartition));
        long endOffset = this.initializedEndOffsets.getOrDefault(this.gdprKeyPartition, 0L);
        if (keyRecords.isEmpty() && endOffset <= this.consumer.position(this.gdprKeyPartition)) {
            this.consumer.resume(this.externalEventPartitions);
            this.processState = DatabaseModelPartitionState.PROCESSING;
        }
    }

    /**
     * INITIALIZING step: positions the consumer one past each offset reported by the runtime and
     * either starts loading GDPR keys (external event partitions paused) or goes straight to
     * PROCESSING when no PII data is handled.
     */
    private void initialize() {
        logger.info("Initializing DatabaseModelPartition {} of {}DatabaseModel. Will {}",
                this.id,
                this.runtime.getName(),
                this.runtime.shouldHandlePIIData() ? "Handle PII Data" : "Not Handle PII Data");
        this.runtime.initializeOffsets(this.externalEventPartitions)
                .forEach((partition, offset) -> this.consumer.seek(partition, offset + 1));
        if (!this.runtime.shouldHandlePIIData()) {
            this.consumer.resume(this.externalEventPartitions);
            this.processState = DatabaseModelPartitionState.PROCESSING;
            return;
        }
        long gdprOffset = this.gdprContextRepository.getOffset();
        if (gdprOffset >= 0) {
            logger.info("Resuming GDPRKeys from offset {} for DatabaseModelPartition {} of {}DatabaseModel", gdprOffset, this.id, this.runtime.getName());
            this.consumer.seek(this.gdprKeyPartition, gdprOffset + 1);
        } else {
            this.consumer.seekToBeginning(Collections.singletonList(this.gdprKeyPartition));
        }
        this.initializedEndOffsets = this.consumer.endOffsets(List.of(this.gdprKeyPartition));
        logger.info("Loading GDPR Keys for DatabaseModelPartition {} of {}DatabaseModel", this.id, this.runtime.getName());
        // Hold back external events until the GDPR keys have been loaded.
        this.consumer.pause(this.externalEventPartitions);
        this.processState = DatabaseModelPartitionState.LOADING_GDPR_KEYS;
    }

    /**
     * Processes one polled batch: GDPR key records go to the repository first, then all remaining
     * records are applied to the runtime grouped by partition.
     *
     * <p>An {@link IOException} raised by the runtime is logged and not propagated.
     *
     * @param consumerRecords the non-empty batch returned by the last poll
     */
    private void processRecords(ConsumerRecords<String, ProtocolRecord> consumerRecords) {
        List<ConsumerRecord<String, ProtocolRecord>> gdprKeyRecords = consumerRecords.records(this.gdprKeyPartition);
        if (logger.isTraceEnabled()) {
            logger.trace("Processing {} records in a single batch", consumerRecords.count());
            logger.trace("Processing {} gdpr key records", gdprKeyRecords.size());
            if (!this.externalEventPartitions.isEmpty()) {
                int externalEventCount = this.externalEventPartitions.stream()
                        .mapToInt(partition -> consumerRecords.records(partition).size())
                        .sum();
                logger.trace("Processing {} external event records", externalEventCount);
            }
        }
        // Keys are processed before the events of the same batch are applied.
        if (!gdprKeyRecords.isEmpty()) {
            this.gdprContextRepository.process(gdprKeyRecords);
        }
        try {
            Map<TopicPartition, List<ConsumerRecord<String, ProtocolRecord>>> eventsByPartition = consumerRecords.partitions().stream()
                    .filter(partition -> !partition.equals(this.gdprKeyPartition))
                    .collect(Collectors.toMap(partition -> partition, consumerRecords::records));
            this.runtime.apply(eventsByPartition, this.gdprContextRepository::get);
        } catch (IOException e) {
            logger.error("IOException while processing events", e);
        }
    }

    /**
     * Reports whether the partition has finished initializing and is consuming events.
     *
     * @return {@code true} when the current state is {@code PROCESSING}
     */
    public boolean isProcessing() {
        return this.processState == DatabaseModelPartitionState.PROCESSING;
    }
}
