package io.gearpump.streaming.kafka.lib;

import io.gearpump.util.LogUtil$;
import java.io.InputStream;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.mutable.Iterable$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaUtil.scala */
/* loaded from: input_file:io/gearpump/streaming/kafka/lib/KafkaUtil$.class */
public final class KafkaUtil$ {
    // Singleton instance of this class (compiled from KafkaUtil.scala). It is
    // declared as null here and assigned by the private constructor, which the
    // static initializer below invokes exactly once.
    // NOTE(review): assigning a `static final` field from a constructor is not
    // valid Java source; this looks like a decompiler artifact - confirm before
    // trying to compile this file as-is.
    public static final KafkaUtil$ MODULE$ = null;
    // Logger used by the helper methods of this class; initialised in the constructor.
    private final Logger LOG;

    static {
        // Instantiating the class populates MODULE$ as a side effect of the constructor.
        new KafkaUtil$();
    }

    /**
     * Accessor for the logger that the constructor stores on this instance.
     *
     * @return the logger held in the {@code LOG} field
     */
    private Logger LOG() {
        return LOG;
    }

    /**
     * Resolves the broker that currently leads the given topic partition.
     *
     * <p>A ZooKeeper client is obtained from {@code function0}, used for the
     * leader and broker lookups, and always closed before this method returns
     * or throws.
     *
     * @param function0 supplier of the ZooKeeper client to use for the lookup
     * @param str       topic name
     * @param i         partition id within the topic
     * @return the broker registered for the partition's leader
     * @throws RuntimeException if no leader is recorded for the partition, or if
     *                          no broker info exists for the leader id; any other
     *                          exception from the lookups is logged and rethrown
     */
    public Broker getBroker(Function0<ZkClient> function0, String str, int i) {
        ZkClient zkClient = (ZkClient) function0.apply();
        try {
            Option leaderForPartition = ZkUtils$.MODULE$.getLeaderForPartition(zkClient, str, i);
            if (leaderForPartition.isEmpty()) {
                throw new RuntimeException("leader not available for TopicAndPartition(" + str + ", " + i + ")");
            }
            int leaderId = BoxesRunTime.unboxToInt(leaderForPartition.get());

            Option brokerInfo = ZkUtils$.MODULE$.getBrokerInfo(zkClient, leaderId);
            if (brokerInfo.isEmpty()) {
                throw new RuntimeException("broker info not found for leader " + leaderId);
            }
            return (Broker) brokerInfo.get();
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is kept; the message
            // alone may be null and hides where the failure happened.
            LOG().error(e.getMessage(), e);
            throw e;
        } finally {
            zkClient.close();
        }
    }

    /**
     * Collects the {@link TopicAndPartition}s for the given topics.
     *
     * <p>The partitions-per-topic result read from ZooKeeper is flat-mapped
     * through {@code KafkaUtil$$anonfun$getTopicAndPartitions$1} (not visible
     * here; presumably it pairs each topic with each of its partition ids -
     * confirm against the Scala source) and gathered into an array. The
     * ZooKeeper client obtained from {@code function0} is always closed.
     *
     * @param function0 supplier of the ZooKeeper client to use for the lookup
     * @param list      topic names to look up
     * @return the resulting topic/partition pairs as an array
     */
    public TopicAndPartition[] getTopicAndPartitions(Function0<ZkClient> function0, List<String> list) {
        ZkClient zkClient = (ZkClient) function0.apply();
        try {
            Object flattened = ZkUtils$.MODULE$.getPartitionsForTopics(zkClient, list)
                    .flatMap(new KafkaUtil$$anonfun$getTopicAndPartitions$1(), Iterable$.MODULE$.canBuildFrom());
            return (TopicAndPartition[]) ((TraversableOnce) flattened)
                    .toArray(ClassTag$.MODULE$.apply(TopicAndPartition.class));
        } catch (Exception e) {
            // Log the throwable as well as its message so the stack trace is preserved.
            LOG().error(e.getMessage(), e);
            throw e;
        } finally {
            zkClient.close();
        }
    }

    /**
     * Checks whether a topic exists, delegating to {@code AdminUtils.topicExists}.
     *
     * <p>The ZooKeeper client obtained from {@code function0} is always closed
     * before this method returns or throws.
     *
     * @param function0 supplier of the ZooKeeper client to use for the check
     * @param str       topic name
     * @return the result of {@code AdminUtils.topicExists} for the topic
     */
    public boolean topicExists(Function0<ZkClient> function0, String str) {
        ZkClient zkClient = (ZkClient) function0.apply();
        try {
            return AdminUtils$.MODULE$.topicExists(zkClient, str);
        } catch (Exception e) {
            // Log the throwable as well as its message so the stack trace is preserved.
            LOG().error(e.getMessage(), e);
            throw e;
        } finally {
            zkClient.close();
        }
    }

