package io.gearpump.streaming.state.api;

import akka.actor.package$;
import io.gearpump.Message;
import io.gearpump.cluster.UserConfig;
import io.gearpump.streaming.state.impl.CheckpointManager;
import io.gearpump.streaming.state.impl.PersistentStateConfig$;
import io.gearpump.streaming.task.ReportCheckpointClock;
import io.gearpump.streaming.task.StartTime;
import io.gearpump.streaming.task.Task;
import io.gearpump.streaming.task.TaskContext;
import io.gearpump.streaming.transaction.api.CheckpointStore;
import io.gearpump.streaming.transaction.api.CheckpointStoreFactory;
import scala.Option;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: PersistentTask.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055b!B\u0001\u0003\u0003\u0003i!A\u0004)feNL7\u000f^3oiR\u000b7o\u001b\u0006\u0003\u0007\u0011\t1!\u00199j\u0015\t)a!A\u0003ti\u0006$XM\u0003\u0002\b\u0011\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u0013)\t\u0001bZ3beB,X\u000e\u001d\u0006\u0002\u0017\u0005\u0011\u0011n\\\u0002\u0001+\tq\u0011f\u0005\u0002\u0001\u001fA\u0011\u0001cE\u0007\u0002#)\u0011!CB\u0001\u0005i\u0006\u001c8.\u0003\u0002\u0015#\t!A+Y:l\u0011!1\u0002A!A!\u0002\u00139\u0012a\u0003;bg.\u001cuN\u001c;fqR\u0004\"\u0001\u0005\r\n\u0005e\t\"a\u0003+bg.\u001cuN\u001c;fqRD\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u0005G>tg\r\u0005\u0002\u001eA5\taD\u0003\u0002 \u0011\u000591\r\\;ti\u0016\u0014\u0018BA\u0011\u001f\u0005))6/\u001a:D_:4\u0017n\u001a\u0005\u0006G\u0001!\t\u0001J\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007\u0015*d\u0007E\u0002'\u0001\u001dj\u0011A\u0001\t\u0003Q%b\u0001\u0001B\u0003+\u0001\t\u00071FA\u0001U#\ta#\u0007\u0005\u0002.a5\taFC\u00010\u0003\u0015\u00198-\u00197b\u0013\t\tdFA\u0004O_RD\u0017N\\4\u0011\u00055\u001a\u0014B\u0001\u001b/\u0005\r\te.\u001f\u0005\u0006-\t\u0002\ra\u0006\u0005\u00067\t\u0002\r\u0001\b\u0005\bq\u0001\u0011\r\u0011\"\u0001:\u0003Y\u0019\u0007.Z2la>Lg\u000e^*u_J,g)Y2u_JLX#\u0001\u001e\u0011\u0005mzT\"\u0001\u001f\u000b\u0005\ri$B\u0001 \u0007\u0003-!(/\u00198tC\u000e$\u0018n\u001c8\n\u0005\u0001c$AF\"iK\u000e\\\u0007o\\5oiN#xN]3GC\u000e$xN]=\t\r\t\u0003\u0001\u0015!\u0003;\u0003]\u0019\u0007.Z2la>Lg\u000e^*u_J,g)Y2u_JL\b\u0005C\u0004E\u0001\t\u0007I\u0011A#\u0002\u001f\rDWmY6q_&tGo\u0015;pe\u0016,\u0012A\u0012\t\u0003w\u001dK!\u0001\u0013\u001f\u0003\u001f\rCWmY6q_&tGo\u0015;pe\u0016DaA\u0013\u0001!\u0002\u00131\u0015\u0001E2iK\u000e\\\u0007o\\5oiN#xN]3!\u0011\u001da\u0005A1A\u0005\u00025\u000b!c\u00195fG.\u0004x.\u001b8u\u0013:$XM\u001d<bYV\ta\n\u0005\u0002.\u001f&\u0011\u0001K\f\u0002\u0005\u0019>tw\r\u0003\u0004S\u0001\u0001\u0006IAT\u0001\u0014G\",7m\u001b9pS:$\u0018J\u001c;feZ\fG\u000e\t\u0005\b)\u0002\u0011\r\u0011\"\u0001V\u0003E\u0019\u0007.Z2la>Lg\u000e^'b]\u0006<WM]\u000b\u0002-B\u0011qKW\u0007\u00021*\u0011\u0011\fB\u0001\u0005S6\u0004H.\u0003\u0002\\1\n\t2\t[3dWB|\u0017N\u001c;NC:\fw-\u001a:\t\ru\u0003\u0001\u0015!\u0003W\u0003I\u0019\u0007.Z2la>Lg\u000e^'b]\u0006<WM\u001d\u0011\t\u000b}\u0003a\u0011\u00011\u0002\u001fA,'o]5ti\u0016tGo\u0015;bi\u0016,\u0012!\u0019\t\u0004M\t<\u0013BA2\u0003\u0005=\u0001VM]:jgR,g\u000e^*uCR,\u0007\"B3\u0001\r\u00031\u0017A\u00049s_\u000e,7o]'fgN\fw-\u001a\u000b\u0004O*\\\u0007CA\u0017i\u0013\tIgF\u0001\u0003V]&$\b\"B\u0003e\u0001\u0004\t\u0007\"\u00027e\u0001\u0004i\u0017aB7fgN\fw-\u001a\t\u0003]>l\u0011\u0001C\u0005\u0003a\"\u0011q!T3tg\u0006<W\rC\u0004\u0006\u0001\t\u0007I\u0011\u00011\t\rM\u0004\u0001\u0015!\u0003b\u0003\u0019\u0019H/\u0019;fA!)Q\u000f\u0001C#m\u00069qN\\*uCJ$HCA4x\u0011\u0015AH\u000f1\u0001z\u0003%\u0019H/\u0019:u)&lW\r\u0005\u0002\u0011u&\u001110\u0005\u0002\n'R\f'\u000f\u001e+j[\u0016DQ! \u0001\u0005Fy\faa\u001c8OKb$HCA4��\u0011\u0015aG\u00101\u0001n\u0011\u001d\t\u0019\u0001\u0001C#\u0003\u000b\taa\u001c8Ti>\u0004H#A4\t\u000f\u0005%\u0001\u0001\"\u0003\u0002\f\u0005)\"/\u001a9peR\u001c\u0005.Z2la>Lg\u000e^\"m_\u000e\\GcA4\u0002\u000e!A\u0011qBA\u0004\u0001\u0004\t\t\"A\u0005uS6,7\u000f^1naB!\u00111CA\u0014\u001d\u0011\t)\"a\t\u000f\t\u0005]\u0011\u0011\u0005\b\u0005\u00033\ty\"\u0004\u0002\u0002\u001c)\u0019\u0011Q\u0004\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005Y\u0011BA\u0005\u000b\u0013\r\t)\u0003C\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tI#a\u000b\u0003\u0013QKW.Z*uC6\u0004(bAA\u0013\u0011\u0001")
/* loaded from: input_file:io/gearpump/streaming/state/api/PersistentTask.class */
public abstract class PersistentTask<T> extends Task {
    private final TaskContext taskContext;
    private final CheckpointStoreFactory checkpointStoreFactory;
    private final CheckpointStore checkpointStore;
    private final long checkpointInterval;
    private final CheckpointManager checkpointManager;
    private final PersistentState<T> state;

