package com.jeesuite.kafka.monitor;

import com.jeesuite.kafka.monitor.model.BrokerInfo;
import com.jeesuite.kafka.monitor.model.ConsumerGroupInfo;
import com.jeesuite.kafka.monitor.model.TopicInfo;
import com.jeesuite.kafka.monitor.model.TopicPartitionInfo;
import com.jeesuite.kafka.utils.KafkaConst;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:com/jeesuite/kafka/monitor/ZkConsumerCommand.class */
public class ZkConsumerCommand {
    private static final String CLIENT_ID = "ConsumerOffsetChecker";
    private List<String> kafkaServerList = new ArrayList();
    private Map<String, SimpleConsumer> consumers = new ConcurrentHashMap();
    private ZkClient zkClient;
    private ZkUtils zkUtils;

    public ZkConsumerCommand(String str, String str2) {
        this.kafkaServerList.addAll(Arrays.asList(str2.split(",")));
        if (this.zkClient == null) {
            this.zkClient = new ZkClient(str, 10000, 10000, ZKStringSerializer$.MODULE$);
        }
        this.zkClient = new ZkClient(str, 10000, 10000, ZKStringSerializer$.MODULE$);
        this.zkUtils = new ZkUtils(this.zkClient, new ZkConnection(str), false);
    }

    public ZkConsumerCommand(ZkClient zkClient, String str, String str2) {
        this.kafkaServerList.addAll(Arrays.asList(str2.split(",")));
        zkClient = zkClient == null ? new ZkClient(str, 10000, 10000, ZKStringSerializer$.MODULE$) : zkClient;
        this.zkClient = zkClient;
        this.zkUtils = new ZkUtils(zkClient, new ZkConnection(str), false);
    }

    public List<ConsumerGroupInfo> getAllConsumerGroups() {
        List<ConsumerGroupInfo> fetchAllConsumerGroups = fetchAllConsumerGroups();
        Iterator<ConsumerGroupInfo> it = fetchAllConsumerGroups.iterator();
        while (it.hasNext()) {
            loadTopicInfoInConsumerGroup(it.next());
        }
        return fetchAllConsumerGroups;
    }

    private void loadTopicInfoInConsumerGroup(ConsumerGroupInfo consumerGroupInfo) {
        for (String str : getSubscribeTopics(consumerGroupInfo.getGroupName())) {
            TopicInfo topicInfo = new TopicInfo();
            topicInfo.setTopicName(str);
            List<TopicPartitionInfo> topicOffsets = getTopicOffsets(consumerGroupInfo.getGroupName(), str);
            boolean z = true;
            for (TopicPartitionInfo topicPartitionInfo : topicOffsets) {
                getTopicPartitionLogSize(topicPartitionInfo);
                String fetchPartitionOwner = fetchPartitionOwner(consumerGroupInfo.getGroupName(), str, topicPartitionInfo.getPartition());
                if (fetchPartitionOwner != null) {
                    z = false;
                    topicPartitionInfo.setOwner(fetchPartitionOwner);
                    if (!consumerGroupInfo.isActived()) {
                        consumerGroupInfo.setActived(true);
                    }
                }
            }
            if (!z && topicOffsets.size() > 0) {
                topicInfo.setPartitions(topicOffsets);
                consumerGroupInfo.getTopics().add(topicInfo);
            }
        }
    }

