package org.reactivecommons.async.kafka.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.commons.DLQDiscardNotifier;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.HandlerResolverBuilder;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
import org.reactivecommons.async.kafka.KafkaDomainEventBus;
import org.reactivecommons.async.kafka.communications.ReactiveMessageListener;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivecommons.async.kafka.communications.topology.KafkaCustomizations;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaProps;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomain;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomainProperties;
import org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

/**
 * Spring configuration that wires the Kafka flavour of reactive-commons.
 *
 * <p>For every domain found in {@link AsyncKafkaPropsDomain} it builds a message listener, a message
 * sender and a topology creator, and registers them in a {@link ConnectionManager}. It also
 * contributes fallback beans (converter, object mapper supplier, reporter, secret filler, default
 * command handler, ...) that are only registered when the application does not define its own.
 */
@EnableConfigurationProperties({KafkaPropertiesAutoConfig.class, AsyncKafkaPropsDomainProperties.class})
@Configuration
@Import({AsyncKafkaPropsDomain.class})
public class RCKafkaConfig {
    /** Name of the domain whose sender backs the fallback {@link DomainEventBus} bean. */
    private static final String DEFAULT_DOMAIN = "app";

    /**
     * Builds the {@link ConnectionManager} holding one listener/sender/topology-creator triple and one
     * discard notifier per configured domain.
     *
     * @param asyncKafkaPropsDomain per-domain Kafka settings, iterated as (domain name, properties)
     * @param messageConverter converter handed to every sender and discard notifier
     * @param kafkaCustomizations customizations handed to every topology creator
     * @param sslBundles SSL bundles used when building the Kafka client properties
     * @return the populated connection manager
     */
    @Bean
    public ConnectionManager kafkaConnectionManager(AsyncKafkaPropsDomain asyncKafkaPropsDomain, MessageConverter messageConverter, KafkaCustomizations kafkaCustomizations, SslBundles sslBundles) {
        ConnectionManager connectionManager = new ConnectionManager();
        asyncKafkaPropsDomain.forEach((domain, props) -> {
            // Creation order (topology creator, listener, sender) is kept as-is: the sender needs the
            // topology creator, and the helper methods mutate the shared connection properties.
            TopologyCreator topologyCreator = createTopologyCreator(props, kafkaCustomizations, sslBundles);
            ReactiveMessageListener listener = createMessageListener(props, sslBundles);
            ReactiveMessageSender sender = createMessageSender(props, messageConverter, topologyCreator, sslBundles);
            connectionManager.addDomain(domain, listener, sender, topologyCreator);

            // The discard notifier publishes through an event bus bound to this domain's sender.
            DomainEventBus domainBus = new KafkaDomainEventBus(connectionManager.getSender(domain));
            connectionManager.setDiscardNotifier(domain, new DLQDiscardNotifier(domainBus, messageConverter));
        });
        return connectionManager;
    }

    /**
     * Builds one handler resolver per configured domain from all {@link HandlerRegistry} beans.
     *
     * @param asyncKafkaPropsDomain per-domain Kafka settings; only the domain names are used here
     * @param applicationContext context queried for every {@link HandlerRegistry} bean
     * @param handlerRegistry the injected registry; added to the map if the lookup did not return it
     * @param defaultCommandHandler handler passed to the resolver builder for each domain
     * @return the handlers indexed by domain name
     */
    @Bean
    public DomainHandlers buildHandlers(AsyncKafkaPropsDomain asyncKafkaPropsDomain, ApplicationContext applicationContext, HandlerRegistry handlerRegistry, DefaultCommandHandler<?> defaultCommandHandler) {
        DomainHandlers domainHandlers = new DomainHandlers();
        Map<String, HandlerRegistry> registries = applicationContext.getBeansOfType(HandlerRegistry.class);
        if (!registries.containsValue(handlerRegistry)) {
            registries.put("primaryHandlerRegistry", handlerRegistry);
        }
        asyncKafkaPropsDomain.forEach((domain, props) ->
                domainHandlers.add(domain, HandlerResolverBuilder.buildResolver(domain, registries, defaultCommandHandler)));
        return domainHandlers;
    }

