package org.apache.gearpump.streaming.examples.fsio;

import akka.actor.Cancellable;
import java.util.concurrent.TimeUnit;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.task.StartTime;
import org.apache.gearpump.streaming.task.Task;
import org.apache.gearpump.streaming.task.TaskContext;
import org.apache.gearpump.util.HadoopConfig$;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SeqFileStreamProcessor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001df\u0001B\u0001\u0003\u0001=\u0011acU3r\r&dWm\u0015;sK\u0006l\u0007K]8dKN\u001cxN\u001d\u0006\u0003\u0007\u0011\tAAZ:j_*\u0011QAB\u0001\tKb\fW\u000e\u001d7fg*\u0011q\u0001C\u0001\ngR\u0014X-Y7j]\u001eT!!\u0003\u0006\u0002\u0011\u001d,\u0017M\u001d9v[BT!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011C\u0001\u0001\u0011!\t\tB#D\u0001\u0013\u0015\t\u0019b!\u0001\u0003uCN\\\u0017BA\u000b\u0013\u0005\u0011!\u0016m]6\t\u0011]\u0001!\u0011!Q\u0001\na\t1\u0002^1tW\u000e{g\u000e^3yiB\u0011\u0011#G\u0005\u00035I\u00111\u0002V1tW\u000e{g\u000e^3yi\"AA\u0004\u0001B\u0001B\u0003%Q$\u0001\u0004d_:4\u0017n\u001a\t\u0003=\u0005j\u0011a\b\u0006\u0003A!\tqa\u00197vgR,'/\u0003\u0002#?\tQQk]3s\u0007>tg-[4\t\u000b\u0011\u0002A\u0011A\u0013\u0002\rqJg.\u001b;?)\r1\u0003&\u000b\t\u0003O\u0001i\u0011A\u0001\u0005\u0006/\r\u0002\r\u0001\u0007\u0005\u00069\r\u0002\r!\b\u0005\bW\u0001\u0011\r\u0011\"\u0001-\u0003)yW\u000f\u001e9viB\u000bG\u000f[\u000b\u0002[A\u0011afM\u0007\u0002_)\u0011\u0001'M\u0001\u0003MNT!A\r\u0006\u0002\r!\fGm\\8q\u0013\t!tF\u0001\u0003QCRD\u0007B\u0002\u001c\u0001A\u0003%Q&A\u0006pkR\u0004X\u000f\u001e)bi\"\u0004\u0003b\u0002\u001d\u0001\u0001\u0004%\t!O\u0001\u0007oJLG/\u001a:\u0016\u0003i\u0002\"aO!\u000f\u0005qzT\"A\u001f\u000b\u0005y\n\u0014AA5p\u0013\t\u0001U(\u0001\u0007TKF,XM\\2f\r&dW-\u0003\u0002C\u0007\n1qK]5uKJT!\u0001Q\u001f\t\u000f\u0015\u0003\u0001\u0019!C\u0001\r\u0006QqO]5uKJ|F%Z9\u0015\u0005\u001dk\u0005C\u0001%L\u001b\u0005I%\"\u0001&\u0002\u000bM\u001c\u0017\r\\1\n\u00051K%\u0001B+oSRDqA\u0014#\u0002\u0002\u0003\u0007!(A\u0002yIEBa\u0001\u0015\u0001!B\u0013Q\u0014aB<sSR,'\u000f\t\u0005\b%\u0002\u0011\r\u0011\"\u0001T\u0003%!X\r\u001f;DY\u0006\u001c8/F\u0001Ua\t)v\fE\u0002W7vk\u0011a\u0016\u0006\u00031f\u000bA\u0001\\1oO*\t!,\u0001\u0003kCZ\f\u0017B\u0001/X\u0005\u0015\u0019E.Y:t!\tqv\f\u0004\u0001\u0005\u0013\u0001\f\u0017\u0011!A\u0001\u0006\u0003\u0019'AA 
1\u0011\u0019\u0011\u0007\u0001)A\u0005)\u0006QA/\u001a=u\u00072\f7o\u001d\u0011\u0012\u0005\u0011<\u0007C\u0001%f\u0013\t1\u0017JA\u0004O_RD\u0017N\\4\u0011\u0005qB\u0017BA5>\u0005\u0011!V\r\u001f;\t\u000f-\u0004!\u0019!C\u0001Y\u0006\u00191.Z=\u0016\u0003\u001dDaA\u001c\u0001!\u0002\u00139\u0017\u0001B6fs\u0002Bq\u0001\u001d\u0001C\u0002\u0013\u0005A.A\u0003wC2,X\r\u0003\u0004s\u0001\u0001\u0006IaZ\u0001\u0007m\u0006dW/\u001a\u0011\t\u000fQ\u0004!\u0019!C\u0001k\u0006Q\u0001.\u00193p_B\u001cuN\u001c4\u0016\u0003Y\u0004\"a\u001e>\u000e\u0003aT!!_\u0019\u0002\t\r|gNZ\u0005\u0003wb\u0014QbQ8oM&<WO]1uS>t\u0007BB?\u0001A\u0003%a/A\u0006iC\u0012|w\u000e]\"p]\u001a\u0004\u0003\u0002C@\u0001\u0001\u0004%I!!\u0001\u0002\u00115\u001cxmQ8v]R,\"!a\u0001\u0011\u0007!\u000b)!C\u0002\u0002\b%\u0013A\u0001T8oO\"I\u00111\u0002\u0001A\u0002\u0013%\u0011QB\u0001\r[N<7i\\;oi~#S-\u001d\u000b\u0004\u000f\u0006=\u0001\"\u0003(\u0002\n\u0005\u0005\t\u0019AA\u0002\u0011!\t\u0019\u0002\u0001Q!\n\u0005\r\u0011!C7tO\u000e{WO\u001c;!\u0011%\t9\u0002\u0001a\u0001\n\u0013\t\t!A\bt]\u0006\u00048\u000b[8u\u0017Z\u001bu.\u001e8u\u0011%\tY\u0002\u0001a\u0001\n\u0013\ti\"A\nt]\u0006\u00048\u000b[8u\u0017Z\u001bu.\u001e8u?\u0012*\u0017\u000fF\u0002H\u0003?A\u0011BTA\r\u0003\u0003\u0005\r!a\u0001\t\u0011\u0005\r\u0002\u0001)Q\u0005\u0003\u0007\t\u0001c\u001d8baNCw\u000e^&W\u0007>,h\u000e\u001e\u0011\t\u0013\u0005\u001d\u0002\u00011A\u0005\n\u0005\u0005\u0011\u0001D:oCB\u001c\u0006n\u001c;US6,\u0007\"CA\u0016\u0001\u0001\u0007I\u0011BA\u0017\u0003A\u0019h.\u00199TQ>$H+[7f?\u0012*\u0017\u000fF\u0002H\u0003_A\u0011BTA\u0015\u0003\u0003\u0005\r!a\u0001\t\u0011\u0005M\u0002\u0001)Q\u0005\u0003\u0007\tQb\u001d8baNCw\u000e\u001e+j[\u0016\u0004\u0003\"CA\u001c\u0001\u0001\u0007I\u0011BA\u001d\u0003%\u00198\r[3ek2,'/\u0006\u0002\u0002<A!\u0011QHA$\u001b\t\tyD\u0003\u0003\u0002B\u0005\r\u0013!B1di>\u0014(BAA#\u0003\u0011\t7n[1\n\t\u0005%\u0013q\b\u0002\f\u0007\u0006t7-\u001a7mC\ndW\rC\u0005\u0002N\u0001\u0001\r
\u0011\"\u0003\u0002P\u0005i1o\u00195fIVdWM]0%KF$2aRA)\u0011%q\u00151JA\u0001\u0002\u0004\tY\u0004\u0003\u0005\u0002V\u0001\u0001\u000b\u0015BA\u001e\u0003)\u00198\r[3ek2,'\u000f\t\u0005\b\u00033\u0002A\u0011IA.\u0003\u001dygn\u0015;beR$2aRA/\u0011!\ty&a\u0016A\u0002\u0005\u0005\u0014!C:uCJ$H+[7f!\r\t\u00121M\u0005\u0004\u0003K\u0012\"!C*uCJ$H+[7f\u0011\u001d\tI\u0007\u0001C!\u0003W\naa\u001c8OKb$HcA$\u0002n!A\u0011qNA4\u0001\u0004\t\t(A\u0002ng\u001e\u0004B!a\u001d\u0002v5\t\u0001\"C\u0002\u0002x!\u0011q!T3tg\u0006<W\rC\u0004\u0002|\u0001!\t%! \u0002\r=t7\u000b^8q)\u00059\u0005bBAA\u0001\u0011\u0005\u0011QP\u0001\re\u0016\u0004xN\u001d;Ti\u0006$Xo]\u0004\b\u0003\u000b\u0013\u0001\u0012AAD\u0003Y\u0019V-\u001d$jY\u0016\u001cFO]3b[B\u0013xnY3tg>\u0014\bcA\u0014\u0002\n\u001a1\u0011A\u0001E\u0001\u0003\u0017\u001bB!!#\u0002\u000eB\u0019\u0001*a$\n\u0007\u0005E\u0015J\u0001\u0004B]f\u0014VM\u001a\u0005\bI\u0005%E\u0011AAK)\t\t9\t\u0003\u0006\u0002\u001a\u0006%%\u0019!C\u0001\u00037\u000b1bT+U!V#v\fU!U\u0011V\u0011\u0011Q\u0014\t\u0004-\u0006}\u0015bAAQ/\n11\u000b\u001e:j]\u001eD\u0011\"!*\u0002\n\u0002\u0006I!!(\u0002\u0019=+F\u000bU+U?B\u000bE\u000b\u0013\u0011")
/* loaded from: input_file:org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.class */
/**
 * Streaming task that appends incoming messages to a Hadoop {@link SequenceFile}.
 *
 * <p>Each message payload is treated as a {@code String} and split on the literal
 * separator {@code ++}. When the split yields at least two parts, the first is written
 * as the record key and the second as the record value. Every message received is
 * counted, whether or not it produced a record.
 *
 * <p>The {@code name_$eq} methods are the Scala-generated setters for the corresponding
 * {@code var} fields; their names are kept so existing callers keep linking.
 */
