package org.apache.gearpump.streaming.examples.kafka;

import akka.actor.package$;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.TaskDescription;
import org.apache.gearpump.streaming.kafka.KafkaSource;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig$;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.Task;
import org.apache.gearpump.streaming.task.TaskContext;
import org.apache.gearpump.streaming.transaction.api.MessageDecoder;
import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource;
import org.apache.gearpump.streaming.transaction.api.TimeStampFilter;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaStreamProducer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001db\u0001B\u0001\u0003\u0001=\u00111cS1gW\u0006\u001cFO]3b[B\u0013x\u000eZ;dKJT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011\u0001C3yC6\u0004H.Z:\u000b\u0005\u001dA\u0011!C:ue\u0016\fW.\u001b8h\u0015\tI!\"\u0001\u0005hK\u0006\u0014\b/^7q\u0015\tYA\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001b\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0005\t\u0003#Qi\u0011A\u0005\u0006\u0003'\u0019\tA\u0001^1tW&\u0011QC\u0005\u0002\u0005)\u0006\u001c8\u000e\u0003\u0005\u0018\u0001\t\u0005\t\u0015!\u0003\u0019\u0003-!\u0018m]6D_:$X\r\u001f;\u0011\u0005EI\u0012B\u0001\u000e\u0013\u0005-!\u0016m]6D_:$X\r\u001f;\t\u0011q\u0001!\u0011!Q\u0001\nu\tAaY8oMB\u0011a$I\u0007\u0002?)\u0011\u0001\u0005C\u0001\bG2,8\u000f^3s\u0013\t\u0011sD\u0001\u0006Vg\u0016\u00148i\u001c8gS\u001eDQ\u0001\n\u0001\u0005\u0002\u0015\na\u0001P5oSRtDc\u0001\u0014)SA\u0011q\u0005A\u0007\u0002\u0005!)qc\ta\u00011!)Ad\ta\u0001;!91\u0006\u0001b\u0001\n\u0013a\u0013aC6bM.\f7i\u001c8gS\u001e,\u0012!\f\t\u0003]Ij\u0011a\f\u0006\u0003aE\n1\u0001\\5c\u0015\t\u0019a!\u0003\u00024_\tY1*\u00194lC\u000e{gNZ5h\u0011\u0019)\u0004\u0001)A\u0005[\u0005a1.\u00194lC\u000e{gNZ5hA!9q\u0007\u0001b\u0001\n\u0013A\u0014!\u00032bi\u000eD7+\u001b>f+\u0005I\u0004C\u0001\u001e>\u001b\u0005Y$\"\u0001\u001f\u0002\u000bM\u001c\u0017\r\\1\n\u0005yZ$aA%oi\"1\u0001\t\u0001Q\u0001\ne\n!BY1uG\"\u001c\u0016N_3!\u0011\u001d\u0011\u0005A1A\u0005\n\r\u000b!\"\\:h\t\u0016\u001cw\u000eZ3s+\u0005!\u0005CA#K\u001b\u00051%BA$I\u0003\r\t\u0007/\u001b\u0006\u0003\u0013\u001a\t1\u0002\u001e:b]N\f7\r^5p]&\u00111J\u0012\u0002\u000f\u001b\u0016\u001c8/Y4f\t\u0016\u001cw\u000eZ3s\u0011\u0019i\u0005\u0001)A\u0005\t\u0006YQn]4EK\u000e|G-\u001a:!\u0011\u001dy\u0005A1A\u0005\nA\u000baAZ5mi\u0016\u0014X#A)\u0011\u0005\u0015\u0013\u0016BA*G\u0005=!\u0016.\\3Ti\u0006l\u0007OR5mi\u0016\u0014\bBB+\u0001A\u0003%\u0011+A\u0004gS2$XM\u001d\u0011\t\u000f]\u0003!\u0019!C\u0001q\u0005yA/Y:l!\u0006\u0014\u0018\r\u001c7fY&\u001cX\u000e\u0003\u0004Z\u0001\u0001\u0006I!O\u0001\u0011i\u0006\u001c8\u000eU1sC2dW\r\\5t[\u0002Bqa\u0017\u0001C\u0002\u0013%A,\u0001\u0004t_V\u00148-Z\u000b\u0002;B\u0011QIX\u0005\u0003?\u001a\u0013A\u0003V5nKJ+\u0007\u000f\\1zC\ndWmU8ve\u000e,\u0007BB1\u0001A\u0003%Q,A\u0004t_V\u00148-\u001a\u0011\t\u000f\r\u0004\u0001\u0019!C\u0005I\u0006I1\u000f^1siRKW.Z\u000b\u0002KB\u0011aM\u001d\b\u0003OBt!\u0001[8\u000f\u0005%tgB\u00016n\u001b\u0005Y'B\u00017\u000f\u0003\u0019a$o\\8u}%\tQ\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003c\"\tq\u0001]1dW\u0006<W-\u0003\u0002ti\nIA+[7f'R\fW\u000e\u001d\u0006\u0003c\"AqA\u001e\u0001A\u0002\u0013%q/A\u0007ti\u0006\u0014H\u000fV5nK~#S-\u001d\u000b\u0003qn\u0004\"AO=\n\u0005i\\$\u0001B+oSRDq\u0001`;\u0002\u0002\u0003\u0007Q-A\u0002yIEBaA \u0001!B\u0013)\u0017AC:uCJ$H+[7fA!9\u0011\u0011\u0001\u0001\u0005B\u0005\r\u0011aB8o'R\f'\u000f\u001e\u000b\u0004q\u0006\u0015\u0001bBA\u0004\u007f\u0002\u0007\u0011\u0011B\u0001\r]\u0016<8\u000b^1siRKW.\u001a\t\u0004#\u0005-\u0011bAA\u0007%\tI1\u000b^1siRKW.\u001a\u0005\b\u0003#\u0001A\u0011IA\n\u0003\u0019ygNT3yiR\u0019\u00010!\u0006\t\u0011\u0005]\u0011q\u0002a\u0001\u00033\t1!\\:h!\u0011\tY\"!\b\u000e\u0003!I1!a\b\t\u0005\u001diUm]:bO\u0016Dq!a\t\u0001\t\u0003\n)#\u0001\u0004p]N#x\u000e\u001d\u000b\u0002q\u0002")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/kafka/KafkaStreamProducer.class */
public class KafkaStreamProducer extends Task {
    public final TaskContext org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$taskContext;
    private final KafkaConfig kafkaConfig;
    private final int batchSize;
    private final MessageDecoder msgDecoder;
    private final TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    private final int taskParallelism;
    private final TimeReplayableSource source;
    private long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;

