package org.elasticsoftware.akces.query.models;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.gdpr.GDPRContext;
import org.elasticsoftware.akces.gdpr.GDPRContextHolder;
import org.elasticsoftware.akces.gdpr.GDPRContextRepository;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.query.QueryModel;
import org.elasticsoftware.akces.query.QueryModelState;
import org.elasticsoftware.akces.schemas.SchemaException;
import org.elasticsoftware.akces.util.HostUtils;
import org.elasticsoftware.akces.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdminOperations;

/* loaded from: input_file:org/elasticsoftware/akces/query/models/AkcesQueryModelController.class */
public class AkcesQueryModelController extends Thread implements AutoCloseable, ApplicationContextAware, QueryModels {
    private static final Logger logger = LoggerFactory.getLogger(AkcesQueryModelController.class);
    private final Map<Class<? extends QueryModel>, QueryModelRuntime> enabledRuntimes;
    private final Map<Class<? extends QueryModel>, QueryModelRuntime> disabledRuntimes;
    private final KafkaAdminOperations kafkaAdmin;
    private final ConsumerFactory<String, ProtocolRecord> consumerFactory;
    private final GDPRContextRepositoryFactory gdprContextRepositoryFactory;
    private final Map<TopicPartition, GDPRContextRepository> gdprContextRepositories;
    private final BlockingQueue<HydrationRequest<?>> commandQueue;
    private final Map<TopicPartition, HydrationExecution<?>> hydrationExecutions;
    private final CountDownLatch shutdownLatch;
    private volatile AkcesQueryModelControllerState processState;
    private final Set<TopicPartition> gdprKeyPartitions;
    private Map<TopicPartition, Long> initializedEndOffsets;
    private final HashFunction hashFunction;
    private int totalPartitions;
    private ApplicationContext applicationContext;
    private final Cache<String, CachedQueryModelState<?>> queryModelStateCache;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState.class */
    public static final class CachedQueryModelState<S extends QueryModelState> extends Record {
        private final S state;
        private final Long offset;

