package com.ken.event.action.mq.kafka.config;

import com.ken.event.action.apply.consumer.IKenEventHandler;
import com.ken.event.action.apply.consumer.KenEvent;
import com.ken.event.action.mq.kafka.consumer.KafkaConsumerListener;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;

@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@ComponentScan({"com.ken.event.action.mq.kafka"})
/* loaded from: input_file:com/ken/event/action/mq/kafka/config/KafkaAutoConfiguration.class */
public class KafkaAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaAutoConfiguration.class);

    @Autowired
    private KafkaProperties kafkaProperties;

    @Configuration
    @ConditionalOnBean({IKenEventHandler.class})
    /* loaded from: input_file:com/ken/event/action/mq/kafka/config/KafkaAutoConfiguration$ConsumerConfiguration.class */
    public static class ConsumerConfiguration {
        private static final Logger log = LoggerFactory.getLogger(ConsumerConfiguration.class);

        @Autowired
        private List<IKenEventHandler> eventHandlers;

        @Autowired
        private AdminClient adminClient;

        public ConsumerConfiguration() {
            log.debug("[Kafka Module Loader] Kafka的消费端的配置加载！");
        }

        @PostConstruct
        public void createTopic() {
            log.info("[event create topic] - 事件总线消费端创建Kafka Topic主题");
            ArrayList arrayList = new ArrayList();
            Iterator<IKenEventHandler> it = this.eventHandlers.iterator();
            while (it.hasNext()) {
                arrayList.add(new NewTopic(((KenEvent) it.next().getClass().getAnnotation(KenEvent.class)).value(), 4, (short) 2));
            }
            this.adminClient.createTopics(arrayList);
        }

        @Bean
        public String[] eventTypes() {
            ArrayList arrayList = new ArrayList();
            Iterator<IKenEventHandler> it = this.eventHandlers.iterator();
            while (it.hasNext()) {
                arrayList.add(((KenEvent) it.next().getClass().getAnnotation(KenEvent.class)).value());
            }
            return (String[]) arrayList.toArray(new String[0]);
        }

        @Bean
        public KafkaConsumerListener msgListener() {
            return new KafkaConsumerListener();
        }
    }

    @PostConstruct
    public void init() {
        log.info("[init serializer] 初始化序列化器....");
        this.kafkaProperties.getProducer().setKeySerializer(ByteArraySerializer.class);
        this.kafkaProperties.getProducer().setValueSerializer(ByteArraySerializer.class);
        this.kafkaProperties.getConsumer().setKeyDeserializer(ByteArrayDeserializer.class);
        this.kafkaProperties.getConsumer().setValueDeserializer(ByteArrayDeserializer.class);
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> producerFactory, ProducerListener<Object, Object> producerListener, ObjectProvider<RecordMessageConverter> objectProvider) {
        KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.getClass();
        objectProvider.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.setDefaultTopic(this.kafkaProperties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener();
    }

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> objectProvider) {
        DefaultKafkaConsumerFactory defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory(this.kafkaProperties.buildConsumerProperties());
        objectProvider.orderedStream().forEach(defaultKafkaConsumerFactoryCustomizer -> {
            defaultKafkaConsumerFactoryCustomizer.customize(defaultKafkaConsumerFactory);
        });
        return defaultKafkaConsumerFactory;
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> objectProvider) {
        DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory(this.kafkaProperties.buildProducerProperties());
        String transactionIdPrefix = this.kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        }
        objectProvider.orderedStream().forEach(defaultKafkaProducerFactoryCustomizer -> {
            defaultKafkaProducerFactoryCustomizer.customize(defaultKafkaProducerFactory);
        });
        return defaultKafkaProducerFactory;
    }

    @Bean
    public AdminClient getAdminClient(KafkaProperties kafkaProperties) {
        return AdminClient.create(kafkaProperties.buildAdminProperties());
    }
}
