package io.gearpump.streaming.state.api;

import akka.actor.package$;
import io.gearpump.Message;
import io.gearpump.cluster.UserConfig;
import io.gearpump.streaming.state.impl.CheckpointManager;
import io.gearpump.streaming.state.impl.PersistentStateConfig$;
import io.gearpump.streaming.task.ReportCheckpointClock;
import io.gearpump.streaming.task.StartTime;
import io.gearpump.streaming.task.Task;
import io.gearpump.streaming.task.TaskContext;
import io.gearpump.streaming.transaction.api.CheckpointStore;
import io.gearpump.streaming.transaction.api.CheckpointStoreFactory;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: PersistentTask.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]t!B\u0001\u0003\u0011\u0003i\u0011A\u0004)feNL7\u000f^3oiR\u000b7o\u001b\u0006\u0003\u0007\u0011\t1!\u00199j\u0015\t)a!A\u0003ti\u0006$XM\u0003\u0002\b\u0011\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u0013)\t\u0001bZ3beB,X\u000e\u001d\u0006\u0002\u0017\u0005\u0011\u0011n\\\u0002\u0001!\tqq\"D\u0001\u0003\r\u0015\u0001\"\u0001#\u0001\u0012\u00059\u0001VM]:jgR,g\u000e\u001e+bg.\u001c\"a\u0004\n\u0011\u0005M1R\"\u0001\u000b\u000b\u0003U\tQa]2bY\u0006L!a\u0006\u000b\u0003\r\u0005s\u0017PU3g\u0011\u0015Ir\u0002\"\u0001\u001b\u0003\u0019a\u0014N\\5u}Q\tQ\u0002C\u0004\u001d\u001f\t\u0007I\u0011A\u000f\u0002\u0015\rCUiQ&Q\u001f&sE+F\u0001\u001f!\ty\u0002%D\u0001\t\u0013\t\t\u0003BA\u0004NKN\u001c\u0018mZ3\t\r\rz\u0001\u0015!\u0003\u001f\u0003-\u0019\u0005*R\"L!>Ke\n\u0016\u0011\t\u000f\u0015z!\u0019!C\u0001M\u0005\u0019AjT$\u0016\u0003\u001d\u0002\"\u0001K\u0017\u000e\u0003%R!AK\u0016\u0002\u000bMdg\r\u000e6\u000b\u00031\n1a\u001c:h\u0013\tq\u0013F\u0001\u0004M_\u001e<WM\u001d\u0005\u0007a=\u0001\u000b\u0011B\u0014\u0002\t1{u\t\t\u0004\u0006!\t\t\tAM\u000b\u0003g1\u001b\"!\r\u001b\u0011\u0005UBT\"\u0001\u001c\u000b\u0005]2\u0011\u0001\u0002;bg.L!!\u000f\u001c\u0003\tQ\u000b7o\u001b\u0005\twE\u0012\t\u0011)A\u0005y\u0005YA/Y:l\u0007>tG/\u001a=u!\t)T(\u0003\u0002?m\tYA+Y:l\u0007>tG/\u001a=u\u0011!\u0001\u0015G!A!\u0002\u0013\t\u0015\u0001B2p]\u001a\u0004\"AQ#\u000e\u0003\rS!\u0001\u0012\u0005\u0002\u000f\rdWo\u001d;fe&\u0011ai\u0011\u0002\u000b+N,'oQ8oM&<\u0007\"B\r2\t\u0003AEcA%V-B\u0019a\"\r&\u0011\u0005-cE\u0002\u0001\u0003\u0006\u001bF\u0012\rA\u0014\u0002\u0002)F\u0011qJ\u0015\t\u0003'AK!!\u0015\u000b\u0003\u000f9{G\u000f[5oOB\u00111cU\u0005\u0003)R\u00111!\u00118z\u0011\u0015Yt\t1\u0001=\u0011\u0015\u0001u\t1\u0001B\u0011\u001dA\u0016G1A\u0005\u0002e\u000bac\u00195fG.\u0004x.\u001b8u'R|'/\u001a$bGR|'/_\u000b\u00025B\u00111lX\u0007\u00029*\u00111!\u0018\u0006\u0003=\u001a\t1\u0002\u001e:b]N\f7\r^5p]&\u0011\u0001\r\u0018\u000
2\u0017\u0007\",7m\u001b9pS:$8\u000b^8sK\u001a\u000b7\r^8ss\"1!-\rQ\u0001\ni\u000bqc\u00195fG.\u0004x.\u001b8u'R|'/\u001a$bGR|'/\u001f\u0011\t\u000f\u0011\f$\u0019!C\u0001K\u0006y1\r[3dWB|\u0017N\u001c;Ti>\u0014X-F\u0001g!\tYv-\u0003\u0002i9\ny1\t[3dWB|\u0017N\u001c;Ti>\u0014X\r\u0003\u0004kc\u0001\u0006IAZ\u0001\u0011G\",7m\u001b9pS:$8\u000b^8sK\u0002Bq\u0001\\\u0019C\u0002\u0013\u0005Q.\u0001\ndQ\u0016\u001c7\u000e]8j]RLe\u000e^3sm\u0006dW#\u00018\u0011\u0005My\u0017B\u00019\u0015\u0005\u0011auN\\4\t\rI\f\u0004\u0015!\u0003o\u0003M\u0019\u0007.Z2la>Lg\u000e^%oi\u0016\u0014h/\u00197!\u0011\u001d!\u0018G1A\u0005\u0002U\f\u0011c\u00195fG.\u0004x.\u001b8u\u001b\u0006t\u0017mZ3s+\u00051\bCA<{\u001b\u0005A(BA=\u0005\u0003\u0011IW\u000e\u001d7\n\u0005mD(!E\"iK\u000e\\\u0007o\\5oi6\u000bg.Y4fe\"1Q0\rQ\u0001\nY\f!c\u00195fG.\u0004x.\u001b8u\u001b\u0006t\u0017mZ3sA!9q0\rb\u0001\n\u0013i\u0017!G2iK\u000e\\\u0007o\\5oi\u0006#H/Z7qi&sG/\u001a:wC2Dq!a\u00012A\u0003%a.\u0001\u000edQ\u0016\u001c7\u000e]8j]R\fE\u000f^3naRLe\u000e^3sm\u0006d\u0007\u0005C\u0004\u0002\bE2\t!!\u0003\u0002\u001fA,'o]5ti\u0016tGo\u0015;bi\u0016,\"!a\u0003\u0011\t9\tiAS\u0005\u0004\u0003\u001f\u0011!a\u0004)feNL7\u000f^3oiN#\u0018\r^3\t\u000f\u0005M\u0011G\"\u0001\u0002\u0016\u0005q\u0001O]8dKN\u001cX*Z:tC\u001e,GCBA\f\u0003;\ty\u0002E\u0002\u0014\u00033I1!a\u0007\u0015\u0005\u0011)f.\u001b;\t\u000f\u0015\t\t\u00021\u0001\u0002\f!9\u0011\u0011EA\t\u0001\u0004q\u0012aB7fgN\fw-\u001a\u0005\t\u000bE\u0012\r\u0011\"\u0001\u0002\n!A\u0011qE\u0019!\u0002\u0013\tY!\u0001\u0004ti\u0006$X\r\t\u0005\b\u0003W\tDQIA\u0017\u0003\u001dygn\u0015;beR$B!a\u0006\u00020!A\u0011\u0011GA\u0015\u0001\u0004\t\u0019$A\u0005ti\u0006\u0014H\u000fV5nKB\u0019Q'!\u000e\n\u0007\u0005]bGA\u0005Ti\u0006\u0014H\u000fV5nK\"9\u00111H\u0019\u0005F\u0005u\u0012AB8o\u001d\u0016DH\u000f\u0006\u0003\u0002\u0018\u0005}\u0002bBA\u0011\u0003s\u0001\rA\b\u0005\b\u0003\u0007\nDQIA#\u0003\u0019ygn\u0015;paR\u0011\u0011q\u0003\u0005\b\u0003\u0013\nD\
u0011BA&\u0003I\u00198\r[3ek2,7\t[3dWB|\u0017N\u001c;\u0015\t\u0005]\u0011Q\n\u0005\b\u0003\u001f\n9\u00051\u0001o\u0003!Ig\u000e^3sm\u0006d\u0007bBA*c\u0011%\u0011QK\u0001\u0016e\u0016\u0004xN\u001d;DQ\u0016\u001c7\u000e]8j]R\u001cEn\\2l)\u0011\t9\"a\u0016\t\u0011\u0005e\u0013\u0011\u000ba\u0001\u00037\n\u0011\u0002^5nKN$\u0018-\u001c9\u0011\t\u0005u\u0013\u0011\u000f\b\u0005\u0003?\niG\u0004\u0003\u0002b\u0005-d\u0002BA2\u0003Sj!!!\u001a\u000b\u0007\u0005\u001dD\"\u0001\u0004=e>|GOP\u0005\u0002\u0017%\u0011\u0011BC\u0005\u0004\u0003_B\u0011a\u00029bG.\fw-Z\u0005\u0005\u0003g\n)HA\u0005US6,7\u000b^1na*\u0019\u0011q\u000e\u0005")
/* loaded from: input_file:io/gearpump/streaming/state/api/PersistentTask.class */
/*
 * Base class for streaming tasks that keep a PersistentState and checkpoint it
 * through a CheckpointManager.
 *
 * Subclasses provide the state instance (persistentState()) and the per-message
 * logic (processMessage(...)). The lifecycle hooks onStart/onNext/onStop are final
 * and take care of recovery, checkpoint scheduling and cleanup.
 *
 * This source is decompiled from Scala: members named io$gearpump$...$$xxx are
 * compiler-mangled names. They are kept public and unchanged here
 * (NOTE(review): presumably because generated closure classes such as
 * PersistentTask$$anonfun$scheduleCheckpoint$1 access them - confirm before renaming).
 */