    public CheckpointStoreFactory checkpointStoreFactory() {
        return this.checkpointStoreFactory;
    }

    public CheckpointStore checkpointStore() {
        return this.checkpointStore;
    }

    public long checkpointInterval() {
        return this.checkpointInterval;
    }

    public CheckpointManager checkpointManager() {
        return this.checkpointManager;
    }

    public abstract PersistentState<T> persistentState();

    public abstract void processMessage(PersistentState<T> persistentState, Message message);

    public PersistentState<T> state() {
        return this.state;
    }

    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onStart(StartTime startTime) {
        long startTime2 = startTime.startTime();
        Option<byte[]> recover = checkpointManager().recover(startTime2);
        new PersistentTask$$anonfun$onStart$1(this, startTime2);
        if (!recover.isEmpty()) {
            state().recover(startTime2, (byte[]) recover.get());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        state().setNextCheckpointTime(checkpointManager().getCheckpointTime());
        reportCheckpointClock(startTime2);
    }

    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onNext(Message message) {
        long checkpointTime = checkpointManager().getCheckpointTime();
        processMessage(state(), message);
        checkpointManager().update(message.timestamp());
        if (checkpointManager().shouldCheckpoint(this.taskContext.upstreamMinClock())) {
            checkpointManager().checkpoint(checkpointTime, state().checkpoint());
            reportCheckpointClock(checkpointTime);
            state().setNextCheckpointTime(checkpointManager().updateCheckpointTime());
        }
    }

    @Override // io.gearpump.streaming.task.Task, io.gearpump.streaming.task.TaskInterface
    public final void onStop() {
        checkpointManager().close();
    }

    private void reportCheckpointClock(long j) {
        package$.MODULE$.actorRef2Scala(this.taskContext.appMaster()).$bang(new ReportCheckpointClock(this.taskContext.taskId(), j), self());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PersistentTask(TaskContext taskContext, UserConfig userConfig) {
        super(taskContext, userConfig);
        this.taskContext = taskContext;
        this.checkpointStoreFactory = (CheckpointStoreFactory) userConfig.getValue(PersistentStateConfig$.MODULE$.STATE_CHECKPOINT_STORE_FACTORY(), system()).get();
        this.checkpointStore = checkpointStoreFactory().getCheckpointStore(userConfig, taskContext);
        this.checkpointInterval = BoxesRunTime.unboxToLong(userConfig.getLong(PersistentStateConfig$.MODULE$.STATE_CHECKPOINT_INTERVAL_MS()).get());
        this.checkpointManager = new CheckpointManager(checkpointInterval(), checkpointStore());
        this.state = persistentState();
    }
}
