package com.jeesuite.kafka.consumer;

import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.monitor.ZkConsumerCommand;
import com.jeesuite.kafka.monitor.model.TopicPartitionInfo;
import com.jeesuite.kafka.serializer.MessageDecoder;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/consumer/OldApiTopicConsumer.class */
/**
 * {@link TopicConsumer} implementation built on the legacy high-level Kafka consumer
 * ({@link ConsumerConnector}).
 *
 * <p>{@link #start()} requests one message stream per topic that has a registered
 * {@link MessageHandler} and runs a {@link MessageProcessor} for every stream on the fetch
 * executor. Each processor performs the handler's first phase ({@code p1Process}) on the fetch
 * thread and submits the second phase ({@code p2Process}) to one of the process executors.
 */
public class OldApiTopicConsumer extends AbstractTopicConsumer implements TopicConsumer {
    private static final Logger logger = LoggerFactory.getLogger(OldApiTopicConsumer.class);

    /** Number of streams requested from the connector for every subscribed topic. */
    private static final int STREAMS_PER_TOPIC = 1;
    /** Sleep interval, in milliseconds, between checks while fetching is disabled. */
    private static final long FETCH_DISABLED_POLL_MS = 1000L;
    /** Sleep interval, in milliseconds, between checks while the default executor is saturated. */
    private static final long EXECUTOR_BUSY_POLL_MS = 100L;

    private final ConsumerConnector connector;
    /** {@code null} when no usable {@code value.deserializer} could be instantiated. */
    private final Deserializer<Object> deserializer;

    /**
     * Drains one {@link KafkaStream}: converts every record into a {@link DefaultMessage}, runs
     * the first handler phase inline and schedules the second phase on a process executor.
     */
    class MessageProcessor implements Runnable {
        KafkaStream<String, Object> stream;
        private final String topicName;
        private final MessageHandler messageHandler;
        private final String processorName;

        /**
         * @param str         topic this processor serves; used to look up its handler
         * @param kafkaStream stream to drain
         */
        public MessageProcessor(String str, KafkaStream<String, Object> kafkaStream) {
            this.stream = kafkaStream;
            this.topicName = str;
            this.messageHandler = consumerContext.getMessageHandlers().get(str);
            this.processorName = this.messageHandler.getClass().getName();
        }

        /**
         * Consumes the stream until its iterator is exhausted or the thread is interrupted while
         * waiting for capacity. A failure on a single record is logged and does not stop the loop.
         */
        @Override
        public void run() {
            logger.info("MessageProcessor [{}] start, topic:{}", Thread.currentThread().getName(), this.topicName);
            ConsumerIterator<String, Object> records = this.stream.iterator();
            while (records.hasNext()) {
                try {
                    dispatch(records.next());
                } catch (Exception e) {
                    logger.error("received_topic_error,topic:" + this.topicName, e);
                }
                if (!awaitCapacity()) {
                    logger.info("MessageProcessor [{}] interrupted, stop consuming topic:{}", Thread.currentThread().getName(), this.topicName);
                    return;
                }
            }
        }

        /**
         * Wraps the record into a {@link DefaultMessage}, records stats and offsets, runs the
         * first handler phase and hands the message over to a process executor.
         *
         * @throws Exception any failure raised while converting or handing off the record; the
         *                   caller logs it and moves on to the next record
         */
        private void dispatch(MessageAndMetadata<String, Object> record) throws Exception {
            Object payload = record.message();
            if (payload == null) {
                // Fail with a descriptive message instead of an incidental NullPointerException.
                throw new IllegalStateException("null message payload, topic:" + record.topic()
                        + ",partition:" + record.partition() + ",offset:" + record.offset());
            }
            DefaultMessage message = payload instanceof DefaultMessage
                    ? (DefaultMessage) payload
                    : new DefaultMessage(record.key(), (Serializable) payload);
            message.setTopicMetadata(record.topic(), record.partition(), record.offset());

            consumerContext.updateConsumerStats(record.topic(), 1);
            boolean handedOff = false;
            try {
                consumerContext.saveOffsetsBeforeProcessed(record.topic(), record.partition(), record.offset());
                this.messageHandler.p1Process(message);
                submitMessageToProcess(this.topicName, record, message);
                handedOff = true;
            } finally {
                if (!handedOff) {
                    // The worker task normally applies the matching -1; it will never run for this
                    // record, so balance the +1 here.
                    consumerContext.updateConsumerStats(record.topic(), -1);
                }
            }
        }

