package org.apache.gearpump.streaming.kafka;

import com.twitter.bijection.Injection$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.gearpump.streaming.kafka.lib.KafkaUtil$;
import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer;
import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaMessage;
import org.apache.gearpump.streaming.transaction.api.OffsetStorage;
import org.apache.gearpump.streaming.transaction.api.OffsetStorage$StorageEmpty$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.ArrayBuilder;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: KafkaStorage.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=s!B\u0001\u0003\u0011\u0003i\u0011\u0001D&bM.\f7\u000b^8sC\u001e,'BA\u0002\u0005\u0003\u0015Y\u0017MZ6b\u0015\t)a!A\u0005tiJ,\u0017-\\5oO*\u0011q\u0001C\u0001\tO\u0016\f'\u000f];na*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0001\"AD\b\u000e\u0003\t1Q\u0001\u0005\u0002\t\u0002E\u0011AbS1gW\u0006\u001cFo\u001c:bO\u0016\u001c\"a\u0004\n\u0011\u0005M1R\"\u0001\u000b\u000b\u0003U\tQa]2bY\u0006L!a\u0006\u000b\u0003\r\u0005s\u0017PU3g\u0011\u0015Ir\u0002\"\u0001\u001b\u0003\u0019a\u0014N\\5u}Q\tQ\u0002C\u0004\u001d\u001f\t\u0007I\u0011B\u000f\u0002\u00071{u)F\u0001\u001f!\ty\"%D\u0001!\u0015\t\t#\"A\u0003tY\u001a$$.\u0003\u0002$A\t1Aj\\4hKJDa!J\b!\u0002\u0013q\u0012\u0001\u0002'P\u000f\u00022A\u0001\u0005\u0002\u0001OM\u0019aE\u0005\u0015\u0011\u0005%rS\"\u0001\u0016\u000b\u0005-b\u0013aA1qS*\u0011Q\u0006B\u0001\fiJ\fgn]1di&|g.\u0003\u00020U\tiqJ\u001a4tKR\u001cFo\u001c:bO\u0016D\u0001\"\r\u0014\u0003\u0002\u0003\u0006IAM\u0001\u0006i>\u0004\u0018n\u0019\t\u0003gYr!a\u0005\u001b\n\u0005U\"\u0012A\u0002)sK\u0012,g-\u0003\u00028q\t11\u000b\u001e:j]\u001eT!!\u000e\u000b\t\u0011i2#\u0011!Q\u0001\nm\n\u0001\u0002\u001d:pIV\u001cWM\u001d\t\u0005y\u0005\u001b5)D\u0001>\u0015\tQdH\u0003\u0002@\u0001\u000691\r\\5f]R\u001c(BA\u0002\t\u0013\t\u0011UHA\u0007LC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d\t\u0004'\u00113\u0015BA#\u0015\u0005\u0015\t%O]1z!\t\u0019r)\u0003\u0002I)\t!!)\u001f;f\u0011!QeE!A%\u0002\u0013Y\u0015aC4fi\u000e{gn];nKJ\u00042a\u0005'O\u0013\tiEC\u0001\u0005=Eft\u0017-\\3?!\tyE+D\u0001Q\u0015\t\t&+\u0001\u0005d_:\u001cX/\\3s\u0015\t\u0019&!A\u0002mS\nL!!\u0016)\u0003\u001b-\u000bgm[1D_:\u001cX/\\3s\u0011!9fE!A%\u0002\u0013A\u0016!C2p]:,7\r\u001e.l!\r\u0019B*\u0017\t\u00035~k\u0011a\u0017\u0006\u00039v\u000b\u0001B_6dY&,g\u000e\u001e\u0006\u0003=*\ta!\u0013\u0019Ji\u0016\u001c\u0017B\u00011\\\u0005!Q6n\u00117jK:$\bBB\r'\t\u0003\u0011!\rF\u0003dI\u00164w\r\u0005\u0002\u000fM!)\u0011'\u0019a\u0001e!)!(\u0019a\u0001w!1!*\u0019CA\u0002-CaaV1\u0005\u0002\u0004A\u0006\u0002C)'\u0011\u000b\u0007I\u0011B5\u0016\u00039C\u0001b\u001b\u0014\t\u0002\u0003\u0006KAT\u0001\nG>t7/^7fe\u0002Bq!\u001c\u0014C\u0002\u0013%a.\u0001\u0006eCR\f')\u001f+j[\u0016,\u0012a\u001c\t\u0004ab\\hBA9w\u001d\t\u0011X/D\u0001t\u0015\t!H\"\u0001\u0004=e>|GOP\u0005\u0002+%\u0011q\u000fF\u0001\ba\u0006\u001c7.Y4f\u0013\tI(P\u0001\u0003MSN$(BA<\u0015!\u0011\u0019BP`\"\n\u0005u$\"A\u0002+va2,'\u0007E\u0002��\u0003\u001fqA!!\u0001\u0002\u000e9!\u00111AA\u0006\u001d\u0011\t)!!\u0003\u000f\u0007I\f9!C\u0001\f\u0013\tI!\"\u0003\u0002\b\u0011%\u0011qOB\u0005\u0005\u0003#\t\u0019BA\u0005US6,7\u000b^1na*\u0011qO\u0002\u0005\b\u0003/1\u0003\u0015!\u0003p\u0003-!\u0017\r^1CsRKW.\u001a\u0011\t\u000f\u0005ma\u0005\"\u0011\u0002\u001e\u00051An\\8l+B$B!a\b\u0002,A)\u0011\u0011EA\u0014\u00076\u0011\u00111\u0005\u0006\u0004\u0003K!\u0012\u0001B;uS2LA!!\u000b\u0002$\t\u0019AK]=\t\u000f\u00055\u0012\u0011\u0004a\u0001}\u0006!A/[7f\u0011\u001d\t\tD\nC!\u0003g\ta!\u00199qK:$GCBA\u001b\u0003w\ti\u0004E\u0002\u0014\u0003oI1!!\u000f\u0015\u0005\u0011)f.\u001b;\t\u000f\u00055\u0012q\u0006a\u0001}\"9\u0011qHA\u0018\u0001\u0004\u0019\u0015AB8gMN,G\u000fC\u0004\u0002D\u0019\"\t%!\u0012\u0002\u000b\rdwn]3\u0015\u0005\u0005U\u0002\u0002CA%M\u0011\u0005!!a\u0013\u0002\t1|\u0017\r\u001a\u000b\u0004_\u00065\u0003BB)\u0002H\u0001\u0007a\n")
/* loaded from: input_file:org/apache/gearpump/streaming/kafka/KafkaStorage.class */
public class KafkaStorage implements OffsetStorage {
    private final String topic;
    private final KafkaProducer<byte[], byte[]> producer;
    private final Function0<KafkaConsumer> getConsumer;
    private final Function0<ZkClient> connectZk;
    private KafkaConsumer consumer;
    private final List<Tuple2<Object, byte[]>> dataByTime;
    private volatile boolean bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private KafkaConsumer consumer$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.consumer = (KafkaConsumer) this.getConsumer.apply();
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = this;
            this.getConsumer = null;
            return this.consumer;
        }
    }

    private KafkaConsumer consumer() {
        return this.bitmap$0 ? this.consumer : consumer$lzycompute();
    }

    private List<Tuple2<Object, byte[]>> dataByTime() {
        return this.dataByTime;
    }

    public Try<byte[]> lookUp(long j) {
        if (dataByTime().isEmpty()) {
            return new Failure(OffsetStorage$StorageEmpty$.MODULE$);
        }
        Tuple2 tuple2 = (Tuple2) dataByTime().head();
        Tuple2 tuple22 = (Tuple2) dataByTime().last();
        return j < tuple2._1$mcJ$sp() ? new Failure(new OffsetStorage.Underflow((byte[]) tuple2._2())) : j > tuple22._1$mcJ$sp() ? new Failure(new OffsetStorage.Overflow((byte[]) tuple22._2())) : new Success(((Tuple2) dataByTime().reverse().find(new KafkaStorage$$anonfun$lookUp$1(this, j)).get())._2());
    }

    public void append(long j, byte[] bArr) {
        this.producer.send(new ProducerRecord(this.topic, Predef$.MODULE$.int2Integer(0), Injection$.MODULE$.apply(BoxesRunTime.boxToLong(j), Injection$.MODULE$.long2BigEndian()), bArr));
    }

    public void close() {
        this.producer.close();
        KafkaUtil$.MODULE$.deleteTopic(this.connectZk, this.topic);
    }

    public List<Tuple2<Object, byte[]>> load(KafkaConsumer kafkaConsumer) {
        Some some;
        ObjectRef create = ObjectRef.create(new ArrayBuilder.ofRef(ClassTag$.MODULE$.apply(Tuple2.class)));
        while (kafkaConsumer.hasNext()) {
            KafkaMessage next = kafkaConsumer.next();
            Option<byte[]> key = next.key();
            KafkaStorage$$anonfun$load$1 kafkaStorage$$anonfun$load$1 = new KafkaStorage$$anonfun$load$1(this, create, next);
            if (key.isEmpty()) {
                some = None$.MODULE$;
            } else {
                Success invert = Injection$.MODULE$.invert((byte[]) key.get(), Injection$.MODULE$.long2BigEndian());
                if (!(invert instanceof Success)) {
                    if (invert instanceof Failure) {
                        throw ((Failure) invert).exception();
                    }
                    throw new MatchError(invert);
                }
                long unboxToLong = BoxesRunTime.unboxToLong(invert.value());
                ArrayBuilder.ofRef ofref = (ArrayBuilder.ofRef) kafkaStorage$$anonfun$load$1.messagesBuilder$1.elem;
                Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                Predef$ predef$ = Predef$.MODULE$;
                some = new Some(ofref.$plus$eq(new Tuple2(BoxesRunTime.boxToLong(unboxToLong), kafkaStorage$$anonfun$load$1.kafkaMsg$1.msg())));
            }
            new KafkaStorage$$anonfun$load$2(this);
            if (some.isEmpty()) {
                throw new RuntimeException("offset key should not be null");
            }
        }
        kafkaConsumer.close();
        Predef$ predef$2 = Predef$.MODULE$;
        return new ArrayOps.ofRef(((ArrayBuilder.ofRef) create.elem).result()).toList();
    }

    public KafkaStorage(String str, KafkaProducer<byte[], byte[]> kafkaProducer, Function0<KafkaConsumer> function0, Function0<ZkClient> function02) {
        this.topic = str;
        this.producer = kafkaProducer;
        this.getConsumer = function0;
        this.connectZk = function02;
        this.dataByTime = KafkaUtil$.MODULE$.topicExists(function02, str) ? load(consumer()) : List$.MODULE$.empty();
    }
}