    /**
     * Fallback {@link DomainEventBus} that publishes through the sender of the {@code "app"} domain.
     *
     * @param connectionManager manager from which the sender is obtained
     * @return a Kafka-backed domain event bus
     */
    @ConditionalOnMissingBean({DomainEventBus.class})
    @Bean
    public DomainEventBus kafkaDomainEventBus(ConnectionManager connectionManager) {
        return new KafkaDomainEventBus(connectionManager.getSender(DEFAULT_DOMAIN));
    }

    /**
     * Creates the sender for one domain. Note that this sets the client id and the producer
     * key/value serializers directly on the connection properties object returned by
     * {@code asyncKafkaProps}.
     */
    private static ReactiveMessageSender createMessageSender(AsyncKafkaProps asyncKafkaProps, MessageConverter messageConverter, TopologyCreator topologyCreator, SslBundles sslBundles) {
        KafkaProperties connectionProperties = asyncKafkaProps.m0getConnectionProperties();
        connectionProperties.setClientId(asyncKafkaProps.getAppName());
        connectionProperties.getProducer().setKeySerializer(StringSerializer.class);
        connectionProperties.getProducer().setValueSerializer(ByteArraySerializer.class);
        Map<String, Object> producerProperties = connectionProperties.buildProducerProperties(sslBundles);
        SenderOptions<String, byte[]> senderOptions = SenderOptions.create(producerProperties);
        return new ReactiveMessageSender(KafkaSender.create(senderOptions), messageConverter, topologyCreator);
    }

    /**
     * Creates the listener for one domain. Note that this sets the consumer key/value deserializers
     * directly on the connection properties object returned by {@code asyncKafkaProps}.
     */
    private static ReactiveMessageListener createMessageListener(AsyncKafkaProps asyncKafkaProps, SslBundles sslBundles) {
        KafkaProperties connectionProperties = asyncKafkaProps.m0getConnectionProperties();
        connectionProperties.getConsumer().setKeyDeserializer(StringDeserializer.class);
        connectionProperties.getConsumer().setValueDeserializer(ByteArrayDeserializer.class);
        return new ReactiveMessageListener(ReceiverOptions.create(connectionProperties.buildConsumerProperties(sslBundles)));
    }

    /** Creates the topology creator for one domain, backed by a new Kafka {@link AdminClient}. */
    private static TopologyCreator createTopologyCreator(AsyncKafkaProps asyncKafkaProps, KafkaCustomizations kafkaCustomizations, SslBundles sslBundles) {
        AdminClient adminClient = AdminClient.create(asyncKafkaProps.m0getConnectionProperties().buildAdminProperties(sslBundles));
        // Auto-unboxing: a null "check existing topics" setting fails here, exactly as before.
        boolean checkExistingTopics = asyncKafkaProps.getCheckExistingTopics();
        return new TopologyCreator(adminClient, kafkaCustomizations, checkExistingTopics);
    }

    /** Fallback {@link KafkaCustomizations} with no customizations applied. */
    @ConditionalOnMissingBean({KafkaCustomizations.class})
    @Bean
    public KafkaCustomizations defaultKafkaCustomizations() {
        return new KafkaCustomizations();
    }

    /**
     * Fallback {@link MessageConverter} based on Jackson.
     *
     * @param objectMapperSupplier supplier of the object mapper used by the converter
     * @return the Jackson-based Kafka message converter
     */
    @ConditionalOnMissingBean({MessageConverter.class})
    @Bean
    public MessageConverter kafkaJacksonMessageConverter(ObjectMapperSupplier objectMapperSupplier) {
        ObjectMapper objectMapper = (ObjectMapper) objectMapperSupplier.get();
        return new KafkaJacksonMessageConverter(objectMapper);
    }