public class SeqFileStreamProcessor extends Task {
    private final TaskContext taskContext;
    /** Destination file: configured output directory + file separator + task id. */
    private final Path outputPath;
    /** Created in {@link #onStart}; {@code null} until then. */
    private SequenceFile.Writer writer;
    private final Class<? extends Text> textClass;
    /** Reused for every record to avoid allocating a new {@link Text} per message. */
    private final Text key;
    /** Reused for every record to avoid allocating a new {@link Text} per message. */
    private final Text value;
    private final Configuration hadoopConf;
    /** Total number of messages passed to {@link #onNext} so far. */
    private long msgCount;
    /** Value of {@link #msgCount} at the previous {@link #reportStatus} call. */
    private long snapShotKVCount;
    /** Wall-clock time (ms) of the previous snapshot; set in {@link #onStart} and {@link #reportStatus}. */
    private long snapShotTime;
    /** Handle of the periodic job registered in {@link #onStart}; {@code null} until then. */
    private Cancellable scheduler;

    /** @return the user-config key under which the output directory is stored. */
    public static String OUTPUT_PATH() {
        return SeqFileStreamProcessor$.MODULE$.OUTPUT_PATH();
    }

    /** @return the path of the sequence file this task writes to. */
    public Path outputPath() {
        return this.outputPath;
    }

