package org.apache.gearpump.streaming.examples.kafka;

import akka.actor.package$;
import com.twitter.bijection.Injection$;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.kafka.KafkaSource$;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig$;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskActor;
import org.apache.gearpump.streaming.task.TaskContext;
import org.apache.gearpump.streaming.transaction.api.MessageDecoder;
import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource;
import org.apache.gearpump.streaming.transaction.api.TimeStampFilter;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: KafkaStreamProducer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Mb\u0001B\u0001\u0003\u0001=\u00111cS1gW\u0006\u001cFO]3b[B\u0013x\u000eZ;dKJT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011\u0001C3yC6\u0004H.Z:\u000b\u0005\u001dA\u0011!C:ue\u0016\fW.\u001b8h\u0015\tI!\"\u0001\u0005hK\u0006\u0014\b/^7q\u0015\tYA\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001b\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0005\t\u0003#Qi\u0011A\u0005\u0006\u0003'\u0019\tA\u0001^1tW&\u0011QC\u0005\u0002\n)\u0006\u001c8.Q2u_JD\u0011b\u0006\u0001\u0003\u0002\u0003\u0006I\u0001G\u000e\u0002\u0017Q\f7o[\"p]R,\u0007\u0010\u001e\t\u0003#eI!A\u0007\n\u0003\u0017Q\u000b7o[\"p]R,\u0007\u0010^\u0005\u0003/QA\u0001\"\b\u0001\u0003\u0002\u0003\u0006IAH\u0001\u0005G>tg\r\u0005\u0002 E5\t\u0001E\u0003\u0002\"\u0011\u000591\r\\;ti\u0016\u0014\u0018BA\u0012!\u0005))6/\u001a:D_:4\u0017n\u001a\u0005\u0006K\u0001!\tAJ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007\u001dJ#\u0006\u0005\u0002)\u00015\t!\u0001C\u0003\u0018I\u0001\u0007\u0001\u0004C\u0003\u001eI\u0001\u0007a\u0004C\u0004-\u0001\t\u0007I\u0011B\u0017\u0002\r\r|gNZ5h+\u0005q\u0003\u0003B\u00187q\u0001k\u0011\u0001\r\u0006\u0003cI\n\u0011\"[7nkR\f'\r\\3\u000b\u0005M\"\u0014AC2pY2,7\r^5p]*\tQ'A\u0003tG\u0006d\u0017-\u0003\u00028a\t\u0019Q*\u00199\u0011\u0005erT\"\u0001\u001e\u000b\u0005mb\u0014\u0001\u00027b]\u001eT\u0011!P\u0001\u0005U\u00064\u0018-\u0003\u0002@u\t11\u000b\u001e:j]\u001e\u0004\"!\u0011\"\u000e\u0003QJ!a\u0011\u001b\u0003\u0007\u0005s\u0017\u0010\u0003\u0004F\u0001\u0001\u0006IAL\u0001\bG>tg-[4!\u0011\u001d9\u0005A1A\u0005\n!\u000b\u0011BY1uG\"\u001c\u0016N_3\u0016\u0003%\u0003\"!\u0011&\n\u0005-#$aA%oi\"1Q\n\u0001Q\u0001\n%\u000b!BY1uG\"\u001c\u0016N_3!\u0011\u001dy\u0005A1A\u0005\nA\u000b!\"\\:h\t\u0016\u001cw\u000eZ3s+\u0005\t\u0006C\u0001*X\u001b\u0005\u0019&B\u0001+V\u0003\r\t\u0007/\u001b\u0006\u0003-\u001a\t1\u0002\u001e:b]N\f7\r^5p]&\u0011\u0001l\u0015\u0002\u000f\u001b\u0016\u001c8/Y4f\t\u0016\u001cw\u000eZ3s\u0011\u0019Q\u0006\u0001)A\u0005#\u0006YQn]4EK\u000e|G-\u001a:!\u0011\u001da\u0006A1A\u0005\nu\u000baAZ5mi\u0016\u0014X#\u00010\u0011\u0005I{\u0016B\u00011T\u0005=!\u0016.\\3Ti\u0006l\u0007OR5mi\u0016\u0014\bB\u00022\u0001A\u0003%a,A\u0004gS2$XM\u001d\u0011\t\u000f\u0011\u0004!\u0019!C\u0005K\u000611o\\;sG\u0016,\u0012A\u001a\t\u0003%\u001eL!\u0001[*\u0003)QKW.\u001a*fa2\f\u00170\u00192mKN{WO]2f\u0011\u0019Q\u0007\u0001)A\u0005M\u000691o\\;sG\u0016\u0004\u0003b\u00027\u0001\u0001\u0004%I!\\\u0001\ngR\f'\u000f\u001e+j[\u0016,\u0012A\u001c\t\u0003_nt!\u0001]=\u000f\u0005EDhB\u0001:x\u001d\t\u0019h/D\u0001u\u0015\t)h\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u00111\u0002D\u0005\u0003\u0013)I!A\u001f\u0005\u0002\u000fA\f7m[1hK&\u0011A0 \u0002\n)&lWm\u0015;b[BT!A\u001f\u0005\t\u0011}\u0004\u0001\u0019!C\u0005\u0003\u0003\tQb\u001d;beR$\u0016.\\3`I\u0015\fH\u0003BA\u0002\u0003\u0013\u00012!QA\u0003\u0013\r\t9\u0001\u000e\u0002\u0005+:LG\u000f\u0003\u0005\u0002\fy\f\t\u00111\u0001o\u0003\rAH%\r\u0005\b\u0003\u001f\u0001\u0001\u0015)\u0003o\u0003)\u0019H/\u0019:u)&lW\r\t\u0005\b\u0003'\u0001A\u0011IA\u000b\u0003\u001dygn\u0015;beR$B!a\u0001\u0002\u0018!A\u0011\u0011DA\t\u0001\u0004\tY\"\u0001\u0007oK^\u001cF/\u0019:u)&lW\rE\u0002\u0012\u0003;I1!a\b\u0013\u0005%\u0019F/\u0019:u)&lW\rC\u0004\u0002$\u0001!\t%!\n\u0002\r=tg*\u001a=u)\u0011\t\u0019!a\n\t\u0011\u0005%\u0012\u0011\u0005a\u0001\u0003W\t1!\\:h!\u0011\ti#a\f\u000e\u0003!I1!!\r\t\u0005\u001diUm]:bO\u0016\u0004")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/kafka/KafkaStreamProducer.class */
public class KafkaStreamProducer extends TaskActor {
    private final Map<String, Object> config;
    private final int batchSize;
    private final MessageDecoder msgDecoder;
    private final TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    private final TimeReplayableSource source;
    private long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;

