package org.apache.gearpump.streaming.examples.kafka;

import akka.actor.Cancellable;
import com.twitter.bijection.Injection$;
import java.util.concurrent.TimeUnit;
import kafka.producer.ProducerConfig;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.kafka.KafkaSink;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig$;
import org.apache.gearpump.streaming.kafka.lib.KafkaUtil$;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.TaskActor;
import org.apache.gearpump.streaming.task.TaskContext;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;

/* compiled from: KafkaStreamProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Uc\u0001B\u0001\u0003\u0001=\u0011AcS1gW\u0006\u001cFO]3b[B\u0013xnY3tg>\u0014(BA\u0002\u0005\u0003\u0015Y\u0017MZ6b\u0015\t)a!\u0001\u0005fq\u0006l\u0007\u000f\\3t\u0015\t9\u0001\"A\u0005tiJ,\u0017-\\5oO*\u0011\u0011BC\u0001\tO\u0016\f'\u000f];na*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\t\u0011\u0005E!R\"\u0001\n\u000b\u0005M1\u0011\u0001\u0002;bg.L!!\u0006\n\u0003\u0013Q\u000b7o[!di>\u0014\b\"C\f\u0001\u0005\u0003\u0005\u000b\u0011\u0002\r\u001c\u0003-!\u0018m]6D_:$X\r\u001f;\u0011\u0005EI\u0012B\u0001\u000e\u0013\u0005-!\u0016m]6D_:$X\r\u001f;\n\u0005]!\u0002\u0002C\u000f\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0010\u0002\u0017%t\u0007/\u001e;D_:4\u0017n\u001a\t\u0003?\tj\u0011\u0001\t\u0006\u0003C!\tqa\u00197vgR,'/\u0003\u0002$A\tQQk]3s\u0007>tg-[4\t\u000b\u0015\u0002A\u0011\u0001\u0014\u0002\rqJg.\u001b;?)\r9\u0013F\u000b\t\u0003Q\u0001i\u0011A\u0001\u0005\u0006/\u0011\u0002\r\u0001\u0007\u0005\u0006;\u0011\u0002\rA\b\u0005\bY\u0001\u0011\r\u0011\"\u0003.\u0003-Y\u0017MZ6b\u0007>tg-[4\u0016\u00039\u0002\"aL\u001a\u000e\u0003AR!!\r\u001a\u0002\u00071L'M\u0003\u0002\u0004\r%\u0011A\u0007\r\u0002\f\u0017\u000647.Y\"p]\u001aLw\r\u0003\u00047\u0001\u0001\u0006IAL\u0001\rW\u000647.Y\"p]\u001aLw\r\t\u0005\bq\u0001\u0011\r\u0011\"\u0003:\u0003\u0015!x\u000e]5d+\u0005Q\u0004CA\u001eB\u001d\tat(D\u0001>\u0015\u0005q\u0014!B:dC2\f\u0017B\u0001!>\u0003\u0019\u0001&/\u001a3fM&\u0011!i\u0011\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u0001k\u0004BB#\u0001A\u0003%!(\u0001\u0004u_BL7\r\t\u0005\b\u000f\u0002\u0011\r\u0011\"\u0003I\u0003%\u0011\u0017\r^2i'&TX-F\u0001J!\ta$*\u0003\u0002L{\t\u0019\u0011J\u001c;\t\r5\u0003\u0001\u0015!\u0003J\u0003)\u0011\u0017\r^2i'&TX\r\t\u0005\b\u001f\u0002\u0011\r\u0011\"\u0003Q\u00039\u0001(o\u001c3vG\u0016\u00148i\u001c8gS\u001e,\u0012!\u0015\t\u0003%Zk\u0011a\u0015\u0006\u0003)V\u000b\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u0007%\u0011qk\u0015\u0002\u000f!J|G-^2fe\u000e{gNZ5h\u0011\u0019I\u0006\u0001)A\u0005#\u0006y\u0001O]8ek\u000e,'oQ8oM&<\u0007\u0005C\u0004\\\u0001\t\u0007I\u0011\u0002/\u0002\u0013-\fgm[1TS:\\W#A/\u0011\u0005y{V\"\u0001\u001a\n\u0005\u0001\u0014$!C&bM.\f7+\u001b8l\u0011\u0019\u0011\u0007\u0001)A\u0005;\u0006Q1.\u00194lCNKgn\u001b\u0011\t\u000f\u0011\u0004\u0001\u0019!C\u0005K\u0006)1m\\;oiV\ta\r\u0005\u0002=O&\u0011\u0001.\u0010\u0002\u0005\u0019>tw\rC\u0004k\u0001\u0001\u0007I\u0011B6\u0002\u0013\r|WO\u001c;`I\u0015\fHC\u00017p!\taT.\u0003\u0002o{\t!QK\\5u\u0011\u001d\u0001\u0018.!AA\u0002\u0019\f1\u0001\u001f\u00132\u0011\u0019\u0011\b\u0001)Q\u0005M\u000611m\\;oi\u0002Bq\u0001\u001e\u0001A\u0002\u0013%Q-A\u0005mCN$8i\\;oi\"9a\u000f\u0001a\u0001\n\u00139\u0018!\u00047bgR\u001cu.\u001e8u?\u0012*\u0017\u000f\u0006\u0002mq\"9\u0001/^A\u0001\u0002\u00041\u0007B\u0002>\u0001A\u0003&a-\u0001\u0006mCN$8i\\;oi\u0002Bq\u0001 \u0001A\u0002\u0013%Q-\u0001\u0005mCN$H+[7f\u0011\u001dq\b\u00011A\u0005\n}\fA\u0002\\1tiRKW.Z0%KF$2\u0001\\A\u0001\u0011\u001d\u0001X0!AA\u0002\u0019Dq!!\u0002\u0001A\u0003&a-A\u0005mCN$H+[7fA!I\u0011\u0011\u0002\u0001A\u0002\u0013%\u00111B\u0001\ng\u000eDW\rZ;mKJ,\"!!\u0004\u0011\t\u0005=\u0011\u0011D\u0007\u0003\u0003#QA!a\u0005\u0002\u0016\u0005)\u0011m\u0019;pe*\u0011\u0011qC\u0001\u0005C.\\\u0017-\u0003\u0003\u0002\u001c\u0005E!aC\"b]\u000e,G\u000e\\1cY\u0016D\u0011\"a\b\u0001\u0001\u0004%I!!\t\u0002\u001bM\u001c\u0007.\u001a3vY\u0016\u0014x\fJ3r)\ra\u00171\u0005\u0005\na\u0006u\u0011\u0011!a\u0001\u0003\u001bA\u0001\"a\n\u0001A\u0003&\u0011QB\u0001\u000bg\u000eDW\rZ;mKJ\u0004\u0003bBA\u0016\u0001\u0011\u0005\u0013QF\u0001\b_:\u001cF/\u0019:u)\ra\u0017q\u0006\u0005\t\u0003c\tI\u00031\u0001\u00024\u0005I1\u000f^1siRKW.\u001a\t\u0004#\u0005U\u0012bAA\u001c%\tI1\u000b^1siRKW.\u001a\u0005\b\u0003w\u0001A\u0011IA\u001f\u0003\u0019ygNT3yiR\u0019A.a\u0010\t\u0011\u0005\u0005\u0013\u0011\ba\u0001\u0003\u0007\n1!\\:h!\u0011\t)%a\u0012\u000e\u0003!I1!!\u0013\t\u0005\u001diUm]:bO\u0016Dq!!\u0014\u0001\t\u0003\ny%\u0001\u0004p]N#x\u000e\u001d\u000b\u0002Y\"9\u00111\u000b\u0001\u0005\n\u0005=\u0013\u0001\u0005:fa>\u0014H\u000f\u00165s_V<\u0007\u000e];u\u0001")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/kafka/KafkaStreamProcessor.class */
public class KafkaStreamProcessor extends TaskActor {
    private final KafkaConfig kafkaConfig;
    private final String topic;
    private final int batchSize;
    private final ProducerConfig producerConfig;
    private final KafkaSink kafkaSink;
    private long count;
    private long lastCount;
    private long lastTime;
    private Cancellable scheduler;

