package com.jeesuite.kafka.spring;

import com.jeesuite.common.util.NodeNameHolder;
import com.jeesuite.common.util.ResourceUtils;
import com.jeesuite.kafka.KafkaConst;
import com.jeesuite.kafka.annotation.ConsumerHandler;
import com.jeesuite.kafka.consumer.ConsumerContext;
import com.jeesuite.kafka.consumer.ErrorMessageProcessor;
import com.jeesuite.kafka.consumer.NewApiTopicConsumer;
import com.jeesuite.kafka.consumer.OldApiTopicConsumer;
import com.jeesuite.kafka.consumer.TopicConsumer;
import com.jeesuite.kafka.consumer.hanlder.OffsetLogHanlder;
import com.jeesuite.kafka.consumer.hanlder.RetryErrorMessageHandler;
import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.serializer.KyroMessageDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.PriorityOrdered;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.type.classreading.CachingMetadataReaderFactory;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/jeesuite/kafka/spring/TopicConsumerSpringProvider.class */
public class TopicConsumerSpringProvider implements InitializingBean, DisposableBean, ApplicationContextAware, PriorityOrdered {
    private static final Logger logger = LoggerFactory.getLogger(TopicConsumerSpringProvider.class);
    private ApplicationContext context;
    private TopicConsumer consumer;
    private Properties configs;
    private boolean independent;
    private String scanPackages;
    private String groupId;
    private String consumerId;
    private String routeEnv;
    private OffsetLogHanlder offsetLogHanlder;
    private RetryErrorMessageHandler retryErrorMessageHandler;
    private boolean useNewAPI = false;
    private Map<String, MessageHandler> topicHandlers = new HashMap();
    private int processThreads = 200;
    private AtomicInteger status = new AtomicInteger(0);

    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isNotBlank(this.scanPackages)) {
            scanAndRegisterAnnotationTopics(org.springframework.util.StringUtils.tokenizeToStringArray(this.scanPackages, ",; \t\n"));
        }
        Validate.isTrue(this.topicHandlers != null && this.topicHandlers.size() > 0, "at latest one topic", new Object[0]);
        if (this.status.get() > 0) {
            return;
        }
        this.routeEnv = StringUtils.trimToNull(ResourceUtils.getProperty(KafkaConst.PROP_ENV_ROUTE));
        if (this.routeEnv != null) {
            logger.info("current route Env value is:", this.routeEnv);
            HashMap hashMap = new HashMap();
            for (String str : this.topicHandlers.keySet()) {
                hashMap.put(this.routeEnv + "." + str, this.topicHandlers.get(str));
            }
            this.topicHandlers = hashMap;
        }
        this.configs.put("rebalance.max.retries", "5");
        this.configs.put("rebalance.backoff.ms", "1205");
        this.configs.put("zookeeper.session.timeout.ms", "6000");
        this.configs.put("key.deserializer", StringDeserializer.class.getName());
        if (!this.configs.containsKey("value.deserializer")) {
            this.configs.put("value.deserializer", KyroMessageDeserializer.class.getName());
        }
        if (!this.useNewAPI) {
            this.configs.put("enable.auto.commit", "true");
        } else if ("smallest".equals(this.configs.getProperty("auto.offset.reset"))) {
            this.configs.put("auto.offset.reset", "earliest");
        } else if ("largest".equals(this.configs.getProperty("auto.offset.reset"))) {
            this.configs.put("auto.offset.reset", "latest");
        }
        this.groupId = this.configs.get("group.id").toString();
        logger.info("\n===============KAFKA Consumer group[{}] begin start=================\n", this.groupId);
        this.consumerId = NodeNameHolder.getNodeId();
        this.configs.put("consumer.id", this.consumerId);
        this.consumerId = this.groupId + "_" + this.consumerId;
        if (!this.configs.containsKey("client.id")) {
            this.configs.put("client.id", this.consumerId);
        }
        start();
        logger.info("\n===============KAFKA Consumer group[{}],consumerId[{}] start finished!!=================\n", this.groupId, this.consumerId);
    }

    private void start() {
        if (!this.independent) {
            registerKafkaSubscriber();
        } else {
            logger.info("KAFKA 启动模式[independent]");
            new Thread(new Runnable() { // from class: com.jeesuite.kafka.spring.TopicConsumerSpringProvider.1
                @Override // java.lang.Runnable
                public void run() {
                    TopicConsumerSpringProvider.this.registerKafkaSubscriber();
                }
            }).start();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerKafkaSubscriber() {
        this.status.set(1);
        Validate.notEmpty(this.configs, "configs is required", new Object[0]);
        Validate.notEmpty(this.configs.getProperty("group.id"), "kafka configs[group.id] is required", new Object[0]);
        Validate.notEmpty(this.configs.getProperty("bootstrap.servers"), "kafka configs[bootstrap.servers] is required", new Object[0]);
        StringBuffer stringBuffer = new StringBuffer();
        for (Map.Entry entry : this.configs.entrySet()) {
            stringBuffer.append(entry.getKey()).append("  =  ").append(entry.getValue()).append("\n");
        }
        logger.info("\n============kafka.Consumer.Config============\n" + stringBuffer.toString() + "\n");
        ConsumerContext consumerContext = ConsumerContext.getInstance();
        consumerContext.propertiesSetIfAbsent(this.configs, this.groupId, this.consumerId, this.topicHandlers, this.processThreads, this.offsetLogHanlder, new ErrorMessageProcessor(1, 10, 3, this.retryErrorMessageHandler));
        if (this.useNewAPI) {
            this.consumer = new NewApiTopicConsumer(consumerContext);
        } else {
            this.consumer = new OldApiTopicConsumer(consumerContext);
        }
        this.consumer.start();
        this.status.set(2);
    }

    public void setConfigs(Properties properties) {
        this.configs = properties;
    }

    public void setTopicHandlers(Map<String, MessageHandler> map) {
        this.topicHandlers = map;
    }

    public void setIndependent(boolean z) {
        this.independent = z;
    }

    public void setProcessThreads(int i) {
        this.processThreads = i;
    }

    public void setUseNewAPI(boolean z) {
        this.useNewAPI = z;
    }

    public void setOffsetLogHanlder(OffsetLogHanlder offsetLogHanlder) {
        this.offsetLogHanlder = offsetLogHanlder;
    }

    public void setScanPackages(String str) {
        this.scanPackages = str;
    }

    public void setRetryErrorMessageHandler(RetryErrorMessageHandler retryErrorMessageHandler) {
        this.retryErrorMessageHandler = retryErrorMessageHandler;
    }

    public void destroy() throws Exception {
        this.consumer.close();
    }

    private void scanAndRegisterAnnotationTopics(String[] strArr) {
        PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver = new PathMatchingResourcePatternResolver();
        for (String str : strArr) {
            logger.info(">>begin scan package [{}] with Annotation[ConsumerHandler] MessageHanlder ", str);
            try {
                Resource[] resources = pathMatchingResourcePatternResolver.getResources("classpath*:" + ClassUtils.convertClassNameToResourcePath(str) + "/**/*.class");
                CachingMetadataReaderFactory cachingMetadataReaderFactory = new CachingMetadataReaderFactory(pathMatchingResourcePatternResolver);
                for (Resource resource : resources) {
                    if (resource.isReadable()) {
                        Class<?> cls = Class.forName(cachingMetadataReaderFactory.getMetadataReader(resource).getClassMetadata().getClassName());
                        if (cls.isAnnotationPresent(ConsumerHandler.class)) {
                            ConsumerHandler consumerHandler = (ConsumerHandler) cls.getAnnotation(ConsumerHandler.class);
                            MessageHandler messageHandler = (MessageHandler) this.context.getBean(cls);
                            if (!this.topicHandlers.containsKey(consumerHandler.topic())) {
                                this.topicHandlers.put(consumerHandler.topic(), messageHandler);
                                logger.info("register new MessageHandler:{}-{}", consumerHandler.topic(), cls.getName());
                            }
                        }
                    }
                }
                logger.info("<<scan package[" + str + "] finished!");
            } catch (Exception e) {
                if (e instanceof NoSuchBeanDefinitionException) {
                    throw e;
                }
                logger.error("<<scan package[" + str + "] error", e);
            }
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}
