package org.apache.gearpump.streaming.dsl;

import org.apache.gearpump.Message;
import org.apache.gearpump.streaming.dsl.op.OpType;
import org.apache.gearpump.streaming.kafka.KafkaSource;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.task.TaskId;
import org.apache.gearpump.streaming.transaction.api.MessageDecoder;
import scala.Function1;
import scala.None$;
import scala.Some;
import scala.collection.immutable.List;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: StreamApp.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=a\u0001B\u0001\u0003\u00015\u0011QbS1gW\u0006\u0004&o\u001c3vG\u0016\u0014(BA\u0002\u0005\u0003\r!7\u000f\u001c\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011\u0001C4fCJ\u0004X/\u001c9\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001+\tqqfE\u0002\u0001\u001fU\u0001\"\u0001E\n\u000e\u0003EQ\u0011AE\u0001\u0006g\u000e\fG.Y\u0005\u0003)E\u0011a!\u00118z%\u00164\u0007c\u0001\f+[9\u0011qc\n\b\u00031\u0015r!!\u0007\u0013\u000f\u0005i\u0019cBA\u000e#\u001d\ta\u0012E\u0004\u0002\u001eA5\taD\u0003\u0002 \u0019\u00051AH]8pizJ\u0011aC\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005\u00151\u0011BA\u0002\u0005\u0013\t1#!\u0001\u0002pa&\u0011\u0001&K\u0001\u0007\u001fB$\u0016\u0010]3\u000b\u0005\u0019\u0012\u0011BA\u0016-\u0005!!&/\u0019<feN,'B\u0001\u0015*!\tqs\u0006\u0004\u0001\u0005\u000bA\u0002!\u0019A\u0019\u0003\u0003Q\u000b\"AM\u001b\u0011\u0005A\u0019\u0014B\u0001\u001b\u0012\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"\u0001\u0005\u001c\n\u0005]\n\"aA!os\"A\u0011\b\u0001B\u0001B\u0003%!(A\u0006lC\u001a\\\u0017mQ8oM&<\u0007CA\u001eA\u001b\u0005a$BA\u001f?\u0003\ra\u0017N\u0019\u0006\u0003\u007f\u0011\tQa[1gW\u0006L!!\u0011\u001f\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\t\u0007\u0002\u0011\t\u0011)A\u0005\t\u0006I1m\u001c8wKJ$XM\u001d\t\u0005!\u0015;U&\u0003\u0002G#\tIa)\u001e8di&|g.\r\t\u0003\u0011&k\u0011AB\u0005\u0003\u0015\u001a\u0011q!T3tg\u0006<W\r\u0003\u0005M\u0001\t\r\t\u0015a\u0003N\u0003))g/\u001b3f]\u000e,G%\u000e\t\u0004\u001dFkS\"A(\u000b\u0005A\u000b\u0012a\u0002:fM2,7\r^\u0005\u0003%>\u0013\u0001b\u00117bgN$\u0016m\u001a\u0005\u0006)\u0002!\t!V\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007YS6\f\u0006\u0002X3B\u0019\u0001\fA\u0017\u000e\u0003\tAQ\u0001T*A\u00045CQ!O*A\u0002iBQaQ*A\u0002\u0011Cq!\u0018\u0001C\u0002\u0013\u0005a,A\u0005cCR\u001c\u0007nU5{KV\tq\f\u0005\u0002\u0011A&\u0011\u0011-\u0005\u0002\u0004\u0013:$\bBB2\u0001A\u0003%q,\u0001\u0006cCR\u001c\u0007nU5{K\u0002Bq!\u001a\u0001C\u0002\u0013\u0005a-\u0001\u0006ng\u001e$UmY8eKJ,\u0012a\u001a\t\u0003Q6l\u0011!\u001b\u0006\u0003U.\f1!\u00199j\u0015\taG!A\u0006ue\u0006t7/Y2uS>t\u0017B\u00018j\u00059iUm]:bO\u0016$UmY8eKJDa\u0001\u001d\u0001!\u0002\u00139\u0017aC7tO\u0012+7m\u001c3fe\u0002B\u0001B\u001d\u0001\t\u0006\u0004%\ta]\u0001\u0007g>,(oY3\u0016\u0003Q\u0004\"!\u001e<\u000e\u0003yJ!a\u001e \u0003\u0017-\u000bgm[1T_V\u00148-\u001a\u0005\ts\u0002A\t\u0011)Q\u0005i\u000691o\\;sG\u0016\u0004\u0003\"B>\u0001\t\u0003b\u0018a\u00024pe\u0016\f7\r[\u000b\u0004{\u0006-Ac\u0001@\u0002\u0004A\u0011\u0001c`\u0005\u0004\u0003\u0003\t\"\u0001B+oSRDq!!\u0002{\u0001\u0004\t9!A\u0002gk:\u0004R\u0001E#.\u0003\u0013\u00012ALA\u0006\t\u0019\tiA\u001fb\u0001c\t\tQ\u000b")
/* loaded from: input_file:org/apache/gearpump/streaming/dsl/KafkaProducer.class */
public class KafkaProducer<T> implements OpType.Traverse<T> {
    private final KafkaConfig kafkaConfig;
    public final Function1<Message, T> org$apache$gearpump$streaming$dsl$KafkaProducer$$converter;
    private final int batchSize;
    private final MessageDecoder msgDecoder;
    private KafkaSource source;
    private volatile boolean bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private KafkaSource source$lzycompute() {
        Some some;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                Some some2 = new Some(new KafkaSource(this.kafkaConfig.getClientId(), new TaskId(0, 0), 1, this.kafkaConfig, msgDecoder()));
                new KafkaProducer$$anonfun$source$1(this);
                if (some2.isEmpty()) {
                    some = None$.MODULE$;
                } else {
                    KafkaSource kafkaSource = (KafkaSource) some2.get();
                    kafkaSource.startFromBeginning();
                    some = new Some(kafkaSource);
                }
                this.source = (KafkaSource) some.get();
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = this;
            return this.source;
        }
    }

    public int batchSize() {
        return this.batchSize;
    }

    public MessageDecoder msgDecoder() {
        return this.msgDecoder;
    }

    public KafkaSource source() {
        return this.bitmap$0 ? this.source : source$lzycompute();
    }

    @Override // org.apache.gearpump.streaming.dsl.op.OpType.Traverse
    public <U> void foreach(Function1<T, U> function1) {
        List pull = source().pull(batchSize());
        new KafkaProducer$$anonfun$foreach$2(this, function1);
        List list = pull;
        while (true) {
            List list2 = list;
            if (list2.isEmpty()) {
                return;
            }
            function1.apply(this.org$apache$gearpump$streaming$dsl$KafkaProducer$$converter.apply((Message) list2.head()));
            list = (List) list2.tail();
        }
    }

    public KafkaProducer(KafkaConfig kafkaConfig, Function1<Message, T> function1, ClassTag<T> classTag) {
        this.kafkaConfig = kafkaConfig;
        this.org$apache$gearpump$streaming$dsl$KafkaProducer$$converter = function1;
        this.batchSize = kafkaConfig.getConsumerEmitBatchSize();
        this.msgDecoder = kafkaConfig.getMessageDecoder();
    }
}