    private Map<String, Object> config() {
        return this.config;
    }

    private int batchSize() {
        return this.batchSize;
    }

    private MessageDecoder msgDecoder() {
        return this.msgDecoder;
    }

    public TimeStampFilter org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter;
    }

    private TimeReplayableSource source() {
        return this.source;
    }

    public long org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime() {
        return this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime;
    }

    private void org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(long j) {
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = j;
    }

    public void onStart(StartTime startTime) {
        org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime_$eq(startTime.startTime());
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"start time ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime())})));
        source().setStartTime(org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
        package$.MODULE$.actorRef2Scala(self()).$bang(new Message("start", System.currentTimeMillis()), self());
    }

    public void onNext(Message message) {
        List pull = source().pull(batchSize());
        KafkaStreamProducer$$anonfun$onNext$1 kafkaStreamProducer$$anonfun$onNext$1 = new KafkaStreamProducer$$anonfun$onNext$1(this);
        while (true) {
            List list = pull;
            if (list.isEmpty()) {
                package$.MODULE$.actorRef2Scala(self()).$bang(new Message("continue", System.currentTimeMillis()), self());
                return;
            }
            Option filter = org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter().filter((Message) list.head(), org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime());
            new KafkaStreamProducer$$anonfun$onNext$1$$anonfun$apply$1(kafkaStreamProducer$$anonfun$onNext$1);
            if (filter.isEmpty()) {
                None$ none$ = None$.MODULE$;
            } else {
                kafkaStreamProducer$$anonfun$onNext$1.$outer.output((Message) filter.get());
                new Some(BoxedUnit.UNIT);
            }
            pull = (List) list.tail();
        }
    }

    public KafkaStreamProducer(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.config = userConfig.config();
        this.batchSize = KafkaConfig$.MODULE$.ConfigToKafka(config()).getConsumerEmitBatchSize();
        this.msgDecoder = new MessageDecoder(this) { // from class: org.apache.gearpump.streaming.examples.kafka.KafkaStreamProducer$$anon$1
            public Message fromBytes(byte[] bArr) {
                Success invert = Injection$.MODULE$.invert(bArr, Injection$.MODULE$.utf8());
                if (invert instanceof Success) {
                    return new Message((String) invert.value(), System.currentTimeMillis());
                }
                if (invert instanceof Failure) {
                    throw ((Failure) invert).exception();
                }
                throw new MatchError(invert);
            }
        };
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$filter = new TimeStampFilter(this) { // from class: org.apache.gearpump.streaming.examples.kafka.KafkaStreamProducer$$anon$2
            public Option<Message> filter(Message message, long j) {
                return Option$.MODULE$.option2Iterable(Option$.MODULE$.apply(message)).find(new KafkaStreamProducer$$anon$2$$anonfun$filter$1(this, j));
            }
        };
        this.source = KafkaSource$.MODULE$.apply(super.taskContext().appId(), super.taskContext(), userConfig, msgDecoder());
        this.org$apache$gearpump$streaming$examples$kafka$KafkaStreamProducer$$startTime = 0L;
    }
}
