package com.holly.unit.kafka.impl;

import cn.hutool.core.util.StrUtil;
import com.holly.unit.kafka.KafkaMetaApi;
import com.holly.unit.kafka.config.KafkaConfig;
import com.holly.unit.kafka.exception.KafkaException;
import com.holly.unit.kafka.exception.enums.KafkaExceptionEnum;
import com.holly.unit.kafka.model.Ktopic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/holly/unit/kafka/impl/KafkaMetaApiImpl.class */
public class KafkaMetaApiImpl implements KafkaMetaApi {
    private AdminClient adminClient;
    private static final Logger log = LoggerFactory.getLogger(KafkaMetaApiImpl.class);
    private static volatile String bootstrapServers = null;

    public KafkaMetaApiImpl(KafkaConfig kafkaConfig) {
        if (bootstrapServers == null) {
            synchronized (getClass()) {
                if (bootstrapServers == null) {
                    bootstrapServers = kafkaConfig.getBootstrapServers();
                }
            }
        }
    }

    public void createTopic(Collection<Ktopic> collection) throws KafkaException {
        try {
            ArrayList arrayList = new ArrayList();
            for (Ktopic ktopic : collection) {
                arrayList.add(new NewTopic(ktopic.getName(), ktopic.getNumPartitions(), (short) ktopic.getReplicationFactor()));
            }
            getAdminClient().createTopics(arrayList).all().get();
        } catch (Exception e) {
            log.error("新增topic失败：{}", e);
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getUserTip(), new Object[]{"新增topic失败", e.getMessage()}));
        }
    }

    private synchronized AdminClient getAdminClient() {
        if (this.adminClient == null) {
            HashMap hashMap = new HashMap(1);
            hashMap.put("bootstrap.servers", bootstrapServers);
            this.adminClient = KafkaAdminClient.create(hashMap);
        }
        return this.adminClient;
    }

    public void deleteTopic(Collection<String> collection) throws KafkaException {
        try {
            getAdminClient().deleteTopics(collection).all().get();
        } catch (Exception e) {
            log.error("删除topic失败：{}", e);
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getUserTip(), new Object[]{"删除topic失败", e.getMessage()}));
        }
    }

    public String getTopicInfo(Collection<String> collection) throws KafkaException {
        AtomicReference atomicReference = new AtomicReference("");
        try {
            ((Map) getAdminClient().describeTopics(collection).all().get()).forEach((str, topicDescription) -> {
                Iterator it = topicDescription.partitions().iterator();
                while (it.hasNext()) {
                    atomicReference.set(atomicReference + ((TopicPartitionInfo) it.next()).toString() + "\n");
                }
            });
            return (String) atomicReference.get();
        } catch (Exception e) {
            log.error("获取topic失败：{}", e);
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getUserTip(), new Object[]{"获取topic详情失败"}));
        }
    }

    public List<String> getAllTopic() throws KafkaException {
        new ArrayList();
        try {
            return (List) ((Collection) getAdminClient().listTopics().listings().get()).stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
        } catch (Exception e) {
            log.error("获取全部topic失败：{}", e);
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_METADATA_ERROR.getUserTip(), new Object[]{"获取全部topic失败"}));
        }
    }
}
