package org.apache.gearpump.streaming.kafka.lib;

import com.twitter.bijection.Injection$;
import kafka.common.TopicAndPartition;
import org.I0Itec.zkclient.ZkClient;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConsumer;
import org.apache.gearpump.util.LogUtil$;
import org.slf4j.Logger;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: KafkaStorage.scala */
/* loaded from: input_file:org/apache/gearpump/streaming/kafka/lib/KafkaStorage$.class */
public final class KafkaStorage$ {
    public static final KafkaStorage$ MODULE$ = null;
    private final Logger LOG;

    static {
        new KafkaStorage$();
    }

    private Logger LOG() {
        return this.LOG;
    }

    public KafkaStorage apply(int i, UserConfig userConfig, TopicAndPartition topicAndPartition) {
        Map<String, ?> config = userConfig.config();
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"app", "_", "_", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(i), topicAndPartition.topic(), BoxesRunTime.boxToInteger(topicAndPartition.partition())}));
        int storageReplicas = KafkaConfig$.MODULE$.ConfigToKafka(config).getStorageReplicas();
        KafkaConfig.ConfigToKafka ConfigToKafka = KafkaConfig$.MODULE$.ConfigToKafka(config);
        KafkaProducer producer = ConfigToKafka.getProducer(ConfigToKafka.getProducer$default$1(), ConfigToKafka.getProducer$default$2());
        String clientId = KafkaConfig$.MODULE$.ConfigToKafka(config).getClientId();
        int socketTimeoutMS = KafkaConfig$.MODULE$.ConfigToKafka(config).getSocketTimeoutMS();
        int socketReceiveBufferBytes = KafkaConfig$.MODULE$.ConfigToKafka(config).getSocketReceiveBufferBytes();
        int fetchMessageMaxBytes = KafkaConfig$.MODULE$.ConfigToKafka(config).getFetchMessageMaxBytes();
        KafkaConfig.ConfigToKafka ConfigToKafka2 = KafkaConfig$.MODULE$.ConfigToKafka(config);
        ZkClient zkClient = ConfigToKafka2.getZkClient(ConfigToKafka2.getZkClient$default$1(), ConfigToKafka2.getZkClient$default$2(), ConfigToKafka2.getZkClient$default$3(), ConfigToKafka2.getZkClient$default$4());
        boolean createTopic = KafkaUtil$.MODULE$.createTopic(zkClient, s, storageReplicas);
        KafkaConsumer.Broker broker = KafkaUtil$.MODULE$.getBroker(zkClient, s, 0);
        return new KafkaStorage(s, producer, load(createTopic, KafkaMessageIterator$.MODULE$.apply(broker.host(), broker.port(), s, 0, socketTimeoutMS, socketReceiveBufferBytes, fetchMessageMaxBytes, clientId)));
    }

    private List<Tuple2<Object, byte[]>> load(boolean z, KafkaMessageIterator kafkaMessageIterator) {
        return z ? fetch$1(List$.MODULE$.empty(), kafkaMessageIterator) : List$.MODULE$.empty();
    }

    private final List fetch$1(List list, KafkaMessageIterator kafkaMessageIterator) {
        Some some;
        while (kafkaMessageIterator.hasNext()) {
            KafkaMessage next = kafkaMessageIterator.next();
            Option<byte[]> key = next.key();
            new KafkaStorage$$anonfun$1(next);
            if (key.isEmpty()) {
                some = None$.MODULE$;
            } else {
                Success invert = Injection$.MODULE$.invert((byte[]) key.get(), Injection$.MODULE$.long2BigEndian());
                if (!(invert instanceof Success)) {
                    if (invert instanceof Failure) {
                        throw ((Failure) invert).exception();
                    }
                    throw new MatchError(invert);
                }
                some = new Some(new Tuple2(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(invert.value())), next.msg()));
            }
            new KafkaStorage$$anonfun$2();
            Some some2 = some;
            if (some.isEmpty()) {
                throw new RuntimeException("offset key should not be null");
            }
            list = (List) list.$colon$plus(some2.get(), List$.MODULE$.canBuildFrom());
        }
        kafkaMessageIterator.close();
        return list;
    }

    private KafkaStorage$() {
        MODULE$ = this;
        this.LOG = LogUtil$.MODULE$.getLogger(KafkaStorage.class, LogUtil$.MODULE$.getLogger$default$2(), LogUtil$.MODULE$.getLogger$default$3(), LogUtil$.MODULE$.getLogger$default$4(), LogUtil$.MODULE$.getLogger$default$5(), LogUtil$.MODULE$.getLogger$default$6(), LogUtil$.MODULE$.getLogger$default$7());
    }
}
