package easier.framework.starter.mq.kafka;

import cn.hutool.core.collection.CollUtil;
import easier.framework.core.plugin.cache.CacheBuilderException;
import easier.framework.core.util.StrUtil;
import easier.framework.starter.mq.EasierMQProperties;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;

/* loaded from: input_file:easier/framework/starter/mq/kafka/KafkaConsumers.class */
public class KafkaConsumers implements DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumers.class);
    private final Map<String, List<KafkaConsumer<String, byte[]>>> consumers = new HashMap();
    private final Map<String, EasierMQProperties.ConsumerProperties> consumerProperties = new HashMap();
    private String primary;

    @Nonnull
    public synchronized KafkaConsumer<String, byte[]> create() {
        if (StrUtil.isBlank(this.primary)) {
            throw CacheBuilderException.of("未配置主kafka消费者", new Object[0]);
        }
        KafkaConsumer<String, byte[]> create = create(this.primary);
        if (this.consumers.containsKey(this.primary)) {
            this.consumers.get(this.primary).add(create);
        } else {
            this.consumers.put(this.primary, CollUtil.newArrayList(new KafkaConsumer[]{create}));
        }
        return create;
    }

    @Nonnull
    public synchronized KafkaConsumer<String, byte[]> create(String str) {
        if (StrUtil.isBlank(str)) {
            return create();
        }
        EasierMQProperties.ConsumerProperties consumerProperties = this.consumerProperties.get(str);
        if (consumerProperties == null) {
            throw CacheBuilderException.of("无法获取kafka消费者配置:{}", new Object[]{str});
        }
        KafkaConsumer<String, byte[]> createConsumer = createConsumer(consumerProperties);
        if (this.consumers.containsKey(str)) {
            this.consumers.get(str).add(createConsumer);
        } else {
            this.consumers.put(str, CollUtil.newArrayList(new KafkaConsumer[]{createConsumer}));
            log.info("已加载kafka消费者【{}】", str);
        }
        return createConsumer;
    }

    public void destroy() {
        this.consumers.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).forEach((v0) -> {
            v0.close();
        });
    }

    public void init(EasierMQProperties easierMQProperties) {
        List<String> smartSplit = StrUtil.smartSplit(easierMQProperties.getEnableKafkaConsumer());
        if (CollUtil.isEmpty(smartSplit)) {
            return;
        }
        this.primary = (String) CollUtil.getFirst(smartSplit);
        for (String str : smartSplit) {
            EasierMQProperties.ConsumerProperties consumerProperties = easierMQProperties.getKafkaConsumer().get(str);
            if (consumerProperties == null) {
                throw CacheBuilderException.of("无法获取kafka消费者配置:{}", new Object[]{str});
            }
            this.consumerProperties.put(str, consumerProperties);
            log.info("已加载kafka消费者配置【{}】", str);
            Iterator it = StrUtil.smartSplit(consumerProperties.getAlias()).iterator();
            while (it.hasNext()) {
                log.info("已加载kafka消费者配置【{}】", (String) it.next());
            }
        }
    }

    private KafkaConsumer<String, byte[]> createConsumer(EasierMQProperties.ConsumerProperties consumerProperties) {
        Map<String, Object> properties = consumerProperties.getProperties();
        properties.put("bootstrap.servers", consumerProperties.getBootstrapServers());
        if (consumerProperties.getGroupId() != null) {
            properties.put("group.id", consumerProperties.getGroupId());
        }
        if (consumerProperties.getEnableAutoCommit() != null) {
            properties.put("enable.auto.commit", consumerProperties.getEnableAutoCommit());
        }
        if (consumerProperties.getMaxPollRecords() != null) {
            properties.put("max.poll.records", consumerProperties.getMaxPollRecords());
        }
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        String plainLoginModuleUsername = consumerProperties.getPlainLoginModuleUsername();
        String plainLoginModulePassword = consumerProperties.getPlainLoginModulePassword();
        if (StrUtil.isNotBlank(plainLoginModuleUsername) && StrUtil.isNotBlank(plainLoginModulePassword)) {
            properties.putIfAbsent("sasl.jaas.config", StrUtil.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{}\" password=\"{}\";", new Object[]{plainLoginModuleUsername, plainLoginModulePassword}));
            properties.putIfAbsent("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
            properties.putIfAbsent("sasl.mechanism", "PLAIN");
        }
        return new KafkaConsumer<>(properties);
    }
}