    public List<BrokerInfo> fetchAllBrokers() {
        ArrayList arrayList = new ArrayList();
        scala.collection.Iterator it = this.zkUtils.getAllBrokersInCluster().toList().iterator();
        while (it.hasNext()) {
            Node node = ((Broker) it.next()).getNode(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
            arrayList.add(new BrokerInfo(node.idString(), node.host(), node.port()));
        }
        return arrayList;
    }

    public List<ConsumerGroupInfo> fetchAllConsumerGroups() {
        ArrayList arrayList = new ArrayList();
        List<String> children = this.zkClient.getChildren("/consumers");
        if (children == null) {
            return arrayList;
        }
        for (String str : children) {
            List<String> consumerClusterNodes = getConsumerClusterNodes(str);
            if (consumerClusterNodes != null && !consumerClusterNodes.isEmpty()) {
                ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo();
                consumerGroupInfo.setClusterNodes(consumerClusterNodes);
                consumerGroupInfo.setGroupName(str);
                arrayList.add(consumerGroupInfo);
            }
        }
        return arrayList;
    }

    public MetadataResponse.TopicMetadata getTopicMetadata(String str) {
        return AdminUtils.fetchTopicMetadataFromZk(str, this.zkUtils);
    }

    public int getPartitionCounts(String str) {
        return getTopicMetadata(str).partitionMetadata().size();
    }

    public List<String> getConsumerClusterNodes(String str) {
        String str2 = KafkaConst.ZK_CONSUMER_PATH + str + "/ids";
        return this.zkClient.exists(str2) ? this.zkClient.getChildren(str2) : new ArrayList();
    }

    public List<String> getSubscribeTopics(String str) {
        String str2 = KafkaConst.ZK_CONSUMER_PATH + str + "/owners";
        return this.zkClient.exists(str2) ? this.zkClient.getChildren(str2) : new ArrayList();
    }

    public List<TopicPartitionInfo> getTopicOffsets(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        String str3 = KafkaConst.ZK_CONSUMER_PATH + str + "/offsets/" + str2;
        if (!this.zkClient.exists(str3)) {
            return new ArrayList();
        }
        for (String str4 : this.zkClient.getChildren(str3)) {
            Stat stat = new Stat();
            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(str2, Integer.parseInt(str4), Long.parseLong(this.zkClient.readData(str3 + "/" + str4, stat).toString()));
            topicPartitionInfo.setCreateTime(new Date(stat.getCtime()));
            topicPartitionInfo.setLastTime(new Date(stat.getMtime()));
            arrayList.add(topicPartitionInfo);
        }
        return arrayList;
    }

    public void resetTopicOffsets(String str, String str2, int i, long j) {
        this.zkClient.writeData(KafkaConst.ZK_CONSUMER_PATH + str + "/offsets/" + str2 + "/" + i, String.valueOf(j));
    }

    public String fetchPartitionOwner(String str, String str2, int i) {
        try {
            return (String) this.zkClient.readData(KafkaConst.ZK_CONSUMER_PATH + str + "/owners/" + str2 + "/" + i);
        } catch (Exception e) {
            return null;
        }
    }

    public boolean consumerIsActive(String str, String str2) {
        return getConsumerClusterNodes(str).contains(str2);
    }

    private SimpleConsumer getConsumerClient(String str) {
        if (this.consumers.containsKey(str)) {
            return this.consumers.get(str);
        }
        SimpleConsumer simpleConsumer = new SimpleConsumer(str.split(":")[0], Integer.parseInt(str.split(":")[1]), 100000, 65536, CLIENT_ID);
        this.consumers.put(str, simpleConsumer);
        return simpleConsumer;
    }

    private SimpleConsumer getConsumerClient(String str, int i) {
        return getConsumerClient(str + ":" + i);
    }

    public void getTopicPartitionLogSize(TopicPartitionInfo topicPartitionInfo) {
        BrokerEndPoint leader = findLeader(topicPartitionInfo.getTopic(), topicPartitionInfo.getPartition()).leader();
        SimpleConsumer consumerClient = getConsumerClient(leader.host(), leader.port());
        try {
            topicPartitionInfo.setLogSize(getLastOffset(consumerClient, topicPartitionInfo.getTopic(), topicPartitionInfo.getPartition(), OffsetRequest.LatestTime()));
            consumerClient.close();
        } catch (Throwable th) {
            consumerClient.close();
            throw th;
        }
    }

    private static long getLastOffset(SimpleConsumer simpleConsumer, String str, int i, long j) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), CLIENT_ID));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return 0L;
    }

    private PartitionMetadata findLeader(String str, int i) {
        PartitionMetadata partitionMetadata = null;
        Iterator<String> it = this.kafkaServerList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String next = it.next();
            SimpleConsumer simpleConsumer = null;
            try {
                try {
                    simpleConsumer = new SimpleConsumer(next.split(":")[0], Integer.parseInt(next.split(":")[1]), 100000, 65536, "leaderLookup");
                    Iterator it2 = simpleConsumer.send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
                    while (it2.hasNext()) {
                        for (PartitionMetadata partitionMetadata2 : ((TopicMetadata) it2.next()).partitionsMetadata()) {
                            if (partitionMetadata2.partitionId() == i) {
                                partitionMetadata = partitionMetadata2;
                                if (simpleConsumer != null) {
                                    simpleConsumer.close();
                                }
                            }
                        }
                    }
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                } catch (Exception e) {
                    System.out.println("Error communicating with Broker [" + next + "] to find Leader for [" + str + ", " + i + "] Reason: " + e);
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                }
            } catch (Throwable th) {
                if (simpleConsumer != null) {
                    simpleConsumer.close();
                }
                throw th;
            }
        }
        if (partitionMetadata != null) {
            this.kafkaServerList.clear();
            for (BrokerEndPoint brokerEndPoint : partitionMetadata.replicas()) {
                this.kafkaServerList.add(brokerEndPoint.host() + ":" + brokerEndPoint.port());
            }
        }
        return partitionMetadata;
    }

    public void close() {
        if (this.zkUtils != null) {
            this.zkUtils.close();
            this.zkUtils = null;
            this.zkClient = null;
        }
    }
}
