package org.apache.gearpump.streaming.examples.kafka;

import akka.actor.Cancellable;
import com.twitter.bijection.Injection$;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.kafka.KafkaSink;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig;
import org.apache.gearpump.streaming.kafka.lib.KafkaConfig$;
import org.apache.gearpump.streaming.kafka.lib.KafkaUtil$;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.Task;
import org.apache.gearpump.streaming.task.TaskContext;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaStreamProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015c\u0001B\u0001\u0003\u0001=\u0011AcS1gW\u0006\u001cFO]3b[B\u0013xnY3tg>\u0014(BA\u0002\u0005\u0003\u0015Y\u0017MZ6b\u0015\t)a!\u0001\u0005fq\u0006l\u0007\u000f\\3t\u0015\t9\u0001\"A\u0005tiJ,\u0017-\\5oO*\u0011\u0011BC\u0001\tO\u0016\f'\u000f];na*\u00111\u0002D\u0001\u0007CB\f7\r[3\u000b\u00035\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\t\u0011\u0005E!R\"\u0001\n\u000b\u0005M1\u0011\u0001\u0002;bg.L!!\u0006\n\u0003\tQ\u000b7o\u001b\u0005\t/\u0001\u0011\t\u0011)A\u00051\u0005YA/Y:l\u0007>tG/\u001a=u!\t\t\u0012$\u0003\u0002\u001b%\tYA+Y:l\u0007>tG/\u001a=u\u0011!a\u0002A!A!\u0002\u0013i\u0012aC5oaV$8i\u001c8gS\u001e\u0004\"AH\u0011\u000e\u0003}Q!\u0001\t\u0005\u0002\u000f\rdWo\u001d;fe&\u0011!e\b\u0002\u000b+N,'oQ8oM&<\u0007\"\u0002\u0013\u0001\t\u0003)\u0013A\u0002\u001fj]&$h\bF\u0002'Q%\u0002\"a\n\u0001\u000e\u0003\tAQaF\u0012A\u0002aAQ\u0001H\u0012A\u0002uAqa\u000b\u0001C\u0002\u0013%A&A\u0006lC\u001a\\\u0017mQ8oM&<W#A\u0017\u0011\u00059\u0012T\"A\u0018\u000b\u0005A\n\u0014a\u00017jE*\u00111AB\u0005\u0003g=\u00121bS1gW\u0006\u001cuN\u001c4jO\"1Q\u0007\u0001Q\u0001\n5\nAb[1gW\u0006\u001cuN\u001c4jO\u0002Bqa\u000e\u0001C\u0002\u0013%\u0001(A\u0003u_BL7-F\u0001:!\tQ\u0004I\u0004\u0002<}5\tAHC\u0001>\u0003\u0015\u00198-\u00197b\u0013\tyD(\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u0003\n\u0013aa\u0015;sS:<'BA =\u0011\u0019!\u0005\u0001)A\u0005s\u00051Ao\u001c9jG\u0002BqA\u0012\u0001C\u0002\u0013%q)\u0001\bqe>$WoY3s\u0007>tg-[4\u0016\u0003!\u0003\"!\u0013(\u000e\u0003)S!a\u0013'\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u001b\u0006!!.\u0019<b\u0013\ty%J\u0001\u0006Qe>\u0004XM\u001d;jKNDa!\u0015\u0001!\u0002\u0013A\u0015a\u00049s_\u0012,8-\u001a:D_:4\u0017n\u001a\u0011\t\u000fM\u0003!\u0019!C\u0005)\u0006I1.\u00194lCNKgn[\u000b\u0002+B\u0011akV\u0007\u0002c%\u0011\u0001,\r\u0002\n\u0017\u000647.Y*j].DaA\u0017\u0001!\u0002\u0013)\u0016AC6bM.\f7+\u001b8lA!9A\f\u0001a\u0001\n\u0013i\u0016!B2pk:$X#\u00010\u0011\u0005mz\u0016B\u00011=\u0005\u0011auN\\4\t\u000f\t\u0004\u0001\u0019!C\u0005G\u0006I1m\\;oi~#S-\u001d\u000b\u0003I\u001e\u0004\"aO3\n\u0005\u0019d$\u0001B+oSRDq\u0001[1\u0002\u0002\u0003\u0007a,A\u0002yIEBaA\u001b\u0001!B\u0013q\u0016AB2pk:$\b\u0005C\u0004m\u0001\u0001\u0007I\u0011B/\u0002\u00131\f7\u000f^\"pk:$\bb\u00028\u0001\u0001\u0004%Ia\\\u0001\u000eY\u0006\u001cHoQ8v]R|F%Z9\u0015\u0005\u0011\u0004\bb\u00025n\u0003\u0003\u0005\rA\u0018\u0005\u0007e\u0002\u0001\u000b\u0015\u00020\u0002\u00151\f7\u000f^\"pk:$\b\u0005C\u0004u\u0001\u0001\u0007I\u0011B/\u0002\u00111\f7\u000f\u001e+j[\u0016DqA\u001e\u0001A\u0002\u0013%q/\u0001\u0007mCN$H+[7f?\u0012*\u0017\u000f\u0006\u0002eq\"9\u0001.^A\u0001\u0002\u0004q\u0006B\u0002>\u0001A\u0003&a,A\u0005mCN$H+[7fA!9A\u0010\u0001a\u0001\n\u0013i\u0018!C:dQ\u0016$W\u000f\\3s+\u0005q\bcA@\u0002\n5\u0011\u0011\u0011\u0001\u0006\u0005\u0003\u0007\t)!A\u0003bGR|'O\u0003\u0002\u0002\b\u0005!\u0011m[6b\u0013\u0011\tY!!\u0001\u0003\u0017\r\u000bgnY3mY\u0006\u0014G.\u001a\u0005\n\u0003\u001f\u0001\u0001\u0019!C\u0005\u0003#\tQb]2iK\u0012,H.\u001a:`I\u0015\fHc\u00013\u0002\u0014!A\u0001.!\u0004\u0002\u0002\u0003\u0007a\u0010C\u0004\u0002\u0018\u0001\u0001\u000b\u0015\u0002@\u0002\u0015M\u001c\u0007.\u001a3vY\u0016\u0014\b\u0005C\u0004\u0002\u001c\u0001!\t%!\b\u0002\u000f=t7\u000b^1siR\u0019A-a\b\t\u0011\u0005\u0005\u0012\u0011\u0004a\u0001\u0003G\t\u0011b\u001d;beR$\u0016.\\3\u0011\u0007E\t)#C\u0002\u0002(I\u0011\u0011b\u0015;beR$\u0016.\\3\t\u000f\u0005-\u0002\u0001\"\u0011\u0002.\u00051qN\u001c(fqR$2\u0001ZA\u0018\u0011!\t\t$!\u000bA\u0002\u0005M\u0012aA7tOB!\u0011QGA\u001c\u001b\u0005A\u0011bAA\u001d\u0011\t9Q*Z:tC\u001e,\u0007bBA\u001f\u0001\u0011\u0005\u0013qH\u0001\u0007_:\u001cFo\u001c9\u0015\u0003\u0011Dq!a\u0011\u0001\t\u0013\ty$\u0001\tsKB|'\u000f\u001e+ie>,x\r\u001b9vi\u0002")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/kafka/KafkaStreamProcessor.class */
public class KafkaStreamProcessor extends Task {
    private final TaskContext taskContext;
    private final KafkaConfig kafkaConfig;
    private final String topic;
    private final Properties producerConfig;
    private final KafkaSink kafkaSink;
    private long count;
    private long lastCount;
    private long lastTime;
    private Cancellable scheduler;

