package org.elasticsoftware.akces.query.database;

import jakarta.annotation.Nonnull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.AggregateServiceDomainEventType;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
import org.elasticsoftware.akces.control.AkcesControlRecord;
import org.elasticsoftware.akces.control.AkcesRegistry;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.util.HostUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:org/elasticsoftware/akces/query/database/AkcesDatabaseModelController.class */
/**
 * Controller thread for a database model.
 *
 * <p>The thread subscribes to the {@code Akces-Control} topic, keeps a registry of the
 * {@link AggregateServiceRecord}s published there and owns one {@link DatabaseModelPartition}
 * per control-topic partition assigned to it. Partitions are started on the internal executor
 * and closed again when the corresponding control partition is revoked or the controller
 * shuts down.
 *
 * <p>The controller is a small state machine driven by {@link #process()}:
 * {@code INITIALIZING -> INITIAL_REBALANCING -> REBALANCING -> RUNNING}, with
 * {@code RUNNING -> REBALANCING} on every later rebalance and {@code SHUTTING_DOWN} /
 * {@code ERROR} as terminal states.
 *
 * <p>Threading: the state machine and the pending assign/revoke lists are only touched from the
 * controller thread (Kafka documents that rebalance callbacks run inside {@code poll()} on the
 * polling thread). {@link #close()}, {@link #isRunning()} and the registry lookups may be called
 * from other threads, which is why the maps are concurrent and shutdown is signalled through a
 * dedicated volatile flag.
 */
