package com.jeesuite.kafka.consumer;

import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.thread.StandardThreadExecutor;
import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer.class */
public class NewApiTopicConsumer implements TopicConsumer, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private Properties configs;
    private Map<String, MessageHandler> topicHandlers;
    private ExecutorService fetcheExecutor;
    private StandardThreadExecutor processExecutor;
    private KafkaConsumer<String, DefaultMessage> consumer;
    private boolean offsetAutoCommit;
    private List<ConsumerWorker> consumerWorks = new ArrayList();
    private ErrorMessageDefaultProcessor errorMessageProcessor = new ErrorMessageDefaultProcessor(1);
    private final Map<TopicPartition, Long> partitionToUncommittedOffsetMap = new ConcurrentHashMap();
    private final List<Future<Boolean>> committedOffsetFutures = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer$ConsumerWorker.class */
    public class ConsumerWorker implements Runnable {
        private AtomicBoolean closed;

        /* loaded from: input_file:com/jeesuite/kafka/consumer/NewApiTopicConsumer$ConsumerWorker$ConsumeRecords.class */
        private class ConsumeRecords implements Callable<Boolean> {
            ConsumerRecords<String, DefaultMessage> records;
            Map<TopicPartition, Long> partitionToUncommittedOffsetMap;

            public ConsumeRecords(ConsumerRecords<String, DefaultMessage> consumerRecords, Map<TopicPartition, Long> map) {
                this.records = consumerRecords;
                this.partitionToUncommittedOffsetMap = map;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() {
                NewApiTopicConsumer.logger.debug("Number of records received : {}", Integer.valueOf(this.records.count()));
                try {
                    Iterator it = this.records.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                        NewApiTopicConsumer.logger.info("Record received topicPartition : {}, offset : {}", topicPartition, Long.valueOf(consumerRecord.offset()));
                        this.partitionToUncommittedOffsetMap.put(topicPartition, Long.valueOf(consumerRecord.offset()));
                        ConsumerWorker.this.processConsumerRecords(consumerRecord);
                    }
                } catch (Exception e) {
                    NewApiTopicConsumer.logger.error("Error while consuming", e);
                }
                return true;
            }
        }

        private ConsumerWorker() {
            this.closed = new AtomicBoolean();
        }

        @Override // java.lang.Runnable
        public void run() {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
            while (!this.closed.get()) {
                ConsumerRecords poll = NewApiTopicConsumer.this.consumer.poll(1500L);
                if (!poll.isEmpty()) {
                    if (NewApiTopicConsumer.this.offsetAutoCommit) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            processConsumerRecords((ConsumerRecord) it.next());
                        }
                    } else {
                        NewApiTopicConsumer.this.consumer.pause(NewApiTopicConsumer.this.consumer.assignment());
                        Future submit = newFixedThreadPool.submit(new ConsumeRecords(poll, NewApiTopicConsumer.this.partitionToUncommittedOffsetMap));
                        NewApiTopicConsumer.this.committedOffsetFutures.add(submit);
                        Boolean bool = false;
                        while (!bool.booleanValue() && !this.closed.get()) {
                            try {
                                bool = (Boolean) submit.get(3L, TimeUnit.SECONDS);
                            } catch (InterruptedException | ExecutionException e) {
                                NewApiTopicConsumer.logger.error("Error while consuming records", e);
                            } catch (CancellationException e2) {
                                NewApiTopicConsumer.logger.debug("ConsumeRecords Job got cancelled");
                            } catch (TimeoutException e3) {
                                NewApiTopicConsumer.logger.debug("heartbeats the coordinator");
                                NewApiTopicConsumer.this.consumer.poll(0L);
                                NewApiTopicConsumer.this.commitOffsets(NewApiTopicConsumer.this.partitionToUncommittedOffsetMap);
                            }
                        }
                        NewApiTopicConsumer.this.committedOffsetFutures.remove(submit);
                        NewApiTopicConsumer.this.consumer.resume(NewApiTopicConsumer.this.consumer.assignment());
                        NewApiTopicConsumer.this.commitOffsets(NewApiTopicConsumer.this.partitionToUncommittedOffsetMap);
                    }
                }
            }
            try {
                newFixedThreadPool.shutdownNow();
                do {
                } while (!newFixedThreadPool.awaitTermination(5L, TimeUnit.SECONDS));
            } catch (InterruptedException e4) {
                NewApiTopicConsumer.logger.error("Error while exiting the consumer");
            }
            NewApiTopicConsumer.this.consumer.close();
            NewApiTopicConsumer.logger.info("consumer exited");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void processConsumerRecords(final ConsumerRecord<String, DefaultMessage> consumerRecord) {
            final MessageHandler messageHandler = (MessageHandler) NewApiTopicConsumer.this.topicHandlers.get(consumerRecord.topic());
            messageHandler.p1Process((DefaultMessage) consumerRecord.value());
            NewApiTopicConsumer.this.processExecutor.submit(new Runnable() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.ConsumerWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        messageHandler.p2Process((DefaultMessage) consumerRecord.value());
                    } catch (Exception e) {
                        if (!messageHandler.onProcessError((DefaultMessage) consumerRecord.value())) {
                            NewApiTopicConsumer.this.errorMessageProcessor.submit((DefaultMessage) consumerRecord.value(), messageHandler);
                        }
                        NewApiTopicConsumer.logger.error("[" + messageHandler.getClass().getSimpleName() + "] process Topic[" + consumerRecord.topic() + "] error", e);
                    }
                }
            });
        }

        public void close() {
            this.closed.set(true);
        }
    }

    public NewApiTopicConsumer(Properties properties, Map<String, MessageHandler> map, int i) {
        this.configs = properties;
        this.topicHandlers = map;
        this.fetcheExecutor = Executors.newFixedThreadPool(map.size());
        this.processExecutor = new StandardThreadExecutor(1, i, 1000);
        this.offsetAutoCommit = !properties.containsKey("enable.auto.commit") || Boolean.parseBoolean(properties.getProperty("enable.auto.commit"));
    }

    @Override // com.jeesuite.kafka.consumer.TopicConsumer
    public void start() {
        createKafkaConsumer();
        for (int i = 0; i < this.topicHandlers.size(); i++) {
            ConsumerWorker consumerWorker = new ConsumerWorker();
            this.consumerWorks.add(consumerWorker);
            this.fetcheExecutor.submit(consumerWorker);
        }
    }

    @Override // com.jeesuite.kafka.consumer.TopicConsumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (int i = 0; i < this.consumerWorks.size(); i = (i - 1) + 1) {
            this.consumerWorks.get(i).close();
            this.consumerWorks.remove(i);
        }
        this.fetcheExecutor.shutdown();
        this.processExecutor.shutdown();
        this.errorMessageProcessor.close();
        this.consumer.close();
    }

    private <K extends Serializable, V extends DefaultMessage> void createKafkaConsumer() {
        this.consumer = new KafkaConsumer<>(this.configs);
        ConsumerRebalanceListener consumerRebalanceListener = new ConsumerRebalanceListener() { // from class: com.jeesuite.kafka.consumer.NewApiTopicConsumer.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                if (!NewApiTopicConsumer.this.committedOffsetFutures.isEmpty()) {
                    ((Future) NewApiTopicConsumer.this.committedOffsetFutures.get(0)).cancel(true);
                }
                NewApiTopicConsumer.this.commitOffsets(NewApiTopicConsumer.this.partitionToUncommittedOffsetMap);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition topicPartition : collection) {
                    OffsetAndMetadata committed = NewApiTopicConsumer.this.consumer.committed(topicPartition);
                    long offset = committed != null ? committed.offset() : -1L;
                    NewApiTopicConsumer.logger.debug("Assigned topicPartion : {} offset : {}", topicPartition, Long.valueOf(offset));
                    if (offset >= 0) {
                        NewApiTopicConsumer.this.consumer.seek(topicPartition, offset);
                    }
                }
            }
        };
        ArrayList arrayList = new ArrayList(this.topicHandlers.keySet());
        if (this.offsetAutoCommit) {
            this.consumer.subscribe(arrayList);
        } else {
            this.consumer.subscribe(arrayList, consumerRebalanceListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitOffsets(Map<TopicPartition, Long> map) {
        if (map.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue() + 1));
        }
        logger.debug("committing the offsets : {}", hashMap);
        this.consumer.commitSync(hashMap);
        map.clear();
    }
}