    private KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    private String topic() {
        return this.topic;
    }

    private int batchSize() {
        return this.batchSize;
    }

    private ProducerConfig producerConfig() {
        return this.producerConfig;
    }

    private KafkaSink kafkaSink() {
        return this.kafkaSink;
    }

    private long count() {
        return this.count;
    }

    private void count_$eq(long j) {
        this.count = j;
    }

    private long lastCount() {
        return this.lastCount;
    }

    private void lastCount_$eq(long j) {
        this.lastCount = j;
    }

    private long lastTime() {
        return this.lastTime;
    }

    private void lastTime_$eq(long j) {
        this.lastTime = j;
    }

    private Cancellable scheduler() {
        return this.scheduler;
    }

    private void scheduler_$eq(Cancellable cancellable) {
        this.scheduler = cancellable;
    }

    public void onStart(StartTime startTime) {
        scheduler_$eq(context().system().scheduler().schedule(new FiniteDuration(5L, TimeUnit.SECONDS), new FiniteDuration(5L, TimeUnit.SECONDS), new KafkaStreamProcessor$$anonfun$onStart$1(this), context().dispatcher()));
    }

    public void onNext(Message message) {
        Tuple2 msg = message.msg();
        kafkaSink().write(topic(), (byte[]) Injection$.MODULE$.apply((String) msg._1(), Injection$.MODULE$.utf8()), (byte[]) Injection$.MODULE$.apply((String) msg._2(), Injection$.MODULE$.utf8()));
        count_$eq(count() + 1);
    }

    public void onStop() {
        kafkaSink().close();
        scheduler().cancel();
    }

    public void org$apache$gearpump$streaming$examples$kafka$KafkaStreamProcessor$$reportThroughput() {
        long currentTimeMillis = System.currentTimeMillis();
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Task ", "; Throughput: ", " (messages, second)"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{super.taskContext().taskId(), new Tuple2.mcJJ.sp(count() - lastCount(), (currentTimeMillis - lastTime()) / 1000)})));
        lastCount_$eq(count());
        lastTime_$eq(currentTimeMillis);
    }

    public KafkaStreamProcessor(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.kafkaConfig = (KafkaConfig) userConfig.getValue(KafkaConfig$.MODULE$.NAME(), system()).get();
        this.topic = kafkaConfig().getProducerTopic();
        this.batchSize = kafkaConfig().getProducerEmitBatchSize();
        this.producerConfig = KafkaUtil$.MODULE$.buildProducerConfig(kafkaConfig());
        this.kafkaSink = new KafkaSink(producerConfig(), batchSize());
        this.count = 0L;
        this.lastCount = 0L;
        this.lastTime = System.currentTimeMillis();
        this.scheduler = null;
    }
}