public class AkcesDatabaseModelController extends Thread implements AutoCloseable, ConsumerRebalanceListener, ApplicationContextAware, AkcesRegistry {
    private static final Logger logger = LoggerFactory.getLogger(AkcesDatabaseModelController.class);
    private static final String CONTROL_TOPIC = "Akces-Control";
    private static final String GDPR_KEYS_TOPIC = "Akces-GDPRKeys";
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);
    private static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(5L);
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final ConsumerFactory<String, ProtocolRecord> consumerFactory;
    private final ConsumerFactory<String, AkcesControlRecord> controlRecordConsumerFactory;
    private final DatabaseModelRuntime databaseModelRuntime;
    // Read by isRunning() from foreign threads while the controller thread mutates it.
    private final Map<Integer, DatabaseModelPartition> databaseModelPartitions = new ConcurrentHashMap<>();
    private final ExecutorService executorService;
    private final Map<String, AggregateServiceRecord> aggregateServices = new ConcurrentHashMap<>();
    private final GDPRContextRepositoryFactory gdprContextRepositoryFactory;
    private final List<TopicPartition> partitionsToAssign = new ArrayList<>();
    private final List<TopicPartition> partitionsToRevoke = new ArrayList<>();
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private Consumer<String, AkcesControlRecord> controlConsumer;
    private volatile AkcesDatabaseModelControllerState processState = AkcesDatabaseModelControllerState.INITIALIZING;
    // Set by close(). Kept separate from processState because the controller thread writes
    // processState during state transitions and could otherwise overwrite a shutdown request.
    private volatile boolean shutdownRequested;
    private ApplicationContext applicationContext;

    /**
     * Creates the controller; the thread is not started here.
     *
     * @param consumerFactory              factory handed to every {@link DatabaseModelPartition}
     * @param controlRecordConsumerFactory factory for the control-topic consumer
     * @param gdprContextRepositoryFactory factory handed to every {@link DatabaseModelPartition}
     * @param databaseModelRuntime         runtime of the database model; its name is used for
     *                                     thread, group and client naming
     */
    public AkcesDatabaseModelController(ConsumerFactory<String, ProtocolRecord> consumerFactory, ConsumerFactory<String, AkcesControlRecord> controlRecordConsumerFactory, GDPRContextRepositoryFactory gdprContextRepositoryFactory, DatabaseModelRuntime databaseModelRuntime) {
        super(databaseModelRuntime.getName() + "-AkcesDatabaseModelController");
        this.consumerFactory = consumerFactory;
        this.controlRecordConsumerFactory = controlRecordConsumerFactory;
        this.gdprContextRepositoryFactory = gdprContextRepositoryFactory;
        this.databaseModelRuntime = databaseModelRuntime;
        this.executorService = Executors.newCachedThreadPool(new CustomizableThreadFactory(databaseModelRuntime.getName() + "DatabaseModelPartitionThread-"));
    }

    /**
     * Main loop: creates the control consumer, drives the state machine until shutdown is
     * requested (or an unexpected exception occurs) and then releases all resources.
     * Resources are released on the failure path too, so {@link #close()} never has to wait
     * for a latch that will not be counted down.
     */
    @Override
    public void run() {
        try {
            String runtimeName = this.databaseModelRuntime.getName();
            this.controlConsumer = this.controlRecordConsumerFactory.createConsumer(
                    runtimeName + "DatabaseModel-Akces-Control",
                    runtimeName + "-" + HostUtils.getHostName() + "DatabaseModel-Akces-Control",
                    null);
            this.controlConsumer.subscribe(List.of(CONTROL_TOPIC), this);
            while (!this.shutdownRequested && this.processState != AkcesDatabaseModelControllerState.SHUTTING_DOWN) {
                process();
            }
            // A state transition may have overwritten the SHUTTING_DOWN written by close().
            this.processState = AkcesDatabaseModelControllerState.SHUTTING_DOWN;
        } catch (Exception e) {
            logger.error("Error in AkcesDatabaseModelController", e);
            this.processState = AkcesDatabaseModelControllerState.ERROR;
        } finally {
            releaseResources();
        }
    }

    /**
     * Closes all partitions, the partition executor and the control consumer, publishes a
     * {@link LivenessState#BROKEN} event and finally releases the shutdown latch.
     */
    private void releaseResources() {
        try {
            logger.info("Closing {} DatabaseModelPartitions", this.databaseModelPartitions.size());
            for (DatabaseModelPartition partition : this.databaseModelPartitions.values()) {
                try {
                    partition.close();
                } catch (Exception e) {
                    logger.error("Error closing DatabaseModelPartition {}", partition.getId(), e);
                }
            }
            // No further partitions are submitted after this point; let idle worker threads go.
            this.executorService.shutdown();
            if (this.controlConsumer != null) {
                try {
                    this.controlConsumer.close(CONSUMER_CLOSE_TIMEOUT);
                } catch (InterruptException ignored) {
                    // Interrupted while closing during shutdown; nothing left to clean up.
                } catch (KafkaException e) {
                    logger.error("Error closing controlConsumer", e);
                }
            }
            if (this.applicationContext != null) {
                this.applicationContext.publishEvent(new AvailabilityChangeEvent<>(this, LivenessState.BROKEN));
            }
        } catch (RuntimeException e) {
            logger.error("Error while releasing AkcesDatabaseModelController resources", e);
        } finally {
            this.shutdownLatch.countDown();
        }
    }

    /** Executes one step of the state machine for the current state. */
    private void process() {
        AkcesDatabaseModelControllerState state = this.processState;
        if (state == AkcesDatabaseModelControllerState.RUNNING) {
            processWhileRunning();
        } else if (state == AkcesDatabaseModelControllerState.INITIALIZING) {
            this.databaseModelRuntime.validateDomainEventSchemas();
            // Polling here is what triggers the first partition assignment callback.
            processControlRecords();
        } else if (state == AkcesDatabaseModelControllerState.INITIAL_REBALANCING) {
            loadInitialControlRecords();
        } else if (state == AkcesDatabaseModelControllerState.REBALANCING) {
            rebalancePartitions();
        }
    }

    /** RUNNING step: consume control records; a Kafka failure triggers shutdown. */
    private void processWhileRunning() {
        try {
            processControlRecords();
        } catch (WakeupException | InterruptException ignored) {
            // Must be caught before KafkaException (their superclass): a wakeup or interrupt
            // only aborts this poll, the run loop decides whether to continue.
        } catch (KafkaException e) {
            logger.error("Unrecoverable exception in AkcesDatabaseModelController", e);
            this.processState = AkcesDatabaseModelControllerState.SHUTTING_DOWN;
        }
    }

    /**
     * INITIAL_REBALANCING step: replays the newly assigned control partitions from the
     * beginning up to the end offsets captured at the start, so the service registry is
     * populated before any partition is started, then moves on to REBALANCING.
     */
    private void loadInitialControlRecords() {
        if (!this.partitionsToAssign.isEmpty()) {
            try {
                this.controlConsumer.seekToBeginning(this.partitionsToAssign);
                Map<TopicPartition, Long> endOffsets = this.controlConsumer.endOffsets(this.partitionsToAssign);
                while (!endOffsets.isEmpty() && !this.shutdownRequested) {
                    // Poll at the top of the loop so that every polled batch is processed;
                    // nothing is fetched and then thrown away when the loop terminates.
                    ConsumerRecords<String, AkcesControlRecord> records = this.controlConsumer.poll(POLL_TIMEOUT);
                    records.forEach(record -> registerControlRecord(record.key(), record.value()));
                    if (records.isEmpty()) {
                        // Only on an empty poll: drop partitions whose position reached the end offset.
                        endOffsets.entrySet().removeIf(entry -> entry.getValue() <= this.controlConsumer.position(entry.getKey()));
                    }
                }
            } catch (WakeupException | InterruptException ignored) {
                // Replay aborted; continue with whatever has been loaded so far.
            } catch (KafkaException e) {
                logger.error("Unrecoverable exception in AkcesDatabaseModelController", e);
                this.processState = AkcesDatabaseModelControllerState.SHUTTING_DOWN;
                // Return so the SHUTTING_DOWN state is not overwritten below.
                return;
            }
        }
        this.processState = AkcesDatabaseModelControllerState.REBALANCING;
    }

    /**
     * REBALANCING step: closes the partitions that were revoked, starts a
     * {@link DatabaseModelPartition} for every newly assigned partition and switches to RUNNING.
     */
    private void rebalancePartitions() {
        for (TopicPartition revoked : this.partitionsToRevoke) {
            DatabaseModelPartition partition = this.databaseModelPartitions.remove(Integer.valueOf(revoked.partition()));
            if (partition != null) {
                logger.info("Stopping DatabaseModelPartition {}", partition.getId());
                try {
                    partition.close();
                } catch (Exception e) {
                    logger.error("Error closing DatabaseModelPartition", e);
                }
            }
        }
        this.partitionsToRevoke.clear();
        for (TopicPartition assigned : this.partitionsToAssign) {
            DatabaseModelPartition partition = new DatabaseModelPartition(
                    this.consumerFactory,
                    this.databaseModelRuntime,
                    this.gdprContextRepositoryFactory,
                    Integer.valueOf(assigned.partition()),
                    new TopicPartition(GDPR_KEYS_TOPIC, assigned.partition()),
                    this.databaseModelRuntime.getDomainEventTypes(),
                    this);
            this.databaseModelPartitions.put(partition.getId(), partition);
            logger.info("Starting DatabaseModelPartition {}", partition.getId());
            this.executorService.submit(partition);
        }
        this.partitionsToAssign.clear();
        this.processState = AkcesDatabaseModelControllerState.RUNNING;
    }

    /** Polls the control topic once and registers every record that was returned. */
    private void processControlRecords() {
        ConsumerRecords<String, AkcesControlRecord> records = this.controlConsumer.poll(POLL_TIMEOUT);
        records.forEach(record -> registerControlRecord(record.key(), record.value()));
    }

    /**
     * Stores an {@link AggregateServiceRecord} under its record key; the most recently consumed
     * record for a key always replaces the previous one. Other record types (and null values)
     * are only logged.
     *
     * @param key   key of the control record
     * @param value value of the control record, may be null
     */
    private void registerControlRecord(String key, AkcesControlRecord value) {
        if (value instanceof AggregateServiceRecord serviceRecord) {
            // put() returns null only the first time a key is seen.
            if (this.aggregateServices.put(key, serviceRecord) == null) {
                logger.info("Discovered service: {}", serviceRecord.aggregateName());
            }
        } else {
            logger.info("Received unknown AkcesControlRecord type: {}", value == null ? "null" : value.getClass().getSimpleName());
        }
    }

    /**
     * Requests shutdown and waits up to ten seconds for the controller thread to finish its
     * cleanup. If the waiting thread is interrupted, its interrupt flag is restored.
     */
    @Override
    public void close() throws Exception {
        logger.info("Shutting down AkcesDatabaseModelController");
        this.shutdownRequested = true;
        this.processState = AkcesDatabaseModelControllerState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.info("AkcesDatabaseModelController has been shutdown");
            } else {
                logger.warn("AkcesDatabaseModelController did not shutdown within {} seconds", SHUTDOWN_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Records revoked control partitions and, when currently RUNNING or INITIALIZING, moves the
     * state machine to the matching rebalancing state.
     *
     * @param collection the partitions that were revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return;
        }
        this.partitionsToRevoke.addAll(collection);
        if (this.processState == AkcesDatabaseModelControllerState.RUNNING) {
            logger.info("Switching from RUNNING to REBALANCING, revoking partitions: {}", partitionNumbers(collection));
            this.processState = AkcesDatabaseModelControllerState.REBALANCING;
        } else if (this.processState == AkcesDatabaseModelControllerState.INITIALIZING) {
            logger.info("Switching from INITIALIZING to INITIAL_REBALANCING, revoking partitions: {}", partitionNumbers(collection));
            this.processState = AkcesDatabaseModelControllerState.INITIAL_REBALANCING;
        }
    }

    /**
     * Records newly assigned control partitions and, when currently RUNNING or INITIALIZING,
     * moves the state machine to the matching rebalancing state.
     *
     * @param collection the partitions that were assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return;
        }
        this.partitionsToAssign.addAll(collection);
        if (this.processState == AkcesDatabaseModelControllerState.RUNNING) {
            logger.info("Switching from RUNNING to REBALANCING, assigning partitions : {}", partitionNumbers(collection));
            this.processState = AkcesDatabaseModelControllerState.REBALANCING;
        } else if (this.processState == AkcesDatabaseModelControllerState.INITIALIZING) {
            logger.info("Switching from INITIALIZING to INITIAL_REBALANCING, assigning partitions : {}", partitionNumbers(collection));
            this.processState = AkcesDatabaseModelControllerState.INITIAL_REBALANCING;
        }
    }

    /** Extracts the partition numbers for logging. */
    private static List<Integer> partitionNumbers(Collection<TopicPartition> partitions) {
        return partitions.stream().map(TopicPartition::partition).toList();
    }

    /** Returns whether the list contains an event type with the same type name and version. */
    private boolean producesDomainEvent(List<AggregateServiceDomainEventType> list, DomainEventType<?> domainEventType) {
        return list.stream().anyMatch(produced ->
                produced.typeName().equals(domainEventType.typeName()) && produced.version() == domainEventType.version());
    }

    /**
     * Resolves the domain event topic of the single known service producing the given event type.
     *
     * @param domainEventType the event type (name and version) to look up
     * @return the domain event topic of the producing service
     * @throws IllegalStateException if zero or more than one known service produces the event
     */
    public String resolveTopic(@Nonnull DomainEventType<?> domainEventType) {
        List<AggregateServiceRecord> producers = this.aggregateServices.values().stream()
                .filter(service -> producesDomainEvent(service.producedEvents(), domainEventType))
                .toList();
        if (producers.size() == 1) {
            return producers.getFirst().domainEventTopic();
        }
        throw new IllegalStateException("Cannot determine which service produces DomainEvent " + domainEventType.typeName() + " v" + domainEventType.version());
    }

    /**
     * Not supported by this controller.
     *
     * @throws UnsupportedOperationException always
     */
    public CommandType<?> resolveType(@Nonnull Class<? extends Command> cls) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this controller.
     *
     * @throws UnsupportedOperationException always
     */
    public String resolveTopic(@Nonnull Class<? extends Command> cls) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this controller.
     *
     * @throws UnsupportedOperationException always
     */
    public String resolveTopic(@Nonnull CommandType<?> commandType) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this controller.
     *
     * @throws UnsupportedOperationException always
     */
    public Integer resolvePartition(@Nonnull String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * @return {@code true} when no shutdown was requested, the controller is in the RUNNING
     *         state and every managed partition reports that it is processing
     */
    public boolean isRunning() {
        return !this.shutdownRequested
                && this.processState == AkcesDatabaseModelControllerState.RUNNING
                && this.databaseModelPartitions.values().stream().allMatch(DatabaseModelPartition::isProcessing);
    }

    /**
     * Stores the application context used to publish the liveness event on shutdown.
     *
     * @param applicationContext the context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