public abstract class PersistentTask<T> extends Task {
    /** Task context handed to the constructor; used for output, scheduling, clocks and app-master access. */
    public final TaskContext io$gearpump$streaming$state$api$PersistentTask$$taskContext;
    /** Factory read from the user config under STATE_CHECKPOINT_STORE_FACTORY. */
    private final CheckpointStoreFactory checkpointStoreFactory;
    /** Store obtained from {@link #checkpointStoreFactory} for this task. */
    private final CheckpointStore checkpointStore;
    /** Value read from the user config under STATE_CHECKPOINT_INTERVAL_MS. */
    private final long checkpointInterval;
    /** Manager built from {@link #checkpointInterval} and {@link #checkpointStore}. */
    private final CheckpointManager checkpointManager;
    /** Delay, in milliseconds, between two scheduled CHECKPOINT attempts (fixed to 1000). */
    private final long checkpointAttemptInterval;
    /** State instance returned by {@link #persistentState()} at construction time. */
    private final PersistentState<T> state;

    /**
     * Returns the marker message held by the Scala companion object
     * ({@code PersistentTask$.MODULE$}); {@link #onNext(Message)} treats a message
     * equal to it as a checkpoint attempt rather than as data.
     *
     * @return the companion object's CHECKPOINT message
     */
    public static Message CHECKPOINT() {
        return PersistentTask$.MODULE$.CHECKPOINT();
    }

