package com.github.hackerwin7.jlib.utils.drivers.kafka.consumer;

import com.github.hackerwin7.jlib.utils.drivers.kafka.conf.KafkaConf;
import com.github.hackerwin7.jlib.utils.drivers.kafka.data.KafkaMsg;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/github/hackerwin7/jlib/utils/drivers/kafka/consumer/KafkaSimpleConsumer.class */
public class KafkaSimpleConsumer {
    private static final Logger logger = Logger.getLogger(KafkaSimpleConsumer.class);

    /** Capacity of the bounded queue that hands messages from the partition workers to {@link #consume()}. */
    public static final int QUEUE_SIZE = 10000;
    /** Separator between entries of a multi-valued configuration property. */
    public static final String CONF_SPLIT = ",";
    /** Separator between host and port inside one broker entry. */
    public static final String PORT_SPLIT = ":";
    /** Third argument given to every {@code SimpleConsumer} this class creates (socket timeout, presumably milliseconds). */
    public static final int CONSUME_TIME_OUT = 100000;
    /** Fourth argument given to every {@code SimpleConsumer} this class creates (receive buffer size, presumably bytes). */
    public static final int CONSUME_BUFFER_SIZE = 65536;
    /** Fetch size requested per fetch call. */
    public static final int FETCH_SIZE = 1048576;
    /** Number of consecutive error responses after which a partition worker gives up. */
    public static final int ERR_COUNT_RECONN = 5;
    /** Back-off, in milliseconds, used between retries. */
    public static final long SLEEPING_TIME = 3000;
    /** Size of the worker pool; every partition permanently occupies one thread. */
    public static final int THREAD_POOL_SIZE = 100;

    /** Topic being consumed; assigned once by the constructor. */
    private String topic;
    /** Bootstrap brokers, host mapped to port. */
    private final Map<String, Integer> brokers = new HashMap<String, Integer>();
    /** Start offset per partition id. */
    private final Map<Integer, Long> offsets = new HashMap<Integer, Long>();
    /** End offset per partition id ({@code Long.MAX_VALUE} when not configured). */
    private final Map<Integer, Long> endOffsets = new HashMap<Integer, Long>();
    /** Hand-off queue filled by the partition workers and drained by {@link #consume()}. */
    private final BlockingQueue<KafkaMsg> queue = new LinkedBlockingQueue<KafkaMsg>(QUEUE_SIZE);
    /** Worker per partition id, kept so offsets can be queried later. */
    private final Map<Integer, ConsumeThread> consumers = new HashMap<Integer, ConsumeThread>();
    /** Pool that runs the partition workers. */
    private final ExecutorService executors = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    /* loaded from: input_file:com/github/hackerwin7/jlib/utils/drivers/kafka/consumer/KafkaSimpleConsumer$ConsumeThread.class */
    public class ConsumeThread implements Runnable {
        private int partition;
        private long offset;
        private long endOffset;
        private Map<String, Integer> brokers;
        private Logger logger = Logger.getLogger(ConsumeThread.class);
        private AtomicBoolean running = new AtomicBoolean(true);
        private List<String> replicaBrokers = new ArrayList();
        String clientName = "simple consumer";
        private SimpleConsumer consumer = null;

        public ConsumeThread(Map<String, Integer> map, int i, long j, long j2) {
            this.partition = 0;
            this.offset = 0L;
            this.endOffset = 0L;
            this.brokers = null;
            this.brokers = map;
            this.partition = i;
            this.offset = j;
            this.endOffset = j2;
        }

