package org.apache.gearpump.streaming.examples.kafka;

import akka.actor.package$;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.kafka.KafkaSource;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig$;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskActor;
import org.apache.gearpump.streaming.task.TaskContext;
import org.apache.gearpump.streaming.transaction.api.MessageDecoder;
import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource;
import org.apache.gearpump.streaming.transaction.api.TimeStampFilter;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaStreamProducer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ma\u0001B\u0001\u0003\u0001=\u00111cS1gW\u0006\u001cFO]3b[B\u0013x\u000eZ;dKJT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011\u0001C3yC6\u0004H.Z:\u000b\u0005\u001dA\u0011!C:ue\u0016\fW.\u001b8h\u0015\tI!\"\u0001\u0005hK\u0006\u0014\b/^7q\u0015\tYA\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001b\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0005\t\u0003#Qi\u0011A\u0005\u0006\u0003'\u0019\tA\u0001^1tW&\u0011QC\u0005\u0002\n)\u0006\u001c8.Q2u_JD\u0011b\u0006\u0001\u0003\u0002\u0003\u0006I\u0001G\u000e\u0002\u0017Q\f7o[\"p]R,\u0007\u0010\u001e\t\u0003#eI!A\u0007\n\u0003\u0017Q\u000b7o[\"p]R,\u0007\u0010^\u0005\u0003/QA\u0001\"\b\u0001\u0003\u0002\u0003\u0006IAH\u0001\u0005G>tg\r\u0005\u0002 E5\t\u0001E\u0003\u0002\"\u0011\u000591\r\\;ti\u0016\u0014\u0018BA\u0012!\u0005))6/\u001a:D_:4\u0017n\u001a\u0005\u0006K\u0001!\tAJ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007\u001dJ#\u0006\u0005\u0002)\u00015\t!\u0001C\u0003\u0018I\u0001\u0007\u0001\u0004C\u0003\u001eI\u0001\u0007a\u0004C\u0004-\u0001\t\u0007I\u0011B\u0017\u0002\u0017-\fgm[1D_:4\u0017nZ\u000b\u0002]A\u0011qfM\u0007\u0002a)\u0011\u0011GM\u0001\u0004Y&\u0014'BA\u0002\u0007\u0013\t!\u0004GA\u0006LC\u001a\\\u0017mQ8oM&<\u0007B\u0002\u001c\u0001A\u0003%a&\u0001\u0007lC\u001a\\\u0017mQ8oM&<\u0007\u0005C\u00049\u0001\t\u0007I\u0011B\u001d\u0002\u0013\t\fGo\u00195TSj,W#\u0001\u001e\u0011\u0005mrT\"\u0001\u001f\u000b\u0003u\nQa]2bY\u0006L!a\u0010\u001f\u0003\u0007%sG\u000f\u0003\u0004B\u0001\u0001\u0006IAO\u0001\u000bE\u0006$8\r[*ju\u0016\u0004\u0003bB\"\u0001\u0005\u0004%I\u0001R\u0001\u000b[N<G)Z2pI\u0016\u0014X#A#\u0011\u0005\u0019[U\"A$\u000b\u0005!K\u0015aA1qS*\u0011!JB\u0001\fiJ\fgn]1di&|g.\u0003\u0002M\u000f\nqQ*Z:tC\u001e,G)Z2pI\u0016\u0014\bB\u0002(\u0001A\u0003%Q)A\u0006ng\u001e$UmY8eKJ\u0004\u0003b\u0002)\u0001\u0005\u0004%I!U\u0001\u0007M&dG/\u001a:\u0016\u0003I\u0003\"AR*\n\u0005Q;%a\u0004+j[\u0016\u001cF/Y7q\r&dG/\u001a:\t\rY\u0003\u0001\u0015!\u0003S\u0003\u001d1\u0017\u000e\u001c;fe\u0002Bq\u0001\u0017\u0001C\u0002\u0013%\u0011,\u0001\u0004t_V\u00148-Z\u000b\u00025B\u0011aiW\u0005\u00039\u001e\u0013A\u0003V5nKJ+\u0007\u000f\\1zC\ndWmU8ve\u000e,\u0007B\u00020\u0001A\u0003%!,A\u0004t_V\u00148-\u001a\u0011\t\u000f\u0001\u0004\u0001\u0019!C\u0005C\u0006I1\u000f^1siRKW.Z\u000b\u0002EB\u00111m\u001c\b\u0003I6t!!\u001a7\u000f\u0005\u0019\\gBA4k\u001b\u0005A'BA5\u000f\u0003\u0019a$o\\8u}%\tQ\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003]\"\tq\u0001]1dW\u0006<W-\u0003\u0002qc\nIA+[7f'R\fW\u000e\u001d\u0006\u0003]\"Aqa\u001d\u0001A\u0002\u0013%A/A\u0007ti\u0006\u0014H\u000fV5nK~#S-\u001d\u000b\u0003kb\u0004\"a\u000f<\n\u0005]d$\u0001B+oSRDq!\u001f:\u0002\u0002\u0003\u0007!-A\u0002yIEBaa\u001f\u0001!B\u0013\u0011\u0017AC:uCJ$H+[7fA!)Q\u0010\u0001C!}\u00069qN\\*uCJ$HCA;��\u0011\u001d\t\t\u0001 a\u0001\u0003\u0007\tAB\\3x'R\f'\u000f\u001e+j[\u0016\u00042!EA\u0003\u0013\r\t9A\u0005\u0002\n'R\f'\u000f\u001e+j[\u0016Dq!a\u0003\u0001\t\u0003\ni!\u0001\u0004p]:+\u0007\u0010\u001e\u000b\u0004k\u0006=\u0001\u0002CA\t\u0003\u0013\u0001\r!a\u0005\u0002\u00075\u001cx\r\u0005\u0003\u0002\u0016\u0005]Q\"\u0001\u0005\n\u0007\u0005e\u0001BA\u0004NKN\u001c\u0018mZ3")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/kafka/KafkaStreamProducer.class */
public class KafkaStreamProducer extends TaskActor {
    private final KafkaConfig kafkaConfig;
    private final int batchSize;
    private final MessageDecoder msgDecoder;
    private final TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    private final TimeReplayableSource source;
    private long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;