    /** @return the checkpoint store factory resolved from the user config */
    public CheckpointStoreFactory checkpointStoreFactory() {
        return this.checkpointStoreFactory;
    }

    /** @return the checkpoint store created for this task */
    public CheckpointStore checkpointStore() {
        return this.checkpointStore;
    }

    /** @return the checkpoint interval read from the user config */
    public long checkpointInterval() {
        return this.checkpointInterval;
    }

    /** @return the manager used to recover, update and write checkpoints */
    public CheckpointManager checkpointManager() {
        return this.checkpointManager;
    }

    /** @return the delay in milliseconds passed to {@link #scheduleCheckpoint(long)} */
    private long checkpointAttemptInterval() {
        return this.checkpointAttemptInterval;
    }

    /**
     * Supplies the state this task operates on.
     *
     * <p>Called once from the constructor of this class, i.e. before the subclass's
     * own constructor body has run.
     *
     * @return the state instance to be stored and later exposed by {@link #state()}
     */
    public abstract PersistentState<T> persistentState();

    /**
     * Handles one non-CHECKPOINT message.
     *
     * @param persistentState the task state (the value of {@link #state()})
     * @param message the incoming message
     */
    public abstract void processMessage(PersistentState<T> persistentState, Message message);

    /** @return the state captured from {@link #persistentState()} during construction */
    public PersistentState<T> state() {
        return this.state;
    }

    /**
     * Recovers state for the given start time, reports that time as the checkpoint
     * clock and schedules the first checkpoint attempt.
     *
     * @param startTime start time supplied by the framework
     */
    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onStart(StartTime startTime) {
        long timestamp = startTime.startTime();

        // Only touch the state when the manager actually found a snapshot.
        Option<byte[]> snapshot = checkpointManager().recover(timestamp);
        if (!snapshot.isEmpty()) {
            state().recover(timestamp, snapshot.get());
        }

        io$gearpump$streaming$state$api$PersistentTask$$reportCheckpointClock(timestamp);
        scheduleCheckpoint(checkpointAttemptInterval());
    }