    /** @return the current writer, or {@code null} if {@link #onStart} has not created one. */
    public SequenceFile.Writer writer() {
        return this.writer;
    }

    /** Replaces the current writer (Scala-generated setter). */
    public void writer_$eq(SequenceFile.Writer writer) {
        this.writer = writer;
    }

    /** @return the class used as both key class and value class of the sequence file. */
    public Class<? extends Text> textClass() {
        return this.textClass;
    }

    /** @return the reusable key buffer. */
    public Text key() {
        return this.key;
    }

    /** @return the reusable value buffer. */
    public Text value() {
        return this.value;
    }

    /** @return the Hadoop configuration derived from the user config. */
    public Configuration hadoopConf() {
        return this.hadoopConf;
    }

    private long msgCount() {
        return this.msgCount;
    }

    private void msgCount_$eq(long j) {
        this.msgCount = j;
    }

    private long snapShotKVCount() {
        return this.snapShotKVCount;
    }

    private void snapShotKVCount_$eq(long j) {
        this.snapShotKVCount = j;
    }

    private long snapShotTime() {
        return this.snapShotTime;
    }

    private void snapShotTime_$eq(long j) {
        this.snapShotTime = j;
    }

    private Cancellable scheduler() {
        return this.scheduler;
    }

    private void scheduler_$eq(Cancellable cancellable) {
        this.scheduler = cancellable;
    }