    private KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    private String topic() {
        return this.topic;
    }

    private Properties producerConfig() {
        return this.producerConfig;
    }

    private KafkaSink kafkaSink() {
        return this.kafkaSink;
    }

    private long count() {
        return this.count;
    }

    private void count_$eq(long j) {
        this.count = j;
    }

    private long lastCount() {
        return this.lastCount;
    }

    private void lastCount_$eq(long j) {
        this.lastCount = j;
    }

    private long lastTime() {
        return this.lastTime;
    }

    private void lastTime_$eq(long j) {
        this.lastTime = j;
    }

    private Cancellable scheduler() {
        return this.scheduler;
    }

    private void scheduler_$eq(Cancellable cancellable) {
        this.scheduler = cancellable;
    }

    public void onStart(StartTime startTime) {
        scheduler_$eq(this.taskContext.schedule(new FiniteDuration(5L, TimeUnit.SECONDS), new FiniteDuration(5L, TimeUnit.SECONDS), new KafkaStreamProcessor$$anonfun$onStart$1(this)));
    }

    public void onNext(Message message) {
        Tuple2 tuple2 = (Tuple2) message.msg();
        kafkaSink().write(topic(), (byte[]) Injection$.MODULE$.apply((String) tuple2._1(), Injection$.MODULE$.utf8()), (byte[]) Injection$.MODULE$.apply((String) tuple2._2(), Injection$.MODULE$.utf8()));
        count_$eq(count() + 1);
    }

    public void onStop() {
        if (scheduler() == null) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(scheduler().cancel());
        }
        kafkaSink().close();
        LOG().info("KafkaStreamProcessor stopped");
    }

    public void org$apache$gearpump$streaming$examples$kafka$KafkaStreamProcessor$$reportThroughput() {
        long currentTimeMillis = System.currentTimeMillis();
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Task ", "; Throughput: ", " (messages, second)"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.taskContext.taskId(), new Tuple2.mcJJ.sp(count() - lastCount(), (currentTimeMillis - lastTime()) / 1000)})));
        lastCount_$eq(count());
        lastTime_$eq(currentTimeMillis);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaStreamProcessor(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.taskContext = taskContext;
        this.kafkaConfig = (KafkaConfig) userConfig.getValue(KafkaConfig$.MODULE$.NAME(), system()).get();
        this.topic = kafkaConfig().getProducerTopic();
        this.producerConfig = KafkaUtil$.MODULE$.buildProducerConfig(kafkaConfig());
        this.kafkaSink = new KafkaSink(producerConfig());
        this.count = 0L;
        this.lastCount = 0L;
        this.lastTime = System.currentTimeMillis();
        this.scheduler = null;
    }
}