    /**
     * Dispatches an incoming message.
     *
     * <p>A message equal to {@link #CHECKPOINT()} triggers a checkpoint attempt and
     * re-schedules the next attempt. Any other message updates the checkpoint manager
     * with the message timestamp and is then passed to
     * {@link #processMessage(PersistentState, Message)}.
     *
     * @param message the incoming message
     */
    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onNext(Message message) {
        Message checkpointSignal = PersistentTask$.MODULE$.CHECKPOINT();
        // Null-safe equality, same semantics as Scala's `==` on references.
        boolean isCheckpointSignal =
                checkpointSignal == null ? message == null : checkpointSignal.equals(message);

        if (!isCheckpointSignal) {
            // Data path: let the manager see the timestamp first; it may yield a new
            // "next checkpoint time" that the state has to be told about.
            Option<Object> nextCheckpointTime = checkpointManager().update(message.timestamp());
            if (!nextCheckpointTime.isEmpty()) {
                state().setNextCheckpointTime(BoxesRunTime.unboxToLong(nextCheckpointTime.get()));
            }
            processMessage(state(), message);
            return;
        }

        // Checkpoint path: only proceed when the manager agrees, given the upstream min clock.
        long upstreamMinClock =
                this.io$gearpump$streaming$state$api$PersistentTask$$taskContext.upstreamMinClock();
        if (checkpointManager().shouldCheckpoint(upstreamMinClock)) {
            Option<Object> pendingCheckpointTime = checkpointManager().getCheckpointTime();
            if (!pendingCheckpointTime.isEmpty()) {
                long checkpointTime = BoxesRunTime.unboxToLong(pendingCheckpointTime.get());
                byte[] serializedState = state().checkpoint();

                Option<Object> followingCheckpointTime =
                        checkpointManager().checkpoint(checkpointTime, serializedState);
                if (!followingCheckpointTime.isEmpty()) {
                    state().setNextCheckpointTime(
                            BoxesRunTime.unboxToLong(followingCheckpointTime.get()));
                }

                // Emit the serialized state downstream, then report the checkpoint clock.
                this.io$gearpump$streaming$state$api$PersistentTask$$taskContext
                        .output(new Message(serializedState, checkpointTime));
                io$gearpump$streaming$state$api$PersistentTask$$reportCheckpointClock(checkpointTime);
            }
        }

        // Always arm the next attempt, whether or not a checkpoint was written.
        scheduleCheckpoint(checkpointAttemptInterval());
    }

    /** Closes the checkpoint manager. */
    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onStop() {
        checkpointManager().close();
    }

    /**
     * Asks the task context to run the generated scheduleCheckpoint closure once
     * after the given delay.
     *
     * @param j delay in milliseconds
     */
    private void scheduleCheckpoint(long j) {
        FiniteDuration delay = new FiniteDuration(j, TimeUnit.MILLISECONDS);
        this.io$gearpump$streaming$state$api$PersistentTask$$taskContext
                .scheduleOnce(delay, new PersistentTask$$anonfun$scheduleCheckpoint$1(this));
    }

    /**
     * Sends a {@link ReportCheckpointClock} for this task's id to the app master,
     * with this task's actor as the sender.
     *
     * @param j the timestamp to report
     */
    public void io$gearpump$streaming$state$api$PersistentTask$$reportCheckpointClock(long j) {
        TaskContext context = this.io$gearpump$streaming$state$api$PersistentTask$$taskContext;
        ReportCheckpointClock report = new ReportCheckpointClock(context.taskId(), j);
        package$.MODULE$.actorRef2Scala(context.appMaster()).$bang(report, self());
    }

    /**
     * Wires up checkpointing from the user config.
     *
     * <p>The store factory and the checkpoint interval are both fetched with
     * {@code Option.get()}, so both settings have to be present in {@code userConfig}.
     * The assignments are order-dependent: each later field is built from the
     * earlier ones, and {@link #persistentState()} is invoked last.
     *
     * @param taskContext context of this task
     * @param userConfig configuration carrying the checkpoint store factory and interval
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PersistentTask(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.io$gearpump$streaming$state$api$PersistentTask$$taskContext = taskContext;
        this.checkpointStoreFactory = (CheckpointStoreFactory) userConfig.getValue(PersistentStateConfig$.MODULE$.STATE_CHECKPOINT_STORE_FACTORY(), system()).get();
        this.checkpointStore = checkpointStoreFactory().getCheckpointStore(userConfig, taskContext);
        this.checkpointInterval = BoxesRunTime.unboxToLong(userConfig.getLong(PersistentStateConfig$.MODULE$.STATE_CHECKPOINT_INTERVAL_MS()).get());
        this.checkpointManager = new CheckpointManager(checkpointInterval(), checkpointStore());
        this.checkpointAttemptInterval = 1000L;
        this.state = persistentState();
    }
}
