package com.jeesuite.amqp.kafka;

import com.jeesuite.amqp.AbstractConsumer;
import com.jeesuite.amqp.MQContext;
import com.jeesuite.amqp.MQMessage;
import com.jeesuite.amqp.MessageHandler;
import com.jeesuite.common.GlobalRuntimeContext;
import com.jeesuite.common.util.ResourceUtils;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/amqp/kafka/KafkaConsumerAdapter.class */
/**
 * {@link AbstractConsumer} implementation backed by a single {@link KafkaConsumer}.
 *
 * <p>Topics are taken from the keys of the handler map passed to the constructor. Offsets are
 * either auto-committed by the Kafka client ({@code enable.auto.commit=true}) or committed by
 * this class:
 * <ul>
 *   <li>when {@link MQContext#isAsyncConsumeEnabled()} is true, consumed offsets are recorded per
 *       partition in {@link #uncommitOffsetStats} and committed at the start of the next
 *       {@link #fetchMessages()} call;</li>
 *   <li>otherwise each consumed message triggers its own commit from
 *       {@link #handleMessageConsumed(MQMessage)}.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code KafkaConsumer} is documented as not thread-safe; this class assumes the
 * surrounding framework only touches it from the polling thread (and from {@link #shutdown()}
 * after the worker has been stopped) — confirm against {@code AbstractConsumer}.
 */
public class KafkaConsumerAdapter extends AbstractConsumer {
    /** Shared logger; static so that the static {@link #buildConfigs()} can report problems too. */
    private static final Logger logger = LoggerFactory.getLogger("com.jeesuite.amqp");

    /** Created in {@link #start()}; {@code null} until then. */
    private KafkaConsumer<String, String> kafkaConsumer;
    /** Maximum time a single {@code poll} call may block. */
    private final Duration timeoutDuration;
    /** Mirrors the effective {@code enable.auto.commit} setting; resolved in {@link #start()}. */
    private boolean offsetAutoCommit;
    /** Per-partition record of the latest consumed offset and whether it has been committed. */
    private final Map<TopicPartition, OffsetAndMetadataStat> uncommitOffsetStats = new ConcurrentHashMap<>();

    /**
     * Creates the adapter.
     *
     * @param map topic name to handler mapping; its key set is the list of subscribed topics
     */
    public KafkaConsumerAdapter(Map<String, MessageHandler> map) {
        super(map);
        this.timeoutDuration = Duration.ofMillis(ResourceUtils.getLong("jeesuite.amqp.fetch.timeout.ms", 100L));
    }