        private CachedQueryModelState(S s, Long l) {
            this.state = s;
            this.offset = l;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, CachedQueryModelState.class), CachedQueryModelState.class, "state;offset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->state:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->offset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, CachedQueryModelState.class), CachedQueryModelState.class, "state;offset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->state:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->offset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, CachedQueryModelState.class, Object.class), CachedQueryModelState.class, "state;offset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->state:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$CachedQueryModelState;->offset:Ljava/lang/Long;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public S state() {
            return this.state;
        }

        public Long offset() {
            return this.offset;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution.class */
    public static final class HydrationExecution<S extends QueryModelState> extends Record {
        private final QueryModelRuntime<S> runtime;
        private final CompletableFuture<S> completableFuture;
        private final String id;
        private final S currentState;
        private final Long currentOffset;
        private final TopicPartition indexPartition;
        private final Long endOffset;

        private HydrationExecution(QueryModelRuntime<S> queryModelRuntime, CompletableFuture<S> completableFuture, String str, S s, Long l, TopicPartition topicPartition, Long l2) {
            this.runtime = queryModelRuntime;
            this.completableFuture = completableFuture;
            this.id = str;
            this.currentState = s;
            this.currentOffset = l;
            this.indexPartition = topicPartition;
            this.endOffset = l2;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public HydrationExecution<S> withEndOffset(Long l) {
            return new HydrationExecution<>(this.runtime, this.completableFuture, this.id, this.currentState, this.currentOffset, this.indexPartition, l);
        }

        HydrationExecution<S> withCurrentState(S s) {
            return new HydrationExecution<>(this.runtime, this.completableFuture, this.id, s, this.currentOffset, this.indexPartition, this.endOffset);
        }

        void complete() {
            if (this.currentState != null) {
                this.completableFuture.complete(this.currentState);
            } else {
                this.completableFuture.completeExceptionally(new QueryModelIdNotFoundException(this.runtime.getQueryModelClass(), this.id));
            }
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, HydrationExecution.class), HydrationExecution.class, "runtime;completableFuture;id;currentState;currentOffset;indexPartition;endOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentOffset:Ljava/lang/Long;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->indexPartition:Lorg/apache/kafka/common/TopicPartition;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->endOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, HydrationExecution.class), HydrationExecution.class, "runtime;completableFuture;id;currentState;currentOffset;indexPartition;endOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentOffset:Ljava/lang/Long;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->indexPartition:Lorg/apache/kafka/common/TopicPartition;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->endOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, HydrationExecution.class, Object.class), HydrationExecution.class, "runtime;completableFuture;id;currentState;currentOffset;indexPartition;endOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->currentOffset:Ljava/lang/Long;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->indexPartition:Lorg/apache/kafka/common/TopicPartition;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationExecution;->endOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public QueryModelRuntime<S> runtime() {
            return this.runtime;
        }

        public CompletableFuture<S> completableFuture() {
            return this.completableFuture;
        }

        public String id() {
            return this.id;
        }

        public S currentState() {
            return this.currentState;
        }

        public Long currentOffset() {
            return this.currentOffset;
        }

        public TopicPartition indexPartition() {
            return this.indexPartition;
        }

        public Long endOffset() {
            return this.endOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest.class */
    public static final class HydrationRequest<S extends QueryModelState> extends Record {
        private final QueryModelRuntime<S> runtime;
        private final CompletableFuture<S> completableFuture;
        private final String id;
        private final S currentState;
        private final Long currentOffset;

        private HydrationRequest(QueryModelRuntime<S> queryModelRuntime, CompletableFuture<S> completableFuture, String str, S s, Long l) {
            this.runtime = queryModelRuntime;
            this.completableFuture = completableFuture;
            this.id = str;
            this.currentState = s;
            this.currentOffset = l;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, HydrationRequest.class), HydrationRequest.class, "runtime;completableFuture;id;currentState;currentOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, HydrationRequest.class), HydrationRequest.class, "runtime;completableFuture;id;currentState;currentOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, HydrationRequest.class, Object.class), HydrationRequest.class, "runtime;completableFuture;id;currentState;currentOffset", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->runtime:Lorg/elasticsoftware/akces/query/models/QueryModelRuntime;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->completableFuture:Ljava/util/concurrent/CompletableFuture;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->id:Ljava/lang/String;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentState:Lorg/elasticsoftware/akces/query/QueryModelState;", "FIELD:Lorg/elasticsoftware/akces/query/models/AkcesQueryModelController$HydrationRequest;->currentOffset:Ljava/lang/Long;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public QueryModelRuntime<S> runtime() {
            return this.runtime;
        }

        public CompletableFuture<S> completableFuture() {
            return this.completableFuture;
        }

        public String id() {
            return this.id;
        }

        public S currentState() {
            return this.currentState;
        }

        public Long currentOffset() {
            return this.currentOffset;
        }
    }

    public AkcesQueryModelController(KafkaAdminOperations kafkaAdminOperations, ConsumerFactory<String, ProtocolRecord> consumerFactory, GDPRContextRepositoryFactory gDPRContextRepositoryFactory) {
        super("AkcesQueryModelController");
        this.enabledRuntimes = new ConcurrentHashMap();
        this.disabledRuntimes = new ConcurrentHashMap();
        this.gdprContextRepositories = new HashMap();
        this.commandQueue = new LinkedBlockingQueue();
        this.hydrationExecutions = new HashMap();
        this.shutdownLatch = new CountDownLatch(1);
        this.processState = AkcesQueryModelControllerState.INITIALIZING;
        this.gdprKeyPartitions = new HashSet();
        this.initializedEndOffsets = Collections.emptyMap();
        this.hashFunction = Hashing.murmur3_32_fixed();
        this.queryModelStateCache = Caffeine.newBuilder().maximumSize(1000L).build();
        this.kafkaAdmin = kafkaAdminOperations;
        this.consumerFactory = consumerFactory;
        this.gdprContextRepositoryFactory = gDPRContextRepositoryFactory;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        this.enabledRuntimes.putAll((Map) applicationContext.getBeansOfType(QueryModelRuntime.class).values().stream().collect(Collectors.toMap(queryModelRuntime -> {
            return queryModelRuntime.getQueryModelClass();
        }, queryModelRuntime2 -> {
            return queryModelRuntime2;
        })));
    }

    private <S extends QueryModelState> QueryModelRuntime<S> getEnabledRuntime(Class<? extends QueryModel<S>> cls) {
        return this.enabledRuntimes.get(cls);
    }

    private <S extends QueryModelState> boolean isRuntimeDisabled(Class<? extends QueryModel<S>> cls) {
        return this.disabledRuntimes.containsKey(cls);
    }

    @Override // org.elasticsoftware.akces.query.models.QueryModels
    public <S extends QueryModelState> CompletionStage<S> getHydratedState(Class<? extends QueryModel<S>> cls, String str) {
        QueryModelRuntime<S> enabledRuntime = getEnabledRuntime(cls);
        if (enabledRuntime == null) {
            return isRuntimeDisabled(cls) ? CompletableFuture.failedFuture(new QueryModelExecutionDisabledException(cls)) : CompletableFuture.failedFuture(new QueryModelNotFoundException(cls));
        }
        CachedQueryModelState cachedQueryModelState = (CachedQueryModelState) this.queryModelStateCache.getIfPresent(enabledRuntime.getName() + "-" + str);
        QueryModelState state = cachedQueryModelState != null ? cachedQueryModelState.state() : null;
        Long offset = cachedQueryModelState != null ? cachedQueryModelState.offset() : null;
        CompletableFuture completableFuture = new CompletableFuture();
        this.commandQueue.add(new HydrationRequest<>(enabledRuntime, completableFuture, str, state, offset));
        return completableFuture;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Consumer<String, ProtocolRecord> createConsumer = this.consumerFactory.createConsumer(HostUtils.getHostName() + "-AkcesQueryModelController", HostUtils.getHostName() + "-AkcesQueryModelController", (String) null);
        while (this.processState != AkcesQueryModelControllerState.SHUTTING_DOWN) {
            try {
                process(createConsumer);
            } catch (Throwable th) {
                if (createConsumer != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        logger.info("AkcesQueryModelController is shutting down");
        ArrayList arrayList = new ArrayList();
        this.commandQueue.drainTo(arrayList);
        arrayList.forEach(hydrationRequest -> {
            hydrationRequest.completableFuture.completeExceptionally(new QueryModelExecutionCancelledException(hydrationRequest.runtime().getQueryModelClass()));
        });
        Iterator<HydrationExecution<?>> it = this.hydrationExecutions.values().iterator();
        while (it.hasNext()) {
            HydrationExecution<?> next = it.next();
            ((HydrationExecution) next).completableFuture.completeExceptionally(new QueryModelExecutionCancelledException(next.runtime().getQueryModelClass()));
            it.remove();
        }
        Iterator<GDPRContextRepository> it2 = this.gdprContextRepositories.values().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().close();
            } catch (IOException e) {
            }
        }
        this.applicationContext.publishEvent(new AvailabilityChangeEvent(this, LivenessState.BROKEN));
        if (createConsumer != null) {
            createConsumer.close();
        }
        this.shutdownLatch.countDown();
    }

    /* JADX WARN: Type inference failed for: r4v7, types: [org.elasticsoftware.akces.query.QueryModelState] */
    private void process(Consumer<String, ProtocolRecord> consumer) {
        if (this.processState == AkcesQueryModelControllerState.RUNNING) {
            try {
                Map<? extends TopicPartition, ? extends HydrationExecution<?>> processHydrationRequests = processHydrationRequests(consumer);
                this.hydrationExecutions.putAll(processHydrationRequests);
                consumer.assign(Stream.concat(this.hydrationExecutions.keySet().stream(), this.gdprKeyPartitions.stream()).toList());
                if (!processHydrationRequests.isEmpty()) {
                    logger.info("Processing {} new HydrationExecutions", Integer.valueOf(processHydrationRequests.size()));
                    consumer.endOffsets(processHydrationRequests.keySet()).forEach((topicPartition, l) -> {
                        this.hydrationExecutions.computeIfPresent(topicPartition, (topicPartition, hydrationExecution) -> {
                            return hydrationExecution.withEndOffset(l);
                        });
                    });
                }
                processHydrationRequests.forEach((topicPartition2, hydrationExecution) -> {
                    if (hydrationExecution.currentOffset() != null) {
                        consumer.seek(topicPartition2, hydrationExecution.currentOffset().longValue());
                    } else if (this.hydrationExecutions.get(topicPartition2).endOffset().longValue() > 0) {
                        consumer.seek(topicPartition2, 0L);
                    } else {
                        consumer.seekToBeginning(List.of(topicPartition2));
                    }
                });
                if (!this.hydrationExecutions.isEmpty()) {
                    logger.info("Processing HydrationExecutions {}", this.hydrationExecutions);
                }
                ConsumerRecords poll = consumer.poll(Duration.ofMillis(10L));
                if (!poll.isEmpty()) {
                    logger.info("Processing {}", poll.partitions());
                    if (!this.gdprKeyPartitions.isEmpty()) {
                        List<TopicPartition> list = poll.partitions().stream().filter(topicPartition3 -> {
                            return topicPartition3.topic().equals("Akces-GDPRKeys");
                        }).toList();
                        logger.info("Processing {} GDPRKeyPartitions", Integer.valueOf(list.size()));
                        for (TopicPartition topicPartition4 : list) {
                            this.gdprContextRepositories.get(topicPartition4).process(poll.records(topicPartition4));
                        }
                    }
                    if (!this.hydrationExecutions.isEmpty()) {
                        List<TopicPartition> list2 = poll.partitions().stream().filter(topicPartition5 -> {
                            return !topicPartition5.topic().equals("Akces-GDPRKeys");
                        }).toList();
                        logger.info("Processing {} indexPartitions", Integer.valueOf(list2.size()));
                        for (TopicPartition topicPartition6 : list2) {
                            this.hydrationExecutions.computeIfPresent(topicPartition6, (topicPartition7, hydrationExecution2) -> {
                                return processHydrationExecution(hydrationExecution2.runtime().shouldHandlePIIData() ? getGDPRContextRepository(hydrationExecution2.id()) : null, hydrationExecution2, poll.records(topicPartition6));
                            });
                        }
                    }
                }
                Iterator<HydrationExecution<?>> it = this.hydrationExecutions.values().iterator();
                while (it.hasNext()) {
                    HydrationExecution<?> next = it.next();
                    if (next.endOffset().longValue() <= consumer.position(next.indexPartition())) {
                        logger.info("HydrationExecution on index {} with id {} and runtime {} is complete: indexPartition {} endOffset {} consumerPosition {}", new Object[]{next.runtime().getIndexName(), next.id(), next.runtime().getName(), next.indexPartition(), next.endOffset(), Long.valueOf(consumer.position(next.indexPartition()))});
                        next.complete();
                        it.remove();
                        this.queryModelStateCache.put(next.runtime().getName() + "-" + next.id(), new CachedQueryModelState(next.currentState(), Long.valueOf(consumer.position(next.indexPartition()))));
                    }
                }
                return;
            } catch (WakeupException | InterruptException e) {
                return;
            } catch (KafkaException e2) {
                logger.error("Unrecoverable exception in AkcesQueryModelController while {}", this.processState, e2);
                this.processState = AkcesQueryModelControllerState.SHUTTING_DOWN;
                return;
            }
        }
        if (this.processState == AkcesQueryModelControllerState.LOADING_GDPR_KEYS) {
            try {
                ConsumerRecords poll2 = consumer.poll(Duration.ofMillis(10L));
                for (TopicPartition topicPartition8 : this.gdprKeyPartitions) {
                    this.gdprContextRepositories.get(topicPartition8).process(poll2.records(topicPartition8));
                    this.initializedEndOffsets.computeIfPresent(topicPartition8, (topicPartition9, l2) -> {
                        if (l2.longValue() <= consumer.position(topicPartition8)) {
                            return null;
                        }
                        return l2;
                    });
                }
                if (poll2.isEmpty() && this.initializedEndOffsets.isEmpty()) {
                    this.processState = AkcesQueryModelControllerState.RUNNING;
                }
                return;
            } catch (KafkaException e3) {
                logger.error("Unrecoverable exception in AkcesQueryModelController while {}", this.processState, e3);
                this.processState = AkcesQueryModelControllerState.SHUTTING_DOWN;
                return;
            } catch (WakeupException | InterruptException e4) {
                return;
            }
        }
        if (this.processState == AkcesQueryModelControllerState.INITIALIZING) {
            try {
                Iterator<QueryModelRuntime> it2 = this.enabledRuntimes.values().iterator();
                while (it2.hasNext()) {
                    QueryModelRuntime next2 = it2.next();
                    try {
                        next2.validateDomainEventSchemas();
                        logger.info("Enabling {} QueryModelRuntime", next2.getName());
                    } catch (SchemaException e5) {
                        logger.error("SchemaException while validating DomainEventSchemas for QueryModel {}. Disabling QueryModel", next2.getName(), e5);
                        it2.remove();
                        this.disabledRuntimes.put(next2.getQueryModelClass(), next2);
                    }
                }
                if (this.enabledRuntimes.isEmpty() && !this.disabledRuntimes.isEmpty()) {
                    logger.error("No QueryModelRuntimes enabled. This is an error. Shutting down");
                    this.processState = AkcesQueryModelControllerState.SHUTTING_DOWN;
                } else if (this.enabledRuntimes.values().stream().anyMatch((v0) -> {
                    return v0.shouldHandlePIIData();
                })) {
                    logger.info("Loading GDPR keys");
                    this.totalPartitions = ((TopicDescription) this.kafkaAdmin.describeTopics(new String[]{"Akces-Control"}).get("Akces-Control")).partitions().size();
                    for (int i = 0; i < this.totalPartitions; i++) {
                        this.gdprKeyPartitions.add(new TopicPartition("Akces-GDPRKeys", i));
                    }
                    this.gdprKeyPartitions.forEach(topicPartition10 -> {
                        this.gdprContextRepositories.put(topicPartition10, this.gdprContextRepositoryFactory.create("AkcesQueryModelController", Integer.valueOf(topicPartition10.partition())));
                    });
                    consumer.assign(this.gdprKeyPartitions);
                    this.gdprKeyPartitions.forEach(topicPartition11 -> {
                        consumer.seek(topicPartition11, this.gdprContextRepositories.get(topicPartition11).getOffset() + 1);
                    });
                    this.initializedEndOffsets = consumer.endOffsets(this.gdprKeyPartitions);
                    this.processState = AkcesQueryModelControllerState.LOADING_GDPR_KEYS;
                } else {
                    this.processState = AkcesQueryModelControllerState.RUNNING;
                }
            } catch (KafkaException e6) {
                logger.error("Unrecoverable exception in AkcesQueryModelController while {}", this.processState, e6);
                this.processState = AkcesQueryModelControllerState.SHUTTING_DOWN;
            } catch (WakeupException | InterruptException e7) {
            }
        }
    }

    private GDPRContextRepository getGDPRContextRepository(String str) {
        return this.gdprContextRepositories.get(new TopicPartition("Akces-GDPRKeys", Integer.valueOf(Math.abs(this.hashFunction.hashString(str, StandardCharsets.UTF_8).asInt()) % this.totalPartitions).intValue()));
    }

    /* JADX WARN: Type inference failed for: r7v1, types: [org.elasticsoftware.akces.query.QueryModelState] */
    private Map<TopicPartition, HydrationExecution<?>> processHydrationRequests(Consumer<String, ProtocolRecord> consumer) {
        HashMap hashMap = new HashMap();
        try {
            HydrationRequest<?> poll = this.commandQueue.poll(100L, TimeUnit.MILLISECONDS);
            while (poll != null) {
                logger.info("Processing HydrationRequest on index {} with id {} and runtime {}", new Object[]{poll.runtime().getIndexName(), poll.id(), poll.runtime().getName()});
                QueryModelRuntime<?> runtime = poll.runtime();
                String indexTopicName = KafkaUtils.getIndexTopicName(runtime.getIndexName(), poll.id());
                if (consumer.partitionsFor(indexTopicName).isEmpty()) {
                    logger.warn("KafkaTopic {} not found for HydrationRequest on index {} with id {}", new Object[]{indexTopicName, poll.runtime().getIndexName(), poll.id()});
                    poll.completableFuture().completeExceptionally(new QueryModelIdNotFoundException(poll.runtime().getQueryModelClass(), poll.id()));
                } else {
                    TopicPartition topicPartition = new TopicPartition(indexTopicName, 0);
                    hashMap.put(topicPartition, new HydrationExecution(runtime, poll.completableFuture(), poll.id(), poll.currentState(), poll.currentOffset(), topicPartition, null));
                }
                poll = this.commandQueue.poll();
            }
            return hashMap;
        } catch (InterruptedException e) {
            logger.warn("Interrupted while processing HydrationRequests", e);
            Thread.currentThread().interrupt();
            return hashMap;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <S extends QueryModelState> HydrationExecution<S> processHydrationExecution(@Nullable GDPRContextRepository gDPRContextRepository, HydrationExecution<S> hydrationExecution, List<ConsumerRecord<String, ProtocolRecord>> list) {
        if (gDPRContextRepository != null) {
            try {
                try {
                    GDPRContext gDPRContext = gDPRContextRepository.get(hydrationExecution.id());
                    logger.info("Setting GDPRContext {} for aggregateId {}", gDPRContext.getClass().getSimpleName(), hydrationExecution.id());
                    GDPRContextHolder.setCurrentGDPRContext(gDPRContext);
                } catch (IOException e) {
                    logger.error("Exception while processing HydrationExecution", e);
                    ((HydrationExecution) hydrationExecution).completableFuture.completeExceptionally(new QueryModelExecutionException("Exception while processing HydrationExecution", hydrationExecution.runtime().getQueryModelClass(), e));
                    GDPRContextHolder.resetCurrentGDPRContext();
                    return null;
                }
            } catch (Throwable th) {
                GDPRContextHolder.resetCurrentGDPRContext();
                throw th;
            }
        }
        logger.info("Processing {} records HydrationExecution on index {} with id {} and runtime {}", new Object[]{Integer.valueOf(list.size()), hydrationExecution.runtime().getIndexName(), hydrationExecution.id(), hydrationExecution.runtime().getName()});
        HydrationExecution<S> withCurrentState = hydrationExecution.withCurrentState(hydrationExecution.runtime().apply(list.stream().map(consumerRecord -> {
            return (DomainEventRecord) consumerRecord.value();
        }).toList(), hydrationExecution.currentState()));
        GDPRContextHolder.resetCurrentGDPRContext();
        return withCurrentState;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.processState = AkcesQueryModelControllerState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(10L, TimeUnit.SECONDS)) {
                logger.info("AkcesQueryModelController has been shutdown");
            } else {
                logger.warn("AkcesQueryModelController did not shutdown within 10 seconds");
            }
        } catch (InterruptedException e) {
        }
    }

    public boolean isRunning() {
        return this.processState == AkcesQueryModelControllerState.RUNNING;
    }
}
