package com.jeesuite.kafka.consumer;

import com.jeesuite.common.util.ResourceUtils;
import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.message.DefaultMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer.class */
public class NewApiTopicConsumer extends AbstractTopicConsumer implements TopicConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private Map<String, MessageHandler> topicHandlers;
    private List<ConsumerWorker> consumerWorks;
    private boolean offsetAutoCommit;
    private Properties properties;
    private String clientIdPrefix;
    private long pollTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer$ConsumerWorker.class */
    public class ConsumerWorker implements Runnable {
        KafkaConsumer<String, Serializable> consumer;
        private AtomicBoolean closed = new AtomicBoolean();
        private AtomicBoolean commiting = new AtomicBoolean(false);
        private AtomicInteger uncommittedNums = new AtomicInteger(0);
        private List<TopicPartition> assigndPartitions = new ArrayList();
        private Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetMap = new ConcurrentHashMap();

        public ConsumerWorker(String str, int i) {
            NewApiTopicConsumer.this.properties.setProperty("client.id", NewApiTopicConsumer.this.clientIdPrefix + "_" + i);
            this.consumer = new KafkaConsumer<>(NewApiTopicConsumer.this.properties);
        }

        public boolean isCommiting() {
            return this.commiting.get();
        }

        public void setCommiting(boolean z) {
            this.commiting.set(z);
        }

        public void addPartitions(TopicPartition topicPartition, long j) {
            this.assigndPartitions.add(topicPartition);
            this.uncommittedOffsetMap.put(topicPartition, new OffsetAndMetadata(j));
            this.uncommittedNums.incrementAndGet();
        }

        public void resetUncommittedOffsetMap() {
            this.uncommittedOffsetMap.clear();
            this.uncommittedNums.set(0);
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.closed.get()) {
                if (this.uncommittedNums.get() > 0) {
                    NewApiTopicConsumer.this.commitOffsets(this);
                }
                while (!NewApiTopicConsumer.this.consumerContext.fetchEnabled()) {
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                    }
                }
                while (NewApiTopicConsumer.this.defaultProcessExecutor.getMaximumPoolSize() <= NewApiTopicConsumer.this.defaultProcessExecutor.getSubmittedTasksCount()) {
                    try {
                        Thread.sleep(100L);
                    } catch (Exception e2) {
                    }
                }
                ConsumerRecords poll = this.consumer.poll(NewApiTopicConsumer.this.pollTimeout);
                if (!poll.isEmpty()) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        processConsumerRecords((ConsumerRecord) it.next());
                    }
                    if (!NewApiTopicConsumer.this.offsetAutoCommit) {
                    }
                }
            }
            this.consumer.close();
            NewApiTopicConsumer.logger.info("consumer exited");
        }

        private void processConsumerRecords(final ConsumerRecord<String, Serializable> consumerRecord) {
            final DefaultMessage defaultMessage = consumerRecord.value() instanceof DefaultMessage ? (DefaultMessage) consumerRecord.value() : new DefaultMessage((String) consumerRecord.key(), (Serializable) consumerRecord.value());
            final MessageHandler messageHandler = (MessageHandler) NewApiTopicConsumer.this.topicHandlers.get(consumerRecord.topic());
            defaultMessage.setTopicMetadata(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            NewApiTopicConsumer.this.consumerContext.updateConsumerStats(consumerRecord.topic(), 1);
            NewApiTopicConsumer.this.consumerContext.saveOffsetsBeforeProcessed(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            messageHandler.p1Process(defaultMessage);
            (defaultMessage.isConsumerAckRequired() ? NewApiTopicConsumer.this.highProcessExecutor : NewApiTopicConsumer.this.defaultProcessExecutor).submit(new Runnable() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.ConsumerWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        messageHandler.p2Process(defaultMessage);
                        if (!NewApiTopicConsumer.this.offsetAutoCommit) {
                            ConsumerWorker.this.uncommittedOffsetMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                            ConsumerWorker.this.uncommittedNums.incrementAndGet();
                        }
                        if (defaultMessage.isConsumerAckRequired()) {
                            NewApiTopicConsumer.this.consumerContext.sendConsumerAck(defaultMessage.getMsgId());
                        }
                        NewApiTopicConsumer.this.consumerContext.saveOffsetsAfterProcessed(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                    } catch (Exception e) {
                        if (!messageHandler.onProcessError(defaultMessage)) {
                            NewApiTopicConsumer.this.consumerContext.processErrorMessage(consumerRecord.topic(), defaultMessage);
                        }
                        NewApiTopicConsumer.logger.error("[" + messageHandler.getClass().getSimpleName() + "] process Topic[" + consumerRecord.topic() + "] error", e);
                    }
                    NewApiTopicConsumer.this.consumerContext.updateConsumerStats(consumerRecord.topic(), -1);
                }
            });
        }

        public void close() {
            this.closed.set(true);
            this.consumer.close();
        }
    }

    public NewApiTopicConsumer(ConsumerContext consumerContext) {
        super(consumerContext);
        this.consumerWorks = new ArrayList();
        this.pollTimeout = 1000L;
        this.properties = consumerContext.getProperties();
        this.clientIdPrefix = this.properties.getProperty("client.id");
        this.topicHandlers = consumerContext.getMessageHandlers();
        this.offsetAutoCommit = !consumerContext.getProperties().containsKey("enable.auto.commit") || Boolean.parseBoolean(consumerContext.getProperties().getProperty("enable.auto.commit"));
        if (this.properties.containsKey("poll.timeout.ms")) {
            this.pollTimeout = Long.parseLong(this.properties.remove("poll.timeout.ms").toString());
        } else {
            this.pollTimeout = Long.parseLong(ResourceUtils.getProperty("consumer.poll.timeout.ms", "1000"));
        }
        logger.info("pollTimeout:" + this.pollTimeout);
    }

    @Override // com.jeesuite.kafka.consumer.TopicConsumer
    public void start() {
        ArrayList arrayList = new ArrayList(this.topicHandlers.keySet());
        for (int i = 0; i < arrayList.size(); i++) {
            ConsumerWorker consumerWorker = new ConsumerWorker((String) arrayList.get(i), i);
            subscribeTopic(consumerWorker, (String) arrayList.get(i));
            if (this.offsetAutoCommit && this.consumerContext.getOffsetLogHanlder() != null) {
                resetCorrectOffsets(consumerWorker);
            }
            this.consumerWorks.add(consumerWorker);
            this.fetchExecutor.submit(consumerWorker);
        }
    }

    private void resetCorrectOffsets(ConsumerWorker consumerWorker) {
        KafkaConsumer<String, Serializable> kafkaConsumer = consumerWorker.consumer;
        Map listTopics = kafkaConsumer.listTopics();
        Set<String> keySet = listTopics.keySet();
        ArrayList arrayList = new ArrayList(this.topicHandlers.keySet());
        kafkaConsumer.poll(200L);
        for (String str : keySet) {
            if (arrayList.contains(str)) {
                for (PartitionInfo partitionInfo : (List) listTopics.get(str)) {
                    try {
                        long latestProcessedOffsets = this.consumerContext.getLatestProcessedOffsets(str, partitionInfo.partition());
                        TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                        OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
                        if (kafkaConsumer.assignment().contains(topicPartition) && latestProcessedOffsets >= 0 && latestProcessedOffsets < committed.offset()) {
                            kafkaConsumer.seek(topicPartition, latestProcessedOffsets);
                            logger.info(">>>>>>> seek Topic[{}] partition[{}] from {} to {}", new Object[]{str, Integer.valueOf(partitionInfo.partition()), Long.valueOf(committed.offset()), Long.valueOf(latestProcessedOffsets)});
                        }
                    } catch (Exception e) {
                        logger.warn("try seek topic[" + str + "] partition[" + partitionInfo.partition() + "] offsets error");
                    }
                }
            }
        }
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

    private <K extends Serializable, V extends Serializable> void subscribeTopic(final ConsumerWorker consumerWorker, String str) {
        ArrayList arrayList = new ArrayList(Arrays.asList(str));
        if (this.offsetAutoCommit) {
            consumerWorker.consumer.subscribe(arrayList);
        } else {
            consumerWorker.consumer.subscribe(arrayList, new ConsumerRebalanceListener() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    consumerWorker.assigndPartitions.clear();
                    Iterator<TopicPartition> it = collection.iterator();
                    while (it.hasNext()) {
                        consumerWorker.assigndPartitions.add(it.next());
                    }
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    consumerWorker.assigndPartitions.clear();
                    for (TopicPartition topicPartition : collection) {
                        long j = 0;
                        if (NewApiTopicConsumer.this.consumerContext.getOffsetLogHanlder() != null) {
                            try {
                                j = NewApiTopicConsumer.this.consumerContext.getLatestProcessedOffsets(topicPartition.topic(), topicPartition.partition());
                                NewApiTopicConsumer.logger.info("offsetLogHanlder.getLatestProcessedOffsets({},{}) result is {}", new Object[]{topicPartition.topic(), Integer.valueOf(topicPartition.partition()), Long.valueOf(j)});
                            } catch (Exception e) {
                                NewApiTopicConsumer.logger.warn("offsetLogHanlder.getLatestProcessedOffsets error:{}", e.getMessage());
                            }
                        }
                        if (j > 0) {
                            consumerWorker.consumer.seek(topicPartition, j);
                            NewApiTopicConsumer.logger.info("topicPartion : {} seek offset : {}", topicPartition, Long.valueOf(j));
                        }
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitOffsets(final ConsumerWorker consumerWorker) {
        KafkaConsumer<String, Serializable> kafkaConsumer = consumerWorker.consumer;
        if (consumerWorker.isCommiting()) {
            return;
        }
        consumerWorker.setCommiting(true);
        if (consumerWorker.uncommittedOffsetMap.isEmpty()) {
            return;
        }
        logger.debug("committing the offsets : {}", consumerWorker.uncommittedOffsetMap);
        kafkaConsumer.commitAsync(consumerWorker.uncommittedOffsetMap, new OffsetCommitCallback() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.2
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                consumerWorker.setCommiting(false);
                if (exc != null) {
                    NewApiTopicConsumer.logger.error("committ the offsets error", exc);
                } else {
                    consumerWorker.resetUncommittedOffsetMap();
                    NewApiTopicConsumer.logger.debug("committed the offsets : {}", map);
                }
            }
        });
    }

    @Override // com.jeesuite.kafka.consumer.AbstractTopicConsumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.runing.get()) {
            for (int i = 0; i < this.consumerWorks.size(); i = (i - 1) + 1) {
                this.consumerWorks.get(i).close();
                this.consumerWorks.remove(i);
            }
            this.consumerContext.switchFetch(true);
            super.close();
        }
    }
}