        @Override // java.lang.Runnable
        public void run() {
            PartitionMetadata findLeader = findLeader();
            if (findLeader == null) {
                this.logger.error("can not find metadata for topic = " + KafkaSimpleConsumer.this.topic);
                return;
            }
            if (findLeader.leader() == null) {
                this.logger.error("can not find leader for topic = " + KafkaSimpleConsumer.this.topic);
                return;
            }
            String host = findLeader.leader().host();
            int port = findLeader.leader().port();
            this.clientName = "simple_" + System.currentTimeMillis();
            this.consumer = new SimpleConsumer(host, port, 100000, 65536, this.clientName);
            long j = this.offset;
            int i = 0;
            while (this.running.get()) {
                FetchResponse fetch = this.consumer.fetch(new FetchRequestBuilder().clientId(this.clientName).addFetch(KafkaSimpleConsumer.this.topic, this.partition, j, 1048576).build());
                if (fetch.hasError()) {
                    i++;
                    short errorCode = fetch.errorCode(KafkaSimpleConsumer.this.topic, this.partition);
                    this.logger.error("receive error response from broker = " + host + ", port = " + port + ", topic = " + KafkaSimpleConsumer.this.topic + ", partition = " + this.partition + ", err code = " + ((int) errorCode));
                    if (i >= 5) {
                        this.logger.error("continuous receive " + i + " responses, exiting consumer thread......");
                        return;
                    }
                    if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                        long minOffset = getMinOffset();
                        long maxOffset = getMaxOffset();
                        this.logger.error("encounter out off range fetch error.");
                        this.logger.error("min offset = " + minOffset + ", max offset = " + maxOffset + ", request offset = " + j);
                        if (j < minOffset) {
                            j = minOffset;
                        } else if (j > maxOffset) {
                            j = maxOffset;
                        }
                        this.logger.error("reset the request offset to " + j + ", in partition = " + this.partition + ", of topic = " + KafkaSimpleConsumer.this.topic);
                    } else {
                        this.logger.error("leader broker maybe switched , finding ......");
                        this.logger.error("old leader = " + host + ", port = " + port);
                        this.consumer.close();
                        findLeader = findNewLeader(findLeader);
                        if (findLeader == null) {
                            this.logger.error("can not find new metadata for topic = " + KafkaSimpleConsumer.this.topic);
                            this.logger.error("exiting consumer thread...");
                            return;
                        } else if (findLeader.leader() == null) {
                            this.logger.error("can not find new leader for topic = " + KafkaSimpleConsumer.this.topic);
                            this.logger.error("exiting consumer thread...");
                            return;
                        } else {
                            host = findLeader.leader().host();
                            port = findLeader.leader().port();
                            this.consumer = new SimpleConsumer(host, port, 100000, 65536, this.clientName);
                            this.logger.error("new leader = " + host + ", port = " + port);
                        }
                    }
                } else {
                    i = 0;
                    Iterator<MessageAndOffset> it = fetch.messageSet(KafkaSimpleConsumer.this.topic, this.partition).iterator();
                    while (it.hasNext()) {
                        MessageAndOffset next = it.next();
                        long offset = next.offset();
                        if (offset < j) {
                            this.logger.error("found an old offset = " + offset + ", expect read offset = " + j);
                        } else {
                            long nextOffset = next.nextOffset();
                            String str = null;
                            ByteBuffer payload = next.message().payload();
                            byte[] bArr = new byte[payload.limit()];
                            payload.get(bArr);
                            if (next.message().hasKey()) {
                                ByteBuffer key = next.message().key();
                                byte[] bArr2 = new byte[key.limit()];
                                key.get(bArr2);
                                str = new String(bArr2);
                            }
                            KafkaMsg build = KafkaMsg.createBuilder().offset(offset).nextOffset(nextOffset).val(bArr).key(str).topic(KafkaSimpleConsumer.this.topic).partition(this.partition).build();
                            while (true) {
                                try {
                                    KafkaSimpleConsumer.this.queue.put(build);
                                    break;
                                } catch (InterruptedException e) {
                                    this.logger.error(e.getMessage(), e);
                                    try {
                                        Thread.sleep(3000L);
                                    } catch (InterruptedException e2) {
                                        this.logger.error(e2.getMessage(), e2);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        private PartitionMetadata findLeader() {
            PartitionMetadata partitionMetadata = null;
            Iterator<Map.Entry<String, Integer>> it = this.brokers.entrySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Map.Entry<String, Integer> next = it.next();
                String key = next.getKey();
                SimpleConsumer simpleConsumer = null;
                try {
                    try {
                        simpleConsumer = new SimpleConsumer(key, next.getValue().intValue(), 100000, 65536, "find_leader" + System.currentTimeMillis());
                        Iterator<TopicMetadata> it2 = simpleConsumer.send(new TopicMetadataRequest(Collections.singletonList(KafkaSimpleConsumer.this.topic))).topicsMetadata().iterator();
                        while (it2.hasNext()) {
                            for (PartitionMetadata partitionMetadata2 : it2.next().partitionsMetadata()) {
                                if (partitionMetadata2.partitionId() == this.partition) {
                                    partitionMetadata = partitionMetadata2;
                                    if (simpleConsumer != null) {
                                        simpleConsumer.close();
                                    }
                                }
                            }
                        }
                        if (simpleConsumer != null) {
                            simpleConsumer.close();
                        }
                    } catch (Throwable th) {
                        this.logger.error("error communicating with broker = [" + key + "] to find leader for topic = [" + KafkaSimpleConsumer.this.topic + "], partition = [" + this.partition + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
                        if (simpleConsumer != null) {
                            simpleConsumer.close();
                        }
                    }
                } catch (Throwable th2) {
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                    throw th2;
                }
            }
            if (this.replicaBrokers != null) {
                this.replicaBrokers.clear();
                for (Broker broker : partitionMetadata.replicas()) {
                    this.replicaBrokers.add(broker.host() + ":" + broker.port());
                }
            }
            return partitionMetadata;
        }

        private PartitionMetadata findNewLeader(PartitionMetadata partitionMetadata) {
            boolean z;
            for (int i = 0; i <= 2; i++) {
                PartitionMetadata findLeader = findLeader();
                if (findLeader == null) {
                    z = true;
                } else if (findLeader.leader() == null) {
                    z = true;
                } else {
                    if (!StringUtils.equalsIgnoreCase(partitionMetadata.leader().host(), findLeader.leader().host()) || i != 0) {
                        return findLeader;
                    }
                    z = true;
                }
                if (z) {
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException e) {
                        this.logger.error(e.getMessage(), e);
                    }
                }
            }
            this.logger.error("unable to find new leader when simple consumer has error code......");
            return null;
        }

        private long getOffset(long j) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(KafkaSimpleConsumer.this.topic, this.partition);
            HashMap hashMap = new HashMap();
            hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
            OffsetResponse offsetsBefore = this.consumer.getOffsetsBefore(new OffsetRequest(hashMap, kafka.api.OffsetRequest.CurrentVersion(), this.clientName));
            if (!offsetsBefore.hasError()) {
                return offsetsBefore.offsets(KafkaSimpleConsumer.this.topic, this.partition)[0];
            }
            this.logger.error("error fetching offset data with topic = " + KafkaSimpleConsumer.this.topic + ", partition = " + this.partition + ", reason = " + ((int) offsetsBefore.errorCode(KafkaSimpleConsumer.this.topic, this.partition)));
            return -1L;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getMinOffset() {
            return getOffset(kafka.api.OffsetRequest.EarliestTime());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getMaxOffset() {
            return getOffset(kafka.api.OffsetRequest.LatestTime());
        }
    }

    /**
     * Reads the broker list, topic, partitions and offsets from {@code kafkaConf} and immediately
     * starts one consumer worker per configured partition.
     *
     * <p>Partitions, start offsets and end offsets are matched by position. End offsets are
     * optional: a partition without one (or with the whole property missing) gets
     * {@code Long.MAX_VALUE}.
     *
     * @param kafkaConf configuration providing the {@code SIMPLE_*} properties
     * @throws IllegalArgumentException if a broker entry lacks a port or fewer start offsets than
     *         partitions are configured
     * @throws NumberFormatException if a port, partition or offset is not numeric
     */
    public KafkaSimpleConsumer(KafkaConf kafkaConf) {
        String brokerList = kafkaConf.getProp(KafkaConf.SIMPLE_BROKER_LIST);
        String partitionList = kafkaConf.getProp(KafkaConf.SIMPLE_PARTITIONS);
        String offsetList = kafkaConf.getProp(KafkaConf.SIMPLE_OFFSETS);
        String endOffsetList = kafkaConf.getProp(KafkaConf.SIMPLE_END_OFFSETS);
        this.topic = kafkaConf.getProp(KafkaConf.SIMPLE_TOPIC);

        for (String broker : splitConf(brokerList)) {
            String[] hostAndPort = StringUtils.split(broker, PORT_SPLIT);
            if (hostAndPort == null || hostAndPort.length < 2) {
                throw new IllegalArgumentException("invalid broker entry '" + broker + "', expected host" + PORT_SPLIT + "port");
            }
            this.brokers.put(hostAndPort[0].trim(), Integer.valueOf(hostAndPort[1].trim()));
        }

        String[] partitions = splitConf(partitionList);
        String[] startOffsets = splitConf(offsetList);
        String[] stopOffsets = splitConf(endOffsetList);
        if (startOffsets.length < partitions.length) {
            throw new IllegalArgumentException("configured " + partitions.length + " partitions but only " + startOffsets.length + " start offsets");
        }
        for (int i = 0; i < partitions.length; i++) {
            Integer partition = Integer.valueOf(partitions[i]);
            this.offsets.put(partition, Long.valueOf(startOffsets[i]));
            // Partitions beyond the configured end offsets are consumed without an upper bound.
            Long stopOffset = i < stopOffsets.length ? Long.valueOf(stopOffsets[i]) : Long.valueOf(Long.MAX_VALUE);
            this.endOffsets.put(partition, stopOffset);
        }
        start();
    }

    /**
     * Splits a multi-valued property on {@code CONF_SPLIT} and trims every entry.
     *
     * @return the entries, or an empty array when {@code value} is {@code null}
     */
    private static String[] splitConf(String value) {
        String[] tokens = StringUtils.split(value, CONF_SPLIT);
        if (tokens == null) {
            return new String[0];
        }
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = tokens[i].trim();
        }
        return tokens;
    }

    /**
     * Creates one {@link ConsumeThread} per configured partition, submits it to the worker pool
     * and registers it so its offsets can be queried later. A missing or non-positive end offset
     * is replaced by {@code Long.MAX_VALUE}.
     */
    private void start() {
        if (this.offsets.size() > THREAD_POOL_SIZE) {
            // Workers never finish on their own, so surplus tasks would wait in the pool forever.
            logger.warn("topic = " + this.topic + " has " + this.offsets.size() + " partitions configured but only " + THREAD_POOL_SIZE + " worker threads, some partitions will not be consumed");
        }
        for (Map.Entry<Integer, Long> entry : this.offsets.entrySet()) {
            int partition = entry.getKey().intValue();
            long startOffset = entry.getValue().longValue();
            // Look the value up boxed first: unboxing a missing entry would throw before it could be defaulted.
            Long configuredEnd = this.endOffsets.get(entry.getKey());
            long endOffset;
            if (configuredEnd == null || configuredEnd.longValue() <= 0) {
                logger.info("topic = " + this.topic + ", partition = " + partition + ", offset = " + startOffset + ", end offset = " + configuredEnd + " is invalid, reset end offset to Long.max");
                endOffset = Long.MAX_VALUE;
            } else {
                endOffset = configuredEnd.longValue();
            }
            ConsumeThread worker = new ConsumeThread(this.brokers, partition, startOffset, endOffset);
            this.executors.submit(worker);
            this.consumers.put(entry.getKey(), worker);
            logger.info("started consumer topic = " + this.topic + ", partition = " + partition + ", offset = " + startOffset + ", end offset = " + endOffset);
        }
    }

    /**
     * Returns the maximum offset reported by the worker of the given partition.
     *
     * @param i partition id
     * @return the value of the worker's {@code getMaxOffset()}, or {@code -1} when no worker is
     *         registered for the partition
     */
    public long findMaxOffset(int i) {
        ConsumeThread worker = this.consumers.get(Integer.valueOf(i));
        if (worker == null) {
            // Unknown partition: report it the same way a failed offset lookup is reported.
            logger.error("no consumer registered for topic = " + this.topic + ", partition = " + i);
            return -1L;
        }
        return worker.getMaxOffset();
    }

    /**
     * Returns the minimum offset reported by the worker of the given partition.
     *
     * @param i partition id
     * @return the value of the worker's {@code getMinOffset()}, or {@code -1} when no worker is
     *         registered for the partition
     */
    public long findMinOffset(int i) {
        ConsumeThread worker = this.consumers.get(Integer.valueOf(i));
        if (worker == null) {
            // Unknown partition: report it the same way a failed offset lookup is reported.
            logger.error("no consumer registered for topic = " + this.topic + ", partition = " + i);
            return -1L;
        }
        return worker.getMinOffset();
    }

    /**
     * @return the topic this consumer was configured with
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Stops all partition workers: clears each worker's running flag so its fetch loop ends, then
     * shuts the worker pool down with {@code shutdownNow()} to wake workers that are blocked.
     */
    public void close() {
        // Clear the flags first so a worker that wakes up from the shutdown does not start another fetch.
        for (ConsumeThread worker : this.consumers.values()) {
            worker.running.set(false);
        }
        if (this.executors != null) {
            this.executors.shutdownNow();
        }
    }

    /**
     * Takes the next message from the internal queue, waiting until one is available.
     *
     * @return the next queued message
     * @throws Exception if the wait is interrupted
     */
    public KafkaMsg consume() throws Exception {
        final KafkaMsg next = queue.take();
        return next;
    }

    /**
     * @return the number of messages currently waiting in the internal queue
     */
    public int getQueueSize() {
        final int pending = queue.size();
        return pending;
    }

    /**
     * @return {@code true} when no message is currently waiting in the internal queue
     */
    public boolean isConsumeEmpty() {
        final boolean nothingQueued = queue.isEmpty();
        return nothingQueued;
    }
}