    /**
     * Creates a topic unless it already exists.
     *
     * <p>The ZooKeeper client obtained from {@code function0} is always closed
     * before this method returns or throws.
     *
     * @param function0 supplier of the ZooKeeper client to use
     * @param str       topic name
     * @param i         third argument forwarded to {@code AdminUtils.createTopic}
     *                  (presumably the partition count - confirm against the Kafka API)
     * @param i2        fourth argument forwarded to {@code AdminUtils.createTopic}
     *                  (presumably the replication factor - confirm against the Kafka API)
     * @return {@code true} if the topic already existed and nothing was created,
     *         {@code false} if the topic was created by this call
     */
    public boolean createTopic(Function0<ZkClient> function0, String str, int i, int i2) {
        ZkClient zkClient = (ZkClient) function0.apply();
        try {
            if (AdminUtils$.MODULE$.topicExists(zkClient, str)) {
                LOG().info("topic " + str + " exists");
                return true;
            }
            AdminUtils$.MODULE$.createTopic(zkClient, str, i, i2, AdminUtils$.MODULE$.createTopic$default$5());
            LOG().info("created topic " + str);
            return false;
        } catch (Exception e) {
            // Log the throwable as well as its message so the stack trace is preserved.
            LOG().error(e.getMessage(), e);
            throw e;
        } finally {
            zkClient.close();
        }
    }

    /**
     * Deletes a topic on a best-effort basis.
     *
     * <p>Failures from {@code AdminUtils.deleteTopic} are logged and not
     * rethrown, so callers cannot tell from this method whether the deletion
     * succeeded. The ZooKeeper client obtained from {@code function0} is always
     * closed.
     *
     * @param function0 supplier of the ZooKeeper client to use
     * @param str       topic name
     */
    public void deleteTopic(Function0<ZkClient> function0, String str) {
        ZkClient zkClient = (ZkClient) function0.apply();
        try {
            AdminUtils$.MODULE$.deleteTopic(zkClient, str);
        } catch (Exception e) {
            // Deliberately not rethrown (best-effort delete), but log the throwable
            // so the swallowed failure still leaves a stack trace behind.
            LOG().error(e.getMessage(), e);
        } finally {
            zkClient.close();
        }
    }

    /**
     * Builds a ZooKeeper client supplier from a consumer configuration.
     *
     * <p>The connect string, session timeout and connection timeout are read
     * from {@code consumerConfig} immediately and captured by the returned
     * function object. What the function does when applied is defined in
     * {@code KafkaUtil$$anonfun$connectZookeeper$1}, which is not visible here
     * (presumably it opens a new {@code ZkClient} - confirm).
     *
     * @param consumerConfig source of the ZooKeeper connection settings
     * @return a function object producing a {@code ZkClient}
     */
    public Function0<ZkClient> connectZookeeper(ConsumerConfig consumerConfig) {
        String zkConnect = consumerConfig.zkConnect();
        int sessionTimeoutMs = consumerConfig.zkSessionTimeoutMs();
        int connectionTimeoutMs = consumerConfig.zkConnectionTimeoutMs();
        return new KafkaUtil$$anonfun$connectZookeeper$1(zkConnect, sessionTimeoutMs, connectionTimeoutMs);
    }

    /**
     * Loads a properties file from the classpath of this class's class loader.
     *
     * <p>This method never throws for a missing or unreadable resource: the
     * problem is logged and whatever was loaded so far (possibly nothing) is
     * returned.
     *
     * @param str classpath resource name of the properties file
     * @return the loaded properties; empty if the resource was not found
     */
    public Properties loadProperties(String str) {
        Properties properties = new Properties();
        // try-with-resources skips close() for a null resource, so a missing
        // file needs no separate cleanup path.
        try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(str)) {
            if (inputStream == null) {
                // getResourceAsStream signals "no such resource" with null.
                LOG().error(str + " not found");
            } else {
                properties.load(inputStream);
            }
        } catch (Exception e) {
            // The resource exists but could not be read/parsed (or closed); report
            // that distinctly and keep the cause instead of claiming "not found".
            LOG().error("failed to load " + str, e);
        }
        return properties;
    }

    /**
     * Creates a {@link KafkaProducer} from the given configuration and serializers.
     *
     * <p>If {@code properties} has no {@code bootstrap.servers} value, the
     * passed-in object is modified to use {@code localhost:9092} before the
     * producer is constructed.
     *
     * @param properties  producer configuration; may be mutated as described above
     * @param serializer  key serializer
     * @param serializer2 value serializer
     * @param <K>         key type
     * @param <V>         value type
     * @return a new producer built from the arguments
     */
    public <K, V> KafkaProducer<K, V> createKafkaProducer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        final String bootstrapKey = "bootstrap.servers";
        if (properties.getProperty(bootstrapKey) == null) {
            properties.setProperty(bootstrapKey, "localhost:9092");
        }
        return new KafkaProducer<>(properties, serializer, serializer2);
    }

    /**
     * Builds a minimal producer configuration.
     *
     * @param str value to store under {@code bootstrap.servers}
     * @return a new {@link Properties} containing only that entry
     */
    public Properties buildProducerConfig(String str) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", str);
        return producerConfig;
    }

    /**
     * Builds a minimal consumer configuration.
     *
     * @param str value to store under {@code zookeeper.connect}
     * @return a new {@link Properties} holding {@code zookeeper.connect} and a
     *         {@code group.id} fixed to {@code gearpump}
     */
    public Properties buildConsumerConfig(String str) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("zookeeper.connect", str);
        consumerConfig.setProperty("group.id", "gearpump");
        return consumerConfig;
    }

    private KafkaUtil$() {
        MODULE$ = this;
        this.LOG = LogUtil$.MODULE$.getLogger(getClass(), LogUtil$.MODULE$.getLogger$default$2(), LogUtil$.MODULE$.getLogger$default$3(), LogUtil$.MODULE$.getLogger$default$4(), LogUtil$.MODULE$.getLogger$default$5(), LogUtil$.MODULE$.getLogger$default$6(), LogUtil$.MODULE$.getLogger$default$7(), LogUtil$.MODULE$.getLogger$default$8());
    }
}