    /**
     * Fallback {@link DiscardNotifier} that delegates to a {@link DLQDiscardNotifier}.
     *
     * @param domainEventBus bus handed to the notifier
     * @param messageConverter converter handed to the notifier
     * @return the discard notifier
     */
    @ConditionalOnMissingBean({DiscardNotifier.class})
    @Bean
    public DiscardNotifier kafkaDiscardNotifier(DomainEventBus domainEventBus, MessageConverter messageConverter) {
        return new DLQDiscardNotifier(domainEventBus, messageConverter);
    }

    /** Fallback {@link ObjectMapperSupplier}. */
    @ConditionalOnMissingBean({ObjectMapperSupplier.class})
    @Bean
    public ObjectMapperSupplier defaultObjectMapperSupplier() {
        return new DefaultObjectMapperSupplier();
    }

    /** Fallback {@link CustomReporter}. */
    @ConditionalOnMissingBean({CustomReporter.class})
    @Bean
    public CustomReporter defaultKafkaCustomReporter() {
        return new DefaultCustomReporter();
    }

    /** Fallback secret filler that leaves the properties untouched. */
    @ConditionalOnMissingBean({AsyncKafkaPropsDomain.KafkaSecretFiller.class})
    @Bean
    public AsyncKafkaPropsDomain.KafkaSecretFiller defaultKafkaSecretFiller() {
        return (domain, genericAsyncProps) -> {
            // Intentionally a no-op: nothing to fill in by default.
        };
    }

    /**
     * Fallback {@link KafkaProperties} obtained by converting the auto-configured properties with the
     * supplied object mapper.
     *
     * @param kafkaPropertiesAutoConfig source properties
     * @param objectMapperSupplier supplier of the mapper that performs the conversion
     * @return the converted properties
     */
    @ConditionalOnMissingBean({KafkaProperties.class})
    @Bean
    public KafkaProperties defaultKafkaProperties(KafkaPropertiesAutoConfig kafkaPropertiesAutoConfig, ObjectMapperSupplier objectMapperSupplier) {
        ObjectMapper objectMapper = (ObjectMapper) objectMapperSupplier.get();
        return objectMapper.convertValue(kafkaPropertiesAutoConfig, KafkaProperties.class);
    }

    /** Fallback command handler that ignores the command and completes empty. */
    @ConditionalOnMissingBean({DefaultCommandHandler.class})
    @Bean
    public DefaultCommandHandler<?> defaultCommandHandler() {
        return command -> Mono.empty();
    }

    /**
     * Reads {@code KEY=VALUE} pairs from a dotenv-style file into the generic properties map of a new
     * {@link KafkaProperties}.
     *
     * <p>Lines starting with {@code #}, blank lines and lines without an {@code =} are skipped. Only
     * the first {@code =} separates key from value, so values may themselves contain {@code =}. Both
     * LF and CRLF line endings are accepted.
     *
     * @param path file to read
     * @return a new {@link KafkaProperties} whose properties map holds the parsed entries
     * @throws IOException if the file cannot be read
     */
    public static KafkaProperties readPropsFromDotEnv(Path path) throws IOException {
        KafkaProperties kafkaProperties = new KafkaProperties();
        Map<String, String> properties = kafkaProperties.getProperties();
        // readAllLines strips the line terminator, so CRLF files do not leave a trailing '\r' in values.
        for (String line : Files.readAllLines(path)) {
            if (line.isBlank() || line.startsWith("#")) {
                continue;
            }
            int separator = line.indexOf('=');
            if (separator < 0) {
                // Not a KEY=VALUE entry; previously this raised ArrayIndexOutOfBoundsException.
                continue;
            }
            properties.put(line.substring(0, separator), line.substring(separator + 1));
        }
        return kafkaProperties;
    }

    /**
     * Formats a JAAS configuration line for the Kafka {@code PlainLoginModule}.
     *
     * <p>Both values are inserted verbatim between double quotes; no escaping is applied.
     *
     * @param str the username
     * @param str2 the password
     * @return the JAAS configuration string
     */
    public static String jassConfig(String str, String str2) {
        return String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", str, str2);
    }
}
