package org.axonframework.extensions.kafka.autoconfig;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Optional;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.cloudevent.CloudEventKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.springboot.TokenStoreProperties;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;

@AutoConfigureBefore({InfraConfiguration.class})
@EnableConfigurationProperties({KafkaProperties.class, TokenStoreProperties.class})
@AutoConfiguration
@AutoConfigureAfter({AxonAutoConfiguration.class})
@ConditionalOnExpression("${axon.kafka.publisher.enabled:true} or ${axon.kafka.fetcher.enabled:true}")
/* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.class */
public class KafkaAutoConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final KafkaProperties properties;
    private final TokenStoreProperties tokenStoreProperties;

    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ConsumerStreamingProcessorModeCondition.class */
    private static class ConsumerStreamingProcessorModeCondition extends AnyNestedCondition {

        @ConditionalOnProperty(name = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "pooled_streaming")
        /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ConsumerStreamingProcessorModeCondition$PooledStreamingCondition.class */
        static class PooledStreamingCondition {
            PooledStreamingCondition() {
            }
        }

        @ConditionalOnProperty(name = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "tracking")
        /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ConsumerStreamingProcessorModeCondition$TrackingCondition.class */
        static class TrackingCondition {
            TrackingCondition() {
            }
        }

        public ConsumerStreamingProcessorModeCondition() {
            super(ConfigurationCondition.ConfigurationPhase.REGISTER_BEAN);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ProducerStreamingProcessorModeCondition.class */
    private static class ProducerStreamingProcessorModeCondition extends AnyNestedCondition {

        @ConditionalOnProperty(name = {"axon.kafka.producer.event-processor-mode"}, havingValue = "pooled_streaming")
        /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ProducerStreamingProcessorModeCondition$PooledStreamingCondition.class */
        static class PooledStreamingCondition {
            PooledStreamingCondition() {
            }
        }

        @ConditionalOnProperty(name = {"axon.kafka.producer.event-processor-mode"}, havingValue = "tracking")
        /* loaded from: input_file:BOOT-INF/lib/axon-kafka-spring-boot-autoconfigure-4.9.0.jar:org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration$ProducerStreamingProcessorModeCondition$TrackingCondition.class */
        static class TrackingCondition {
            TrackingCondition() {
            }
        }

        public ProducerStreamingProcessorModeCondition() {
            super(ConfigurationCondition.ConfigurationPhase.REGISTER_BEAN);
        }
    }

    public KafkaAutoConfiguration(KafkaProperties kafkaProperties, TokenStoreProperties tokenStoreProperties) {
        this.properties = kafkaProperties;
        this.tokenStoreProperties = tokenStoreProperties;
    }

    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<?, ?> kafkaMessageConverter(@Qualifier("eventSerializer") Serializer serializer, Configuration configuration) {
        KafkaProperties.MessageConverterMode messageConverterMode = this.properties.getMessageConverterMode();
        if (messageConverterMode == KafkaProperties.MessageConverterMode.DEFAULT) {
            return DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(configuration.upcasterChain() != null ? configuration.upcasterChain() : new EventUpcasterChain(new EventUpcaster[0])).build();
        }
        if (messageConverterMode == KafkaProperties.MessageConverterMode.CLOUD_EVENT) {
            return CloudEventKafkaMessageConverter.builder().serializer(serializer).upcasterChain(configuration.upcasterChain() != null ? configuration.upcasterChain() : new EventUpcasterChain(new EventUpcaster[0])).build();
        }
        throw new AxonConfigurationException("Unknown Kafka Message Converter Mode [" + messageConverterMode + "] detected");
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = {"axon.kafka.publisher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean({"axonKafkaProducerFactory"})
    public <K, V> ProducerFactory<?, ?> kafkaProducerFactory() {
        ConfirmationMode confirmationMode = this.properties.getPublisher().getConfirmationMode();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        DefaultProducerFactory.Builder<K, V> confirmationMode2 = DefaultProducerFactory.builder().configuration(this.properties.buildProducerProperties()).confirmationMode(confirmationMode);
        if (isNonEmptyString(transactionIdPrefix)) {
            confirmationMode2.transactionalIdPrefix(transactionIdPrefix).confirmationMode(ConfirmationMode.TRANSACTIONAL);
            if (!confirmationMode.isTransactional()) {
                logger.warn("The confirmation mode is set to [{}], whilst a transactional id prefix is present. The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", confirmationMode);
            }
        }
        return confirmationMode2.build();
    }

    private boolean isNonEmptyString(String str) {
        return (str == null || str.equals("")) ? false : true;
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
    @ConditionalOnProperty(name = {"axon.kafka.publisher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean
    public TopicResolver topicResolver() {
        return eventMessage -> {
            return Optional.of(this.properties.getDefaultTopic());
        };
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
    @ConditionalOnProperty(name = {"axon.kafka.publisher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean(destroyMethod = "shutDown")
    public <K, V> KafkaPublisher<?, ?> kafkaPublisher(@Qualifier("eventSerializer") Serializer serializer, ProducerFactory<K, V> producerFactory, KafkaMessageConverter<K, V> kafkaMessageConverter, Configuration configuration, TopicResolver topicResolver) {
        return KafkaPublisher.builder().serializer(serializer).producerFactory(producerFactory).messageConverter(kafkaMessageConverter).messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")).topicResolver(topicResolver).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({KafkaPublisher.class})
    @ConditionalOnProperty(name = {"axon.kafka.publisher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean
    public <K, V> KafkaEventPublisher<?, ?> kafkaEventPublisher(KafkaPublisher<K, V> kafkaPublisher, KafkaProperties kafkaProperties, EventProcessingConfigurer eventProcessingConfigurer) {
        KafkaEventPublisher<K, V> build = KafkaEventPublisher.builder().processingGroup(this.properties.getPublisher().getProcessingGroup()).kafkaPublisher(kafkaPublisher).build();
        eventProcessingConfigurer.registerEventHandler(configuration -> {
            return build;
        }).registerListenerInvocationErrorHandler(build.getProcessingGroup(), configuration2 -> {
            return PropagatingErrorHandler.instance();
        }).assignHandlerTypesMatching(build.getProcessingGroup(), cls -> {
            return cls.isAssignableFrom(KafkaEventPublisher.class);
        });
        KafkaProperties.EventProcessorMode eventProcessorMode = kafkaProperties.getProducer().getEventProcessorMode();
        if (eventProcessorMode == KafkaProperties.EventProcessorMode.SUBSCRIBING) {
            eventProcessingConfigurer.registerSubscribingEventProcessor(build.getProcessingGroup());
        } else if (eventProcessorMode == KafkaProperties.EventProcessorMode.TRACKING) {
            eventProcessingConfigurer.registerTrackingEventProcessor(build.getProcessingGroup());
        } else {
            if (eventProcessorMode != KafkaProperties.EventProcessorMode.POOLED_STREAMING) {
                throw new AxonConfigurationException("Unknown Event Processor Mode [" + eventProcessorMode + "] detected");
            }
            eventProcessingConfigurer.registerPooledStreamingEventProcessor(build.getProcessingGroup());
        }
        return build;
    }

    @ConditionalOnMissingBean
    @Conditional({ProducerStreamingProcessorModeCondition.class})
    @ConditionalOnProperty(name = {"axon.kafka.fetcher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean
    public TokenStore tokenStore(Serializer serializer) {
        return KafkaTokenStore.builder().serializer(serializer).consumerConfiguration(this.properties.buildConsumerProperties()).producerConfiguration(this.properties.buildProducerProperties()).claimTimeout(this.tokenStoreProperties.getClaimTimeout()).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = {"axon.kafka.fetcher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean({"axonKafkaConsumerFactory"})
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = {"axon.kafka.fetcher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean(destroyMethod = "shutdown")
    public Fetcher<?, ?, ?> kafkaFetcher() {
        return AsyncFetcher.builder().pollTimeout(this.properties.getFetcher().getPollTimeout()).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class})
    @Conditional({ConsumerStreamingProcessorModeCondition.class})
    @ConditionalOnProperty(name = {"axon.kafka.fetcher.enabled"}, havingValue = "true", matchIfMissing = true)
    @Bean
    public <K, V> StreamableKafkaMessageSource<?, ?> streamableKafkaMessageSource(@Qualifier("eventSerializer") Serializer serializer, ConsumerFactory<K, V> consumerFactory, Fetcher<K, V, KafkaEventMessage> fetcher, KafkaMessageConverter<K, V> kafkaMessageConverter) {
        return StreamableKafkaMessageSource.builder().topics(Collections.singletonList(this.properties.getDefaultTopic())).serializer(serializer).consumerFactory(consumerFactory).fetcher(fetcher).messageConverter(kafkaMessageConverter).bufferFactory(() -> {
            return new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize());
        }).build();
    }
}
