package com.jeesuite.kafka.consumer;

import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.thread.StandardThreadExecutor;
import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer.class */
public class NewApiTopicConsumer implements TopicConsumer, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private Map<String, MessageHandler> topicHandlers;
    private ExecutorService fetcheExecutor;
    private StandardThreadExecutor processExecutor;
    private List<ConsumerWorker> consumerWorks = new ArrayList();
    private ErrorMessageDefaultProcessor errorMessageProcessor = new ErrorMessageDefaultProcessor(1);
    private final Map<TopicPartition, Long> partitionToUncommittedOffsetMap = new ConcurrentHashMap();
    private boolean offsetAutoCommit;
    private ConsumerContext consumerContext;
    private Properties properties;
    private String clientIdPrefix;

    /* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer$ConsumerWorker.class */
    private class ConsumerWorker implements Runnable {
        private AtomicBoolean closed = new AtomicBoolean();
        KafkaConsumer<String, Serializable> consumer;

        public ConsumerWorker(String str, int i) {
            NewApiTopicConsumer.this.properties.setProperty("client.id", NewApiTopicConsumer.this.clientIdPrefix + "_" + i);
            this.consumer = new KafkaConsumer<>(NewApiTopicConsumer.this.properties);
            NewApiTopicConsumer.this.subscribeTopic(this.consumer, str);
            if (!NewApiTopicConsumer.this.offsetAutoCommit || NewApiTopicConsumer.this.consumerContext.getOffsetLogHanlder() == null) {
                return;
            }
            NewApiTopicConsumer.this.resetCorrectOffsets(this.consumer);
        }

        @Override // java.lang.Runnable
        public void run() {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
            while (!this.closed.get()) {
                ConsumerRecords poll = this.consumer.poll(1500L);
                if (!poll.isEmpty()) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        processConsumerRecords((ConsumerRecord) it.next());
                    }
                }
            }
            try {
                newFixedThreadPool.shutdownNow();
                do {
                } while (!newFixedThreadPool.awaitTermination(5L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                NewApiTopicConsumer.logger.error("Error while exiting the consumer");
            }
            this.consumer.close();
            NewApiTopicConsumer.logger.info("consumer exited");
        }

        private void processConsumerRecords(final ConsumerRecord<String, Serializable> consumerRecord) {
            final MessageHandler messageHandler = (MessageHandler) NewApiTopicConsumer.this.topicHandlers.get(consumerRecord.topic());
            NewApiTopicConsumer.this.consumerContext.saveOffsetsBeforeProcessed(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            final DefaultMessage defaultMessage = consumerRecord.value() instanceof DefaultMessage ? (DefaultMessage) consumerRecord.value() : new DefaultMessage((Serializable) consumerRecord.value());
            messageHandler.p1Process(defaultMessage);
            NewApiTopicConsumer.this.processExecutor.submit(new Runnable() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.ConsumerWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        messageHandler.p2Process(defaultMessage);
                        if (!NewApiTopicConsumer.this.offsetAutoCommit) {
                            NewApiTopicConsumer.this.partitionToUncommittedOffsetMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
                            NewApiTopicConsumer.this.commitOffsets(ConsumerWorker.this.consumer);
                        }
                        NewApiTopicConsumer.this.consumerContext.saveOffsetsAfterProcessed(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                    } catch (Exception e) {
                        if (!messageHandler.onProcessError(defaultMessage)) {
                            NewApiTopicConsumer.this.errorMessageProcessor.submit(defaultMessage, messageHandler);
                        }
                        NewApiTopicConsumer.logger.error("[" + messageHandler.getClass().getSimpleName() + "] process Topic[" + consumerRecord.topic() + "] error", e);
                    }
                }
            });
        }

        public void close() {
            this.closed.set(true);
            this.consumer.close();
        }
    }

    public NewApiTopicConsumer(ConsumerContext consumerContext) {
        this.consumerContext = consumerContext;
        this.properties = consumerContext.getProperties();
        this.clientIdPrefix = this.properties.getProperty("client.id");
        this.topicHandlers = consumerContext.getMessageHandlers();
        this.offsetAutoCommit = !consumerContext.getProperties().containsKey("enable.auto.commit") || Boolean.parseBoolean(consumerContext.getProperties().getProperty("enable.auto.commit"));
        this.fetcheExecutor = Executors.newFixedThreadPool(this.topicHandlers.size());
        this.processExecutor = new StandardThreadExecutor(1, consumerContext.getMaxProcessThreads(), 1000);
    }

    @Override // com.jeesuite.kafka.consumer.TopicConsumer
    public void start() {
        ArrayList arrayList = new ArrayList(this.topicHandlers.keySet());
        for (int i = 0; i < arrayList.size(); i++) {
            ConsumerWorker consumerWorker = new ConsumerWorker((String) arrayList.get(i), i);
            this.consumerWorks.add(consumerWorker);
            this.fetcheExecutor.submit(consumerWorker);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetCorrectOffsets(KafkaConsumer<String, Serializable> kafkaConsumer) {
        Map listTopics = kafkaConsumer.listTopics();
        Set<String> keySet = listTopics.keySet();
        ArrayList arrayList = new ArrayList(this.topicHandlers.keySet());
        kafkaConsumer.poll(200L);
        for (String str : keySet) {
            if (arrayList.contains(str)) {
                for (PartitionInfo partitionInfo : (List) listTopics.get(str)) {
                    try {
                        long latestProcessedOffsets = this.consumerContext.getLatestProcessedOffsets(str, partitionInfo.partition());
                        TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                        OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
                        if (kafkaConsumer.assignment().contains(topicPartition) && latestProcessedOffsets >= 0 && latestProcessedOffsets < committed.offset()) {
                            kafkaConsumer.seek(topicPartition, latestProcessedOffsets);
                            logger.info(">>>>>>> seek Topic[{}] partition[{}] from {} to {}", new Object[]{str, Integer.valueOf(partitionInfo.partition()), Long.valueOf(committed.offset()), Long.valueOf(latestProcessedOffsets)});
                        }
                    } catch (Exception e) {
                        logger.warn("try seek topic[" + str + "] partition[" + partitionInfo.partition() + "] offsets error");
                    }
                }
            }
        }
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

    @Override // com.jeesuite.kafka.consumer.TopicConsumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (int i = 0; i < this.consumerWorks.size(); i = (i - 1) + 1) {
            this.consumerWorks.get(i).close();
            this.consumerWorks.remove(i);
        }
        this.fetcheExecutor.shutdown();
        if (this.processExecutor != null) {
            this.processExecutor.shutdown();
        }
        this.errorMessageProcessor.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <K extends Serializable, V extends Serializable> void subscribeTopic(final KafkaConsumer<String, Serializable> kafkaConsumer, String str) {
        ArrayList arrayList = new ArrayList(Arrays.asList(str));
        if (this.offsetAutoCommit) {
            kafkaConsumer.subscribe(arrayList);
        } else {
            kafkaConsumer.subscribe(arrayList, new ConsumerRebalanceListener() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    NewApiTopicConsumer.this.commitOffsets(kafkaConsumer);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    for (TopicPartition topicPartition : collection) {
                        long j = 0;
                        if (NewApiTopicConsumer.this.consumerContext.getOffsetLogHanlder() != null) {
                            try {
                                j = NewApiTopicConsumer.this.consumerContext.getLatestProcessedOffsets(topicPartition.topic(), topicPartition.partition());
                                NewApiTopicConsumer.logger.info("offsetLogHanlder.getLatestProcessedOffsets({},{}) result is {}", new Object[]{topicPartition.topic(), Integer.valueOf(topicPartition.partition()), Long.valueOf(j)});
                            } catch (Exception e) {
                                NewApiTopicConsumer.logger.warn("offsetLogHanlder.getLatestProcessedOffsets error:{}", e.getMessage());
                            }
                        }
                        if (j == 0) {
                            OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
                            j = committed != null ? committed.offset() : -1L;
                        }
                        NewApiTopicConsumer.logger.debug("Assigned topicPartion : {} offset : {}", topicPartition, Long.valueOf(j));
                        if (j >= 0) {
                            kafkaConsumer.seek(topicPartition, j);
                        }
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitOffsets(KafkaConsumer<String, Serializable> kafkaConsumer) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.partitionToUncommittedOffsetMap.entrySet()) {
            hashMap.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue() + 1));
        }
        if (hashMap.isEmpty()) {
            return;
        }
        logger.debug("committing the offsets : {}", hashMap);
        kafkaConsumer.commitAsync(hashMap, new OffsetCommitCallback() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.2
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                if (exc != null) {
                    NewApiTopicConsumer.logger.error("committ the offsets error", exc);
                } else {
                    NewApiTopicConsumer.logger.debug("committed the offsets : {}", map);
                    NewApiTopicConsumer.this.partitionToUncommittedOffsetMap.clear();
                }
            }
        });
    }
}
