package org.elasticsoftware.akces;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import jakarta.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.aggregate.AggregateRuntime;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.SchemaType;
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.AggregateServiceCommandType;
import org.elasticsoftware.akces.control.AggregateServiceDomainEventType;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
import org.elasticsoftware.akces.control.AkcesControlRecord;
import org.elasticsoftware.akces.control.AkcesRegistry;
import org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
import org.elasticsoftware.akces.kafka.AggregatePartition;
import org.elasticsoftware.akces.kafka.PartitionUtils;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.schemas.IncompatibleSchemaException;
import org.elasticsoftware.akces.schemas.SchemaException;
import org.elasticsoftware.akces.state.AggregateStateRepositoryFactory;
import org.elasticsoftware.akces.util.HostUtils;
import org.elasticsoftware.akces.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:org/elasticsoftware/akces/AkcesAggregateController.class */
public class AkcesAggregateController extends Thread implements AutoCloseable, ConsumerRebalanceListener, AkcesRegistry, EnvironmentAware, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(AkcesAggregateController.class);
    // Factory for ProtocolRecord consumers; handed to every AggregatePartition and used for the schema type check.
    private final ConsumerFactory<String, ProtocolRecord> consumerFactory;
    // Factory for ProtocolRecord producers; handed to every AggregatePartition.
    private final ProducerFactory<String, ProtocolRecord> producerFactory;
    // Factory for the producer that publishes this service's record to the "Akces-Control" topic.
    private final ProducerFactory<String, AkcesControlRecord> controlProducerFactory;
    // Factory for the consumer that subscribes to the "Akces-Control" topic.
    private final ConsumerFactory<String, AkcesControlRecord> controlRecordConsumerFactory;
    // Runtime of the aggregate served by this controller; supplies the name, type lists and schema registration.
    private final AggregateRuntime aggregateRuntime;
    // Used to describe the "Akces-Control" topic and to create index topics.
    private final KafkaAdminOperations kafkaAdmin;
    // Active AggregatePartitions keyed by AggregatePartition#getId(); removed by partition number on revoke.
    private final Map<Integer, AggregatePartition> aggregatePartitions;
    // Executor to which each newly created AggregatePartition is submitted.
    private final ExecutorService executorService;
    // Hash used by resolvePartition to map a string onto a partition number.
    private final HashFunction hashFunction;
    // AggregateServiceRecords read from the control topic, keyed by the control record key.
    private final Map<String, AggregateServiceRecord> aggregateServices;
    private final AggregateStateRepositoryFactory aggregateStateRepositoryFactory;
    private final GDPRContextRepositoryFactory gdprContextRepositoryFactory;
    // Partitions reported by onPartitionsAssigned that process() has not yet handled.
    private final List<TopicPartition> partitionsToAssign;
    // Partitions reported by onPartitionsRevoked that process() has not yet handled.
    private final List<TopicPartition> partitionsToRevoke;
    // Counted down at the end of run(); close() waits on it for up to 10 seconds.
    private final CountDownLatch shutdownLatch;
    // Partition count of the "Akces-Control" topic; null until the INITIALIZING step has run.
    private Integer partitions;
    // Replica count of the first "Akces-Control" partition; null until the INITIALIZING step has run.
    private Short replicationFactor;
    // Created and subscribed at the start of run().
    private Consumer<String, AkcesControlRecord> controlConsumer;
    // Current state of the control loop; volatile because close() writes it and isRunning() reads it.
    private volatile AkcesControllerState processState;
    // Set from the "akces.aggregate.schemas.forceRegister" property (default false).
    private boolean forceRegisterOnIncompatible;
    // Used to publish the liveness event at the end of run().
    private ApplicationContext applicationContext;

    /**
     * Creates a controller thread named {@code <aggregateName>-AkcesController}. The thread is
     * not started here; the controller begins in the {@code INITIALIZING} state.
     *
     * @param consumerFactory                 factory for {@link ProtocolRecord} consumers
     * @param producerFactory                 factory for {@link ProtocolRecord} producers
     * @param consumerFactory2                factory for the "Akces-Control" consumer
     * @param producerFactory2                factory for the "Akces-Control" producer
     * @param aggregateStateRepositoryFactory passed on to every {@link AggregatePartition}
     * @param gDPRContextRepositoryFactory    passed on to every {@link AggregatePartition}
     * @param aggregateRuntime                runtime of the aggregate this controller serves
     * @param kafkaAdminOperations            admin operations used to describe and create topics
     */
    public AkcesAggregateController(ConsumerFactory<String, ProtocolRecord> consumerFactory, ProducerFactory<String, ProtocolRecord> producerFactory, ConsumerFactory<String, AkcesControlRecord> consumerFactory2, ProducerFactory<String, AkcesControlRecord> producerFactory2, AggregateStateRepositoryFactory aggregateStateRepositoryFactory, GDPRContextRepositoryFactory gDPRContextRepositoryFactory, AggregateRuntime aggregateRuntime, KafkaAdminOperations kafkaAdminOperations) {
        super(aggregateRuntime.getName() + "-AkcesController");
        // Collaborators supplied by the caller.
        this.consumerFactory = consumerFactory;
        this.producerFactory = producerFactory;
        this.controlProducerFactory = producerFactory2;
        this.controlRecordConsumerFactory = consumerFactory2;
        this.aggregateStateRepositoryFactory = aggregateStateRepositoryFactory;
        this.gdprContextRepositoryFactory = gDPRContextRepositoryFactory;
        this.aggregateRuntime = aggregateRuntime;
        this.kafkaAdmin = kafkaAdminOperations;
        // Internal state; typed instantiations replace the former raw types.
        this.aggregatePartitions = new HashMap<>();
        this.aggregateServices = new ConcurrentHashMap<>();
        this.partitionsToAssign = new ArrayList<>();
        this.partitionsToRevoke = new ArrayList<>();
        this.hashFunction = Hashing.murmur3_32_fixed();
        this.shutdownLatch = new CountDownLatch(1);
        // partitions, replicationFactor and forceRegisterOnIncompatible keep their Java defaults
        // (null / null / false) until they are set during initialization or by setEnvironment.
        this.processState = AkcesControllerState.INITIALIZING;
        this.executorService = Executors.newCachedThreadPool(new CustomizableThreadFactory(aggregateRuntime.getName() + "AggregatePartitionThread-"));
    }

    /**
     * Control loop of the controller thread: creates the "Akces-Control" consumer, subscribes
     * with this instance as rebalance listener and calls {@link #process()} until the state
     * becomes {@code SHUTTING_DOWN}.
     *
     * <p>Any exception escaping the loop is logged and moves the controller to {@code ERROR}.
     * Resources are released in a {@code finally} block, so the partitions and the control
     * consumer are closed and the shutdown latch is released on the error path as well.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            final String groupId = this.aggregateRuntime.getName() + "-Akces-Control";
            final String clientIdPrefix = this.aggregateRuntime.getName() + "-" + HostUtils.getHostName() + "-Akces-Control";
            this.controlConsumer = this.controlRecordConsumerFactory.createConsumer(groupId, clientIdPrefix, (String) null);
            this.controlConsumer.subscribe(List.of("Akces-Control"), this);
            while (this.processState != AkcesControllerState.SHUTTING_DOWN) {
                process();
            }
        } catch (Exception e) {
            logger.error("Error in AkcesController", e);
            this.processState = AkcesControllerState.ERROR;
        } finally {
            releaseResourcesAndSignalShutdown();
        }
    }

    /**
     * Closes all AggregatePartitions and the control consumer, publishes
     * {@link LivenessState#BROKEN} and finally counts down the shutdown latch so that
     * {@link #close()} never has to wait for its full timeout once this thread has ended.
     */
    private void releaseResourcesAndSignalShutdown() {
        try {
            logger.info("Closing {} AggregatePartitions", this.aggregatePartitions.size());
            for (AggregatePartition aggregatePartition : this.aggregatePartitions.values()) {
                if (aggregatePartition == null) {
                    continue;
                }
                try {
                    aggregatePartition.close();
                } catch (Exception e) {
                    logger.error("Error closing AggregatePartition {}", aggregatePartition.getId(), e);
                }
            }
            // The consumer is still null when createConsumer itself failed.
            if (this.controlConsumer != null) {
                try {
                    this.controlConsumer.close(Duration.ofSeconds(5L));
                } catch (InterruptException ignored) {
                    // Interrupted while closing; the thread is terminating anyway.
                } catch (KafkaException e) {
                    logger.error("Error closing controlConsumer", e);
                }
            }
            // The control loop has ended, so report the application as no longer live.
            if (this.applicationContext != null) {
                this.applicationContext.publishEvent(new AvailabilityChangeEvent<>(this, LivenessState.BROKEN));
            }
        } catch (RuntimeException e) {
            logger.error("Error while releasing AkcesController resources", e);
        } finally {
            this.shutdownLatch.countDown();
        }
    }

    /**
     * Executes one step of the controller state machine based on the current
     * {@code processState}. Called repeatedly from {@link #run()}.
     */
    private void process() {
        switch (this.processState) {
            case RUNNING -> processWhileRunning();
            case INITIALIZING -> initializeAndAnnounce();
            case INITIAL_REBALANCING -> loadInitialControlState();
            case REBALANCING -> applyPartitionChanges();
            default -> {
                // Any other state has no work to do in the control loop.
            }
        }
    }

    /** RUNNING: polls the control topic; only non-wakeup/interrupt Kafka errors trigger shutdown. */
    private void processWhileRunning() {
        try {
            processControlRecords();
        } catch (WakeupException | InterruptException ignored) {
            // Must be caught before KafkaException (both are subclasses of it): these are not fatal.
        } catch (KafkaException e) {
            logger.error("Unrecoverable exception in AkcesController", e);
            this.processState = AkcesControllerState.SHUTTING_DOWN;
        }
    }

    /**
     * INITIALIZING: reads partition count and replication factor from the "Akces-Control"
     * topic, registers the produced domain event and local command schemas, publishes this
     * service's control record and performs a first poll of the control topic.
     */
    private void initializeAndAnnounce() {
        final TopicDescription controlTopic = this.kafkaAdmin.describeTopics("Akces-Control").get("Akces-Control");
        this.partitions = controlTopic.partitions().size();
        this.replicationFactor = (short) controlTopic.partitions().getFirst().replicas().size();
        for (DomainEventType<?> domainEventType : this.aggregateRuntime.getProducedDomainEventTypes()) {
            try {
                this.aggregateRuntime.registerAndValidate(domainEventType);
            } catch (IncompatibleSchemaException e) {
                if (!this.forceRegisterOnIncompatible) {
                    throw e;
                }
                // A forced update is only allowed while no record of this type/version exists yet.
                if (!protocolRecordTypeNotYetProduced(domainEventType, PartitionUtils::toDomainEventTopicPartition)) {
                    logger.warn("Cannot update schema for DomainEvent {} v{} because it has already been produced", domainEventType.typeName(), domainEventType.version());
                    throw e;
                }
                this.aggregateRuntime.registerAndValidate(domainEventType, true);
            }
        }
        for (CommandType<?> commandType : this.aggregateRuntime.getLocalCommandTypes()) {
            try {
                this.aggregateRuntime.registerAndValidate(commandType);
            } catch (IncompatibleSchemaException e) {
                if (!this.forceRegisterOnIncompatible) {
                    throw e;
                }
                if (!protocolRecordTypeNotYetProduced(commandType, PartitionUtils::toCommandTopicPartition)) {
                    logger.warn("Cannot update schema for Command {} v{} because it has already been produced", commandType.typeName(), commandType.version());
                    throw e;
                }
                this.aggregateRuntime.registerAndValidate(commandType, true);
            }
        }
        publishControlRecord(this.partitions);
        processControlRecords();
    }

    /**
     * INITIAL_REBALANCING: replays the assigned control partitions from the beginning to
     * discover services, registers the external domain event and command schemas and then
     * moves on to REBALANCING, unless a failure has requested shutdown in the meantime.
     */
    private void loadInitialControlState() {
        if (!this.partitionsToAssign.isEmpty()) {
            try {
                this.controlConsumer.seekToBeginning(this.partitionsToAssign);
                final Map<TopicPartition, Long> endOffsets = this.controlConsumer.endOffsets(this.partitionsToAssign);
                // Poll at the top of the loop so every polled batch is processed; polling at the
                // bottom would discard the last batch once all end offsets are reached.
                while (!endOffsets.isEmpty()) {
                    final ConsumerRecords<String, AkcesControlRecord> records = this.controlConsumer.poll(Duration.ofMillis(100L));
                    for (ConsumerRecord<String, AkcesControlRecord> controlRecord : records) {
                        final AkcesControlRecord value = controlRecord.value();
                        if (value instanceof AggregateServiceRecord serviceRecord) {
                            if (this.aggregateServices.putIfAbsent(controlRecord.key(), serviceRecord) == null) {
                                logger.info("Discovered service: {}", serviceRecord.aggregateName());
                            }
                        } else {
                            // value may be null (e.g. a tombstone); do not dereference it blindly.
                            logger.info("Received unknown AkcesControlRecord type: {}", value == null ? "null" : value.getClass().getSimpleName());
                        }
                    }
                    if (records.isEmpty()) {
                        // Only drop a partition once its position has reached the recorded end offset.
                        endOffsets.entrySet().removeIf(entry -> entry.getValue() <= this.controlConsumer.position(entry.getKey()));
                    }
                }
            } catch (WakeupException | InterruptException ignored) {
                // Not fatal; continue with whatever has been loaded so far.
            } catch (KafkaException e) {
                logger.error("Unrecoverable exception in AkcesController", e);
                this.processState = AkcesControllerState.SHUTTING_DOWN;
            }
            for (DomainEventType<?> domainEventType : this.aggregateRuntime.getExternalDomainEventTypes()) {
                try {
                    this.aggregateRuntime.registerAndValidate(domainEventType);
                } catch (SchemaException e) {
                    logger.error("Error registering external domain event type: {}:{}", domainEventType.typeName(), domainEventType.version(), e);
                    this.processState = AkcesControllerState.SHUTTING_DOWN;
                }
            }
            for (CommandType<?> commandType : this.aggregateRuntime.getExternalCommandTypes()) {
                try {
                    this.aggregateRuntime.registerAndValidate(commandType);
                } catch (SchemaException e) {
                    logger.error("Error registering external command type: {}:{}", commandType.typeName(), commandType.version(), e);
                    this.processState = AkcesControllerState.SHUTTING_DOWN;
                }
            }
        }
        // Do not overwrite a SHUTTING_DOWN request raised by one of the failures above.
        if (this.processState == AkcesControllerState.INITIAL_REBALANCING) {
            this.processState = AkcesControllerState.REBALANCING;
        }
    }

    /**
     * REBALANCING: closes the AggregatePartitions of revoked partitions, creates and submits
     * one for every newly assigned partition, then switches to RUNNING.
     */
    private void applyPartitionChanges() {
        for (TopicPartition revoked : this.partitionsToRevoke) {
            final AggregatePartition removed = this.aggregatePartitions.remove(revoked.partition());
            if (removed != null) {
                logger.info("Stopping AggregatePartition {}", removed.getId());
                try {
                    removed.close();
                } catch (Exception e) {
                    logger.error("Error closing AggregatePartition", e);
                }
            }
        }
        this.partitionsToRevoke.clear();
        for (TopicPartition assigned : this.partitionsToAssign) {
            final int partition = assigned.partition();
            final AggregatePartition aggregatePartition = new AggregatePartition(
                    this.consumerFactory,
                    this.producerFactory,
                    this.aggregateRuntime,
                    this.aggregateStateRepositoryFactory,
                    this.gdprContextRepositoryFactory,
                    partition,
                    PartitionUtils.toCommandTopicPartition(this.aggregateRuntime, partition),
                    PartitionUtils.toDomainEventTopicPartition(this.aggregateRuntime, partition),
                    PartitionUtils.toAggregateStateTopicPartition(this.aggregateRuntime, partition),
                    PartitionUtils.toGDPRKeysTopicPartition(this.aggregateRuntime, partition),
                    this.aggregateRuntime.getExternalDomainEventTypes(),
                    this,
                    this::createIndexTopic);
            this.aggregatePartitions.put(aggregatePartition.getId(), aggregatePartition);
            logger.info("Starting AggregatePartition {}", aggregatePartition.getId());
            this.executorService.submit(aggregatePartition);
        }
        this.partitionsToAssign.clear();
        this.processState = AkcesControllerState.RUNNING;
    }

    /**
     * Scans every partition of the topic selected by {@code biFunction} from the beginning up
     * to the end offsets captured at the start, looking for a record whose name and version
     * match {@code schemaType}.
     *
     * @param schemaType the type (name + version) to look for
     * @param biFunction maps (runtime, partition number) to the topic partition to scan
     * @return {@code true} if no matching record was found; {@code false} if one was found or
     *         a {@link KafkaException} prevented the check from completing
     */
    private boolean protocolRecordTypeNotYetProduced(SchemaType<?> schemaType, BiFunction<AggregateRuntime, Integer, TopicPartition> biFunction) {
        final String groupId = this.aggregateRuntime.getName() + "-Akces-Control-TypeCheck";
        final String clientIdPrefix = this.aggregateRuntime.getName() + "-" + HostUtils.getHostName() + "-Akces-Control-TypeCheck";
        // try-with-resources closes the consumer on every exit path, including unexpected exceptions.
        try (Consumer<String, ProtocolRecord> typeCheckConsumer = this.consumerFactory.createConsumer(groupId, clientIdPrefix, (String) null)) {
            final List<TopicPartition> topicPartitions = IntStream.range(0, this.partitions)
                    .mapToObj(partition -> biFunction.apply(this.aggregateRuntime, partition))
                    .toList();
            typeCheckConsumer.assign(topicPartitions);
            typeCheckConsumer.seekToBeginning(topicPartitions);
            final Map<TopicPartition, Long> endOffsets = typeCheckConsumer.endOffsets(topicPartitions);
            while (!endOffsets.isEmpty()) {
                try {
                    for (ConsumerRecord<String, ProtocolRecord> consumerRecord : typeCheckConsumer.poll(Duration.ofMillis(10L))) {
                        final ProtocolRecord value = consumerRecord.value();
                        // Skip records without a value instead of failing on them.
                        if (value != null && value.name().equals(schemaType.typeName()) && value.version() == schemaType.version()) {
                            return false;
                        }
                    }
                    // A partition is done once the consumer position has reached its end offset.
                    endOffsets.entrySet().removeIf(entry -> entry.getValue() <= typeCheckConsumer.position(entry.getKey()));
                } catch (WakeupException | InterruptException ignored) {
                    // Not fatal for this check; retry on the next loop iteration.
                }
            }
            return true;
        } catch (KafkaException e) {
            logger.error("KafkaException while checking if ProtocolRecord {} v{} has already been produced", schemaType.typeName(), schemaType.version(), e);
            // Be conservative: without a completed scan the type must be treated as produced.
            return false;
        }
    }

    /**
     * Polls the control consumer once (100 ms) and stores every {@link AggregateServiceRecord}
     * under its record key, replacing any previous entry. A service is logged as discovered
     * the first time its key is seen. Other record values are logged and skipped.
     */
    private void processControlRecords() {
        final ConsumerRecords<String, AkcesControlRecord> records = this.controlConsumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, AkcesControlRecord> controlRecord : records) {
            final AkcesControlRecord value = controlRecord.value();
            if (value instanceof AggregateServiceRecord serviceRecord) {
                if (!this.aggregateServices.containsKey(controlRecord.key())) {
                    logger.info("Discovered service: {}", serviceRecord.aggregateName());
                }
                this.aggregateServices.put(controlRecord.key(), serviceRecord);
            } else {
                // value may be null (e.g. a tombstone); avoid a NullPointerException while logging.
                logger.info("Received unknown AkcesControlRecord type: {}", value == null ? "null" : value.getClass().getSimpleName());
            }
        }
    }

    /**
     * Creates (or modifies) the compacted single-partition index topic whose name is derived
     * from the two arguments, using the controller's replication factor.
     *
     * @param str  first component passed to {@code KafkaUtils.getIndexTopicName}; also used in the error log
     * @param str2 second component passed to {@code KafkaUtils.getIndexTopicName}
     * @return {@code true} when the admin call completed, {@code false} when any exception occurred
     */
    private Boolean createIndexTopic(String str, String str2) {
        boolean created;
        try {
            final String indexTopicName = KafkaUtils.getIndexTopicName(str, str2);
            final NewTopic indexTopic = KafkaUtils.createCompactedTopic(indexTopicName, 1, this.replicationFactor.shortValue());
            this.kafkaAdmin.createOrModifyTopics(indexTopic);
            created = true;
        } catch (Exception failure) {
            logger.error("Error creating index topic: {}", str, failure);
            created = false;
        }
        return created;
    }

    /**
     * Publishes this service's {@link AggregateServiceRecord} to partitions {@code 0..i-1} of
     * the "Akces-Control" topic inside a single transaction. Failures are logged, not rethrown.
     *
     * @param i number of control topic partitions to publish to
     */
    private void publishControlRecord(int i) {
        final String aggregateName = this.aggregateRuntime.getName();
        final String transactionalId = aggregateName + "-" + HostUtils.getHostName() + "-control";
        // try-with-resources closes the producer on the failure path too.
        try (Producer<String, AkcesControlRecord> controlProducer = this.controlProducerFactory.createProducer(transactionalId)) {
            final AggregateServiceRecord serviceRecord = new AggregateServiceRecord(
                    aggregateName,
                    aggregateName + "-Commands",
                    aggregateName + "-DomainEvents",
                    this.aggregateRuntime.getAllCommandTypes().stream()
                            .map(commandType -> new AggregateServiceCommandType(commandType.typeName(), commandType.version(), commandType.create(), "commands." + commandType.typeName()))
                            .toList(),
                    this.aggregateRuntime.getProducedDomainEventTypes().stream()
                            .map(eventType -> new AggregateServiceDomainEventType(eventType.typeName(), eventType.version(), eventType.create(), eventType.external(), "domainevents." + eventType.typeName()))
                            .toList(),
                    this.aggregateRuntime.getExternalDomainEventTypes().stream()
                            .map(eventType -> new AggregateServiceDomainEventType(eventType.typeName(), eventType.version(), eventType.create(), eventType.external(), "domainevents." + eventType.typeName()))
                            .toList());
            controlProducer.beginTransaction();
            for (int partition = 0; partition < i; partition++) {
                final ProducerRecord<String, AkcesControlRecord> controlRecord =
                        new ProducerRecord<>("Akces-Control", partition, aggregateName, serviceRecord);
                controlProducer.send(controlRecord);
            }
            controlProducer.commitTransaction();
        } catch (Exception e) {
            logger.error("Error publishing CommandServiceRecord", e);
        }
    }

    /**
     * Requests shutdown by setting the state to {@code SHUTTING_DOWN} and waits up to
     * 10 seconds for the controller thread to signal completion via the shutdown latch.
     * If the waiting thread is interrupted, its interrupt status is restored and the method returns.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        logger.info("Shutting down AkcesAggregateController");
        this.processState = AkcesControllerState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(10L, TimeUnit.SECONDS)) {
                logger.info("AkcesAggregateController has been shutdown");
            } else {
                logger.warn("AkcesAggregateController did not shutdown within 10 seconds");
            }
        } catch (InterruptedException ignored) {
            // Restore the flag on the calling thread so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rebalance callback: queues the revoked partitions and, when the controller is RUNNING or
     * INITIALIZING, moves it to REBALANCING or INITIAL_REBALANCING respectively. An empty
     * collection is ignored.
     *
     * @param collection the partitions being revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        if (!collection.isEmpty()) {
            this.partitionsToRevoke.addAll(collection);
            if (this.processState == AkcesControllerState.RUNNING) {
                final List<Integer> revokedIds = collection.stream().map(TopicPartition::partition).toList();
                logger.info("Switching from RUNNING to REBALANCING, revoking partitions: {}", revokedIds);
                this.processState = AkcesControllerState.REBALANCING;
            } else if (this.processState == AkcesControllerState.INITIALIZING) {
                final List<Integer> revokedIds = collection.stream().map(TopicPartition::partition).toList();
                logger.info("Switching from INITIALIZING to INITIAL_REBALANCING, revoking partitions: {}", revokedIds);
                this.processState = AkcesControllerState.INITIAL_REBALANCING;
            }
        }
    }

    /**
     * Rebalance callback: queues the newly assigned partitions and, when the controller is
     * RUNNING or INITIALIZING, moves it to REBALANCING or INITIAL_REBALANCING respectively.
     * An empty collection is ignored.
     *
     * @param collection the partitions being assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (!collection.isEmpty()) {
            this.partitionsToAssign.addAll(collection);
            if (this.processState == AkcesControllerState.RUNNING) {
                final List<Integer> assignedIds = collection.stream().map(TopicPartition::partition).toList();
                logger.info("Switching from RUNNING to REBALANCING, assigning partitions : {}", assignedIds);
                this.processState = AkcesControllerState.REBALANCING;
            } else if (this.processState == AkcesControllerState.INITIALIZING) {
                final List<Integer> assignedIds = collection.stream().map(TopicPartition::partition).toList();
                logger.info("Switching from INITIALIZING to INITIAL_REBALANCING, assigning partitions : {}", assignedIds);
                this.processState = AkcesControllerState.INITIAL_REBALANCING;
            }
        }
    }

    /**
     * Resolves the {@link CommandType} for a command class from its {@link CommandInfo}
     * annotation. Exactly one known service must support the command: if that service is this
     * aggregate, the runtime's local command type is returned, otherwise a new
     * {@code CommandType} is built from the annotation.
     *
     * @param cls the command class
     * @return the resolved command type
     * @throws IllegalStateException if the class lacks {@code @CommandInfo} or the number of
     *                               supporting services is not exactly one
     */
    @Nonnull
    public CommandType<?> resolveType(@Nonnull Class<? extends Command> cls) {
        final CommandInfo commandInfo = cls.getAnnotation(CommandInfo.class);
        if (commandInfo == null) {
            throw new IllegalStateException("Command class " + cls.getName() + " is not annotated with @CommandInfo");
        }
        final List<AggregateServiceRecord> candidates = new ArrayList<>();
        for (AggregateServiceRecord service : this.aggregateServices.values()) {
            if (supportsCommand(service.supportedCommands(), commandInfo)) {
                candidates.add(service);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException("Cannot determine where to send command " + cls.getName());
        }
        final boolean handledLocally = this.aggregateRuntime.getName().equals(candidates.get(0).aggregateName());
        if (handledLocally) {
            return this.aggregateRuntime.getLocalCommandType(commandInfo.type(), commandInfo.version());
        }
        return new CommandType<>(commandInfo.type(), commandInfo.version(), cls, false, true, GDPRAnnotationUtils.hasPIIDataAnnotation(cls).booleanValue());
    }

    /**
     * Tells whether the list contains an entry with the same type name and version as the
     * given {@link CommandInfo} annotation.
     */
    private boolean supportsCommand(List<AggregateServiceCommandType> list, CommandInfo commandInfo) {
        return list.stream().anyMatch(candidate ->
                candidate.typeName().equals(commandInfo.type()) && candidate.version() == commandInfo.version());
    }

    /**
     * Tells whether the list contains an entry with the same type name and version as the
     * given {@link CommandType}.
     */
    private boolean supportsCommand(List<AggregateServiceCommandType> list, CommandType<?> commandType) {
        return list.stream().anyMatch(candidate ->
                candidate.typeName().equals(commandType.typeName()) && candidate.version() == commandType.version());
    }

    /**
     * Tells whether the list contains an entry with the same type name and version as the
     * given {@link DomainEventType}.
     */
    private boolean producesDomainEvent(List<AggregateServiceDomainEventType> list, DomainEventType<?> domainEventType) {
        return list.stream().anyMatch(candidate ->
                candidate.typeName().equals(domainEventType.typeName()) && candidate.version() == domainEventType.version());
    }

    /**
     * Resolves the command topic for a command class by first resolving its command type.
     *
     * @param cls the command class
     * @return the command topic of the single service supporting the command
     */
    @Nonnull
    public String resolveTopic(@Nonnull Class<? extends Command> cls) {
        final CommandType<?> resolvedType = resolveType(cls);
        return resolveTopic(resolvedType);
    }

    /**
     * Resolves the command topic of the one known service that supports the given command type.
     *
     * @param commandType the command type to route
     * @return the command topic of the supporting service
     * @throws IllegalStateException if the number of supporting services is not exactly one
     */
    @Nonnull
    public String resolveTopic(@Nonnull CommandType<?> commandType) {
        final List<AggregateServiceRecord> candidates = new ArrayList<>();
        for (AggregateServiceRecord service : this.aggregateServices.values()) {
            if (supportsCommand(service.supportedCommands(), commandType)) {
                candidates.add(service);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException("Cannot determine where to send command " + commandType.typeName() + " v" + commandType.version());
        }
        return candidates.get(0).commandTopic();
    }

    /**
     * Resolves the domain event topic of the one known service that produces the given event type.
     *
     * @param domainEventType the domain event type to look up
     * @return the domain event topic of the producing service
     * @throws IllegalStateException if the number of producing services is not exactly one
     */
    public String resolveTopic(@Nonnull DomainEventType<?> domainEventType) {
        final List<AggregateServiceRecord> producers = new ArrayList<>();
        for (AggregateServiceRecord service : this.aggregateServices.values()) {
            if (producesDomainEvent(service.producedEvents(), domainEventType)) {
                producers.add(service);
            }
        }
        if (producers.size() != 1) {
            throw new IllegalStateException("Cannot determine which service produces DomainEvent " + domainEventType.typeName() + " v" + domainEventType.version());
        }
        return producers.get(0).domainEventTopic();
    }

    /**
     * Maps a string onto a partition number in {@code [0, partitions)} using the controller's
     * hash function over the UTF-8 bytes of the string.
     *
     * <p>The remainder is taken before the absolute value. For every hash except
     * {@code Integer.MIN_VALUE} this equals {@code Math.abs(hash) % partitions}; for
     * {@code Integer.MIN_VALUE}, where {@code Math.abs} itself returns a negative number, it
     * still yields a valid non-negative partition.
     *
     * @param str the value to hash
     * @return the partition number
     */
    @Nonnull
    public Integer resolvePartition(@Nonnull String str) {
        final int hash = this.hashFunction.hashString(str, StandardCharsets.UTF_8).asInt();
        return Math.abs(hash % this.partitions);
    }

    /**
     * Reports whether the controller is in the RUNNING state and every current
     * AggregatePartition reports that it is processing.
     *
     * @return {@code true} only if both conditions hold
     */
    public boolean isRunning() {
        if (this.processState != AkcesControllerState.RUNNING) {
            return false;
        }
        for (AggregatePartition aggregatePartition : this.aggregatePartitions.values()) {
            if (!aggregatePartition.isProcessing()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads the {@code akces.aggregate.schemas.forceRegister} property (default {@code false})
     * and stores it as the force-register flag.
     *
     * @param environment the environment to read the property from
     */
    public void setEnvironment(Environment environment) {
        final Boolean forceRegister = environment.getProperty("akces.aggregate.schemas.forceRegister", Boolean.class, Boolean.FALSE);
        this.forceRegisterOnIncompatible = forceRegister.booleanValue();
    }

    /**
     * Stores the application context, which {@code run()} uses to publish the liveness event.
     *
     * @param applicationContext the context to keep
     * @throws BeansException declared by the callback contract; not thrown by this implementation
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        final ApplicationContext context = applicationContext;
        this.applicationContext = context;
    }
}