        /**
         * Blocks while fetching is disabled in the consumer context, then while the default
         * process executor reports at least as many submitted tasks as its maximum pool size.
         *
         * @return {@code true} when consumption may continue, {@code false} when the thread was
         *         interrupted while waiting (the interrupt flag is restored)
         */
        private boolean awaitCapacity() {
            try {
                while (!consumerContext.fetchEnabled()) {
                    Thread.sleep(FETCH_DISABLED_POLL_MS);
                }
                while (defaultProcessExecutor.getMaximumPoolSize() <= defaultProcessExecutor.getSubmittedTasksCount()) {
                    Thread.sleep(EXECUTOR_BUSY_POLL_MS);
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Schedules the second handler phase. Messages that require a consumer ack go to the high
         * priority executor, all others to the default one.
         */
        private void submitMessageToProcess(final String topic, final MessageAndMetadata<String, Object> record, final DefaultMessage message) {
            (message.isConsumerAckRequired() ? highProcessExecutor : defaultProcessExecutor).submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Read the level once so the elapsed time is never computed from a zero start.
                        final boolean timed = logger.isDebugEnabled();
                        final long startedAt = timed ? System.currentTimeMillis() : 0L;
                        messageHandler.p2Process(message);
                        if (timed) {
                            long elapsed = System.currentTimeMillis() - startedAt;
                            if (elapsed > 1000) {
                                logger.debug("received_topic_useTime [{}]process topic:{} use time {} ms", new Object[]{processorName, topic, Long.valueOf(elapsed)});
                            }
                        }
                        if (message.isConsumerAckRequired()) {
                            consumerContext.sendConsumerAck(message.getMsgId());
                        }
                        consumerContext.saveOffsetsAfterProcessed(record.topic(), record.partition(), record.offset());
                    } catch (Exception e) {
                        // Log before invoking the callbacks so the root cause survives a failing callback.
                        logger.error("received_topic_process_error [" + processorName + "]processMessage error,topic:" + topic, e);
                        handleProcessFailure(topic, message);
                    } finally {
                        // Matches the +1 applied on the fetch thread, whatever the outcome.
                        consumerContext.updateConsumerStats(record.topic(), -1);
                    }
                }
            });
        }