    /**
     * Builds the Kafka configuration, creates the consumer, subscribes to every handled topic and
     * starts the worker of the parent class.
     *
     * @throws Exception if the consumer cannot be created or the worker cannot be started
     */
    @Override
    public void start() throws Exception {
        logger.info(">>KafkaConsumer start Begin..");
        Properties configs = buildConfigs();
        this.offsetAutoCommit = Boolean.parseBoolean(configs.getProperty("enable.auto.commit"));
        this.kafkaConsumer = new KafkaConsumer<>(configs);
        Set<String> topicNames = this.messageHandlers.keySet();
        if (this.offsetAutoCommit) {
            this.kafkaConsumer.subscribe(topicNames);
        } else {
            // Manual commit mode: keep the per-partition bookkeeping in sync with rebalances.
            this.kafkaConsumer.subscribe(topicNames, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    // Flush the consumer's current positions before ownership moves elsewhere.
                    kafkaConsumer.commitSync();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    uncommitOffsetStats.clear();
                    for (TopicPartition partition : collection) {
                        uncommitOffsetStats.put(partition, new OffsetAndMetadataStat(0L));
                    }
                }
            });
        }
        super.startWorker();
        // Log only the topics: the full configuration may contain credentials.
        logger.info(">>KafkaConsumer start End -> subscribeTopics:{}", topicNames);
    }

    /**
     * Commits any pending tracked offsets, then polls Kafka once.
     *
     * @return one {@link MQMessage} per polled record (possibly empty); each message carries the
     *         originating {@link ConsumerRecord} as its origin message
     */
    @Override
    public List<MQMessage> fetchMessages() {
        trySubmitOffsets();
        ConsumerRecords<String, String> records = this.kafkaConsumer.poll(this.timeoutDuration);
        List<MQMessage> messages = new ArrayList<>(records.count());
        for (ConsumerRecord<String, String> record : records) {
            MQMessage message = new MQMessage(record.topic(), record.value());
            message.setOriginMessage(record);
            messages.add(message);
        }
        return messages;
    }

    /**
     * Records (async mode) or immediately commits (sync mode) the offset of a consumed message.
     * Does nothing when the Kafka client auto-commits.
     *
     * @param mQMessage the message that has just been handled; its origin message must be the
     *                  {@link ConsumerRecord} attached by {@link #fetchMessages()}
     * @return always {@code null}
     */
    @Override
    public String handleMessageConsumed(MQMessage mQMessage) {
        if (this.offsetAutoCommit) {
            return null;
        }
        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) mQMessage.getOriginMessage(ConsumerRecord.class);
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        if (MQContext.isAsyncConsumeEnabled()) {
            OffsetAndMetadataStat stat = this.uncommitOffsetStats.get(topicPartition);
            if (stat == null) {
                // The partition is no longer tracked (the map is rebuilt on every assignment),
                // so there is nothing to update; previously this dereferenced null.
                logger.warn("MQmessage_OFFSET_UNTRACKED -> topicPartition:{}, offset:{}", topicPartition, record.offset());
                return null;
            }
            stat.updateOnConsumed(record.offset());
            return null;
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(1);
        // Kafka expects the offset of the NEXT record to read, hence +1.
        offsets.put(topicPartition, new OffsetAndMetadata(record.offset() + 1));
        submitOffsets(offsets);
        return null;
    }

    /**
     * In manual-commit async mode, commits the tracked offset of every partition whose stat is
     * not flagged as committed. No-op in every other mode.
     */
    private void trySubmitOffsets() {
        if (this.offsetAutoCommit || !MQContext.isAsyncConsumeEnabled()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>(this.uncommitOffsetStats.size());
        this.uncommitOffsetStats.forEach((topicPartition, stat) -> {
            if (!stat.isCommited()) {
                pending.put(topicPartition, new OffsetAndMetadata(stat.getOffset() + 1));
            }
        });
        submitOffsets(pending);
    }

    /**
     * Commits the given offsets asynchronously; if the async commit reports an error, the same
     * offsets are retried synchronously from the callback.
     *
     * @param map offsets to commit; an empty map is ignored
     */
    private synchronized void submitOffsets(final Map<TopicPartition, OffsetAndMetadata> map) {
        if (map.isEmpty()) {
            return;
        }
        this.kafkaConsumer.commitAsync(map, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (exc != null) {
                    logger.warn("MQmessage_COMMIT_FAIL -> offsets:{}, retrying with commitSync", map, exc);
                    // May throw; in that case the stats below stay uncommitted, as before.
                    kafkaConsumer.commitSync(map);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("MQmessage_COMMIT_SUCCESS -> offsets:{}", map2);
                }
                map2.forEach((topicPartition, offsetAndMetadata) -> {
                    OffsetAndMetadataStat stat = uncommitOffsetStats.get(topicPartition);
                    // Skip partitions that are no longer tracked, and do not flag a stat as
                    // committed when a newer offset was recorded while this commit was in flight.
                    if (stat != null && stat.getOffset() + 1 <= offsetAndMetadata.offset()) {
                        stat.setCommited(true);
                    }
                });
            }
        });
    }

    /**
     * Assembles the Kafka consumer configuration.
     *
     * <p>For every {@code String} constant of {@link ConsumerConfig} whose field name ends with
     * {@code CONFIG}, the value of the property {@code jeesuite.amqp.kafka[<kafka-key>]} is copied
     * when it is not blank. Defaults are then filled in for the deserializers, {@code client.id},
     * {@code max.poll.records} and {@code enable.auto.commit}.
     *
     * @return the configuration to pass to {@link KafkaConsumer}
     * @throws NullPointerException if {@code bootstrap.servers} is not configured (type kept for
     *                              compatibility with existing callers)
     */
    private static Properties buildConfigs() {
        Properties properties = new Properties();
        properties.setProperty("group.id", MQContext.getGroupName());
        for (Field field : ConsumerConfig.class.getDeclaredFields()) {
            if (!field.getName().endsWith("CONFIG") || field.getType() != String.class) {
                continue;
            }
            field.setAccessible(true);
            try {
                String kafkaKey = field.get(ConsumerConfig.class).toString();
                String value = ResourceUtils.getProperty("jeesuite.amqp.kafka[" + kafkaKey + "]");
                if (StringUtils.isNotBlank(value)) {
                    properties.setProperty(kafkaKey, value);
                }
            } catch (Exception e) {
                // Best effort: one unreadable constant must not prevent start-up, but make it visible.
                logger.warn("Kafka config field [{}] could not be resolved, skipped", field.getName(), e);
            }
        }
        if (!properties.containsKey("bootstrap.servers")) {
            throw new NullPointerException("Kafka config[bootstrap.servers] is required");
        }
        if (!properties.containsKey("key.deserializer")) {
            properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        }
        if (!properties.containsKey("value.deserializer")) {
            properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        }
        if (!properties.containsKey("client.id")) {
            properties.setProperty("client.id", MQContext.getGroupName() + GlobalRuntimeContext.getWorkId());
        }
        if (!properties.containsKey("max.poll.records")) {
            // Stored as a String so that Properties.getProperty can see it.
            properties.setProperty("max.poll.records", String.valueOf(MQContext.getMaxProcessThreads()));
        }
        if (!properties.containsKey("enable.auto.commit")) {
            // Async consumption needs manual commits; otherwise let the client auto-commit.
            properties.setProperty("enable.auto.commit", MQContext.isAsyncConsumeEnabled() ? "false" : "true");
        }
        return properties;
    }

    /**
     * Shuts down the parent class first, then closes the Kafka consumer if {@link #start()}
     * managed to create one.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }
}