    private KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    private int batchSize() {
        return this.batchSize;
    }

    private MessageDecoder msgDecoder() {
        return this.msgDecoder;
    }

    public TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    }

    public int taskParallelism() {
        return this.taskParallelism;
    }

    private TimeReplayableSource source() {
        return this.source;
    }

    public long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;
    }

    private void org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(long j) {
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = j;
    }

    public void onStart(StartTime startTime) {
        org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(startTime.startTime());
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"start time ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime())})));
        source().setStartTime(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
        package$.MODULE$.actorRef2Scala(self()).$bang(new Message("start", System.currentTimeMillis()), self());
    }

    public void onNext(Message message) {
        List pull = source().pull(batchSize());
        KafkaStreamProducer$$anonfun$onNext$1 kafkaStreamProducer$$anonfun$onNext$1 = new KafkaStreamProducer$$anonfun$onNext$1(this);
        while (true) {
            List list = pull;
            if (list.isEmpty()) {
                package$.MODULE$.actorRef2Scala(self()).$bang(new Message("continue", System.currentTimeMillis()), self());
                return;
            }
            Option filter = org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter().filter((Message) list.head(), org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
            new KafkaStreamProducer$$anonfun$onNext$1$$anonfun$apply$1(kafkaStreamProducer$$anonfun$onNext$1);
            if (filter.isEmpty()) {
                None$ none$ = None$.MODULE$;
            } else {
                kafkaStreamProducer$$anonfun$onNext$1.$outer.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$taskContext.output((Message) filter.get());
                new Some(BoxedUnit.UNIT);
            }
            pull = (List) list.tail();
        }
    }

    public void onStop() {
        LOG().info("closing kafka source...");
        source().close();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaStreamProducer(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$taskContext = taskContext;
        this.kafkaConfig = (KafkaConfig) userConfig.getValue(KafkaConfig$.MODULE$.NAME(), system()).get();
        this.batchSize = kafkaConfig().getConsumerEmitBatchSize();
        this.msgDecoder = kafkaConfig().getMessageDecoder();
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter = kafkaConfig().getTimeStampFilter();
        this.taskParallelism = ((TaskDescription) taskContext.dag().processors().apply(BoxesRunTime.boxToInteger(taskContext.taskId().processorId()))).parallelism();
        this.source = new KafkaSource(taskContext.appName(), taskContext.taskId(), taskParallelism(), kafkaConfig(), msgDecoder());
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = 0L;
    }
}