    /**
     * Opens the sequence-file writer and registers a periodic job.
     *
     * <p>The output path is first registered with the file system via
     * {@code deleteOnExit}. The job is scheduled with a 5 second initial delay and a
     * 5 second interval.
     *
     * @param startTime start time handed in by the framework; not used here
     */
    public void onStart(StartTime startTime) {
        FileSystem.get(hadoopConf()).deleteOnExit(outputPath());

        SequenceFile.Writer.Option[] writerOptions = new SequenceFile.Writer.Option[]{
            SequenceFile.Writer.file(outputPath()),
            SequenceFile.Writer.keyClass(textClass()),
            SequenceFile.Writer.valueClass(textClass())
        };
        writer_$eq(SequenceFile.createWriter(hadoopConf(), writerOptions));

        FiniteDuration initialDelay = new FiniteDuration(5L, TimeUnit.SECONDS);
        FiniteDuration interval = new FiniteDuration(5L, TimeUnit.SECONDS);
        // NOTE(review): the closure class is compiled separately and is not visible here;
        // presumably it calls reportStatus() - confirm against the Scala source.
        scheduler_$eq(this.taskContext.schedule(
            initialDelay, interval, new SeqFileStreamProcessor$$anonfun$onStart$1(this)));

        snapShotTime_$eq(System.currentTimeMillis());
        LOG().info("sequence file bolt initiated");
    }

    /**
     * Writes one message to the sequence file and bumps the message counter.
     *
     * <p>The payload is cast to {@code String} and split on the literal {@code ++}.
     * Payloads that split into fewer than two parts are not written but are still
     * counted; parts beyond the second are ignored.
     *
     * @param message incoming message whose payload is expected to be a {@code String}
     */
    public void onNext(Message message) {
        String[] parts = ((String) message.msg()).split("\\+\\+");
        if (parts.length >= 2) {
            key().set(parts[0]);
            value().set(parts[1]);
            writer().append(key(), value());
        }
        msgCount_$eq(msgCount() + 1);
    }

    /**
     * Cancels the periodic job and closes the writer.
     *
     * <p>Both resources are only created in {@link #onStart}, so each is checked for
     * {@code null} before use. This keeps a stop that follows a missing or failed
     * start from throwing a {@code NullPointerException}.
     */
    public void onStop() {
        Cancellable activeScheduler = scheduler();
        if (activeScheduler != null) {
            activeScheduler.cancel();
        }

        SequenceFile.Writer openWriter = writer();
        if (openWriter != null) {
            openWriter.close();
        }

        LOG().info("sequence file bolt stopped");
    }

    /**
     * Logs the number of messages and elapsed whole seconds since the previous snapshot,
     * then takes a new snapshot of the counter and the clock.
     *
     * <p>The two numbers are logged as a pair {@code (messages, seconds)}; no division
     * into a rate is performed.
     */
    public void reportStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        LOG().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Task ", " Throughput: ", " (KVPairs, second)"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.taskContext.taskId(), new Tuple2.mcJJ.sp(msgCount() - snapShotKVCount(), (currentTimeMillis - snapShotTime()) / 1000)})));
        snapShotKVCount_$eq(msgCount());
        snapShotTime_$eq(currentTimeMillis);
    }

    /**
     * Builds the task and resolves its output location and Hadoop configuration.
     *
     * <p>The output path is the directory stored under {@link #OUTPUT_PATH()} in
     * {@code userConfig}, followed by the platform file separator and the task id.
     * No file is opened here; that happens in {@link #onStart}.
     *
     * @param taskContext context of this task; supplies the task id and the scheduler
     * @param userConfig  user configuration; must contain the {@link #OUTPUT_PATH()} entry
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SeqFileStreamProcessor(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.taskContext = taskContext;

        String outputDir = (String) userConfig.getString(SeqFileStreamProcessor$.MODULE$.OUTPUT_PATH()).get();
        this.outputPath = new Path(outputDir + System.getProperty("file.separator") + taskContext.taskId());

        this.writer = null;
        this.textClass = Text.class;
        this.key = new Text();
        this.value = new Text();
        this.hadoopConf = HadoopConfig$.MODULE$.userConfigToHadoopConfig(userConfig).hadoopConf();
        this.msgCount = 0L;
        this.snapShotKVCount = 0L;
        this.snapShotTime = 0L;
        this.scheduler = null;
    }
}