    private KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    private int batchSize() {
        return this.batchSize;
    }

    private MessageDecoder msgDecoder() {
        return this.msgDecoder;
    }

    public TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    }

    private TimeReplayableSource source() {
        return this.source;
    }

    public long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;
    }

    private void org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(long j) {
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = j;
    }

    public void onStart(StartTime startTime) {
        org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(startTime.startTime());
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"start time ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime())})));
        source().setStartTime(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
        package$.MODULE$.actorRef2Scala(self()).$bang(new Message("start", System.currentTimeMillis()), self());
    }

    public void onNext(Message message) {
        List pull = source().pull(batchSize());
        KafkaStreamProducer$$anonfun$onNext$1 kafkaStreamProducer$$anonfun$onNext$1 = new KafkaStreamProducer$$anonfun$onNext$1(this);
        while (true) {
            List list = pull;
            if (list.isEmpty()) {
                package$.MODULE$.actorRef2Scala(self()).$bang(new Message("continue", System.currentTimeMillis()), self());
                return;
            }
            Option filter = org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter().filter((Message) list.head(), org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
            new KafkaStreamProducer$$anonfun$onNext$1$$anonfun$apply$1(kafkaStreamProducer$$anonfun$onNext$1);
            if (filter.isEmpty()) {
                None$ none$ = None$.MODULE$;
            } else {
                kafkaStreamProducer$$anonfun$onNext$1.$outer.output((Message) filter.get());
                new Some(BoxedUnit.UNIT);
            }
            pull = (List) list.tail();
        }
    }

    public KafkaStreamProducer(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.kafkaConfig = (KafkaConfig) userConfig.getValue(KafkaConfig$.MODULE$.NAME(), system()).get();
        this.batchSize = kafkaConfig().getConsumerEmitBatchSize();
        this.msgDecoder = kafkaConfig().getMessageDecoder();
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter = kafkaConfig().getTimeStampFilter();
        this.source = new KafkaSource(super.taskContext().appId(), super.taskContext(), kafkaConfig(), msgDecoder());
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = 0L;
    }
}