        /**
         * Gives the handler a chance to deal with a failed message and, when it returns
         * {@code false}, passes the message to the consumer context's error processing.
         * Failures of these callbacks are logged rather than propagated, because nothing reads
         * the result of the submitted task.
         */
        private void handleProcessFailure(String topic, DefaultMessage message) {
            try {
                if (!this.messageHandler.onProcessError(message)) {
                    consumerContext.processErrorMessage(topic, message);
                }
            } catch (Exception handlerError) {
                logger.error("received_topic_process_error [" + this.processorName + "]error handling failed,topic:" + topic, handlerError);
            }
        }
    }

    /**
     * Creates the consumer connector from the context's properties and instantiates the class
     * named by the {@code value.deserializer} property, if any.
     *
     * @param consumerContext shared consumer configuration and callbacks
     */
    public OldApiTopicConsumer(ConsumerContext consumerContext) {
        super(consumerContext);
        this.deserializer = createDeserializer(consumerContext.getProperties().getProperty("value.deserializer"));
        this.connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerContext.getProperties()));
    }

    /**
     * Instantiates the configured value deserializer through its no-argument constructor.
     *
     * @param className fully qualified class name, may be {@code null} or blank
     * @return the deserializer, or {@code null} when none is configured or it cannot be created
     */
    @SuppressWarnings("unchecked")
    private static Deserializer<Object> createDeserializer(String className) {
        if (StringUtils.isBlank(className)) {
            return null;
        }
        try {
            return (Deserializer<Object>) Class.forName(className).newInstance();
        } catch (Exception e) {
            // NOTE(review): a null deserializer is still passed to MessageDecoder, which presumably
            // falls back to a default decoding - confirm.
            logger.warn("can not create value.deserializer instance:" + className, e);
            return null;
        }
    }

    /**
     * Optionally corrects the committed offsets, opens the message streams for all topics that
     * have a handler and starts a {@link MessageProcessor} for each stream.
     */
    @Override
    public void start() {
        if (this.consumerContext.getOffsetLogHanlder() != null) {
            resetCorrectOffsets();
        }
        Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
        for (String topic : this.consumerContext.getMessageHandlers().keySet()) {
            streamsPerTopic.put(topic, STREAMS_PER_TOPIC);
            logger.info("topic[{}] assign fetch Threads {}", topic, STREAMS_PER_TOPIC);
        }
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        MessageDecoder valueDecoder = new MessageDecoder(this.deserializer);
        Map<String, List<KafkaStream<String, Object>>> streamsByTopic =
                this.connector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);
        for (String topic : this.consumerContext.getMessageHandlers().keySet()) {
            for (KafkaStream<String, Object> stream : streamsByTopic.get(topic)) {
                this.fetchExecutor.execute(new MessageProcessor(topic, stream));
            }
        }
        this.runing.set(true);
    }

    /**
     * Rewinds the group's committed offset of every subscribed partition to the latest processed
     * offset known to the consumer context, when that offset is non-negative and lower than the
     * committed one. Requires both {@code bootstrap.servers} and {@code zookeeper.connect}.
     * Failures are logged and do not prevent the consumer from starting.
     */
    private void resetCorrectOffsets() {
        String kafkaServers = this.consumerContext.getProperties().getProperty("bootstrap.servers");
        String zkServers = this.consumerContext.getProperties().getProperty("zookeeper.connect");
        if (StringUtils.isAnyBlank(kafkaServers, zkServers)) {
            logger.warn("resetCorrectOffsets exit。Please check [bootstrap.servers] and [zookeeper.connect] is existing");
            return;
        }
        ZkConsumerCommand command = new ZkConsumerCommand(zkServers, kafkaServers);
        try {
            for (String topic : command.getSubscribeTopics(this.consumerContext.getGroupId())) {
                for (TopicPartitionInfo partitionInfo : command.getTopicOffsets(this.consumerContext.getGroupId(), topic)) {
                    long processedOffset = this.consumerContext.getLatestProcessedOffsets(topic, partitionInfo.getPartition());
                    if (processedOffset >= 0 && processedOffset < partitionInfo.getOffset()) {
                        command.resetTopicOffsets(this.consumerContext.getGroupId(), topic, partitionInfo.getPartition(), processedOffset);
                        logger.info("seek Topic[{}] partition[{}] from {} to {}", new Object[]{topic, Integer.valueOf(partitionInfo.getPartition()), Long.valueOf(partitionInfo.getOffset()), Long.valueOf(processedOffset)});
                    }
                }
            }
        } catch (Exception e) {
            logger.error("resetCorrectOffsets error", e);
        } finally {
            // Release the command's connections on every exit path.
            command.close();
        }
    }

    /**
     * Commits offsets, shuts the connector down and releases the inherited resources. Does
     * nothing unless the consumer has been started. The connector shutdown and
     * {@code super.close()} are attempted even when an earlier step throws; the exception still
     * propagates to the caller.
     */
    @Override
    public void close() {
        if (!this.runing.get()) {
            return;
        }
        try {
            this.connector.commitOffsets();
        } finally {
            try {
                this.connector.shutdown();
            } finally {
                // NOTE(review): presumably re-enables fetching so processors waiting on
                // fetchEnabled() can leave their wait loop - confirm.
                this.consumerContext.switchFetch(true);
                super.close();
            }
        }
        logger.info("KafkaTopicSubscriber shutdown ok...");
    }
}
