package io.gearpump.streaming.task;

import akka.actor.ActorRef;
import io.gearpump.serializer.SerializerPool;
import io.gearpump.transport.Express;
import io.gearpump.transport.HostPort;
import io.gearpump.transport.netty.TaskMessage;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ExpressTransport.scala */
@ScalaSignature(bytes = "\u0006\u0001i4A!\u0001\u0002\u0001\u0017\tI1+\u001a8e\u0019\u0006$XM\u001d\u0006\u0003\u0007\u0011\tA\u0001^1tW*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u0011\u001d,\u0017M\u001d9v[BT\u0011!C\u0001\u0003S>\u001c\u0001a\u0005\u0002\u0001\u0019A\u0011Q\u0002E\u0007\u0002\u001d)\tq\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0012\u001d\t1\u0011I\\=SK\u001aD\u0001b\u0005\u0001\u0003\u0002\u0003\u0006I\u0001F\u0001\bKb\u0004(/Z:t!\t)\u0002$D\u0001\u0017\u0015\t9b!A\u0005ue\u0006t7\u000f]8si&\u0011\u0011D\u0006\u0002\b\u000bb\u0004(/Z:t\u0011!Y\u0002A!A!\u0002\u0013a\u0012AD:fe&\fG.\u001b>feB{w\u000e\u001c\t\u0003;\u0001j\u0011A\b\u0006\u0003?\u0019\t!b]3sS\u0006d\u0017N_3s\u0013\t\tcD\u0001\bTKJL\u0017\r\\5{KJ\u0004vn\u001c7\t\u0011\r\u0002!\u0011!Q\u0001\n\u0011\naa]3oI\u0016\u0014\bCA\u0013+\u001b\u00051#BA\u0014)\u0003\u0015\t7\r^8s\u0015\u0005I\u0013\u0001B1lW\u0006L!a\u000b\u0014\u0003\u0011\u0005\u001bGo\u001c:SK\u001aDQ!\f\u0001\u0005\u00029\na\u0001P5oSRtD\u0003B\u00182eM\u0002\"\u0001\r\u0001\u000e\u0003\tAQa\u0005\u0017A\u0002QAQa\u0007\u0017A\u0002qAQa\t\u0017A\u0002\u0011Bq!\u000e\u0001A\u0002\u0013%a'\u0001\u0004ck\u001a4WM]\u000b\u0002oA!\u0001(P C\u001b\u0005I$B\u0001\u001e<\u0003%IW.\\;uC\ndWM\u0003\u0002=\u001d\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005yJ$aA'baB\u0011Q\u0002Q\u0005\u0003\u0003:\u0011A\u0001T8oOB\u00191I\u0012%\u000e\u0003\u0011S!!R\u001e\u0002\u000f5,H/\u00192mK&\u0011q\t\u0012\u0002\u0006#V,W/\u001a\t\u0003\u00132k\u0011A\u0013\u0006\u0003\u0017Z\tQA\\3uifL!!\u0014&\u0003\u0017Q\u000b7o['fgN\fw-\u001a\u0005\b\u001f\u0002\u0001\r\u0011\"\u0003Q\u0003)\u0011WO\u001a4fe~#S-\u001d\u000b\u0003#R\u0003\"!\u0004*\n\u0005Ms!\u0001B+oSRDq!\u0016(\u0002\u0002\u0003\u0007q'A\u0002yIEBaa\u0016\u0001!B\u00139\u0014a\u00022vM\u001a,'\u000f\t\u0005\u00063\u0002!\tAW\u0001\u000bC\u0012$W*Z:tC\u001e,GcA)\\;\")A\f\u0017a\u0001\u007f\u0005YAO]1ogB|'\u000f^%e\u0011\u0015q\u0006\f1\u0001I\u0003-!\u0018m]6NKN\u001c\u0018mZ3\t\u000b\u0001\u0004A\u0011B1\u0002'M,g\u000e\u001a)f]\u0012LgnZ'fgN\fw-Z:\u0015\u0005E\u0013\u0007\"\u0002/`\u0001\u0004y\u0004\"\u00023\u0001\t\u0003)\u0017\u0001\u00064mkND\u0007+\u001a8eS:<W*Z:tC\u001e,7\u000fF\u0002RM\"DQaZ2A\u0002\u0011\n!\u0002\\8dC2\f5\r^8s\u0011\u0015a6\r1\u0001@\u0011\u0015!\u0007\u0001\"\u0001k)\r\t6\u000e\u001d\u0005\u0006Y&\u0004\r!\\\u0001\u000ee\u0016lw\u000e^3BI\u0012\u0014Xm]:\u0011\u0005Uq\u0017BA8\u0017\u0005!Aun\u001d;Q_J$\b\"\u0002/j\u0001\u0004y\u0004\"\u0002:\u0001\t\u0003\u0019\u0018AE:f]\u0012\fE\u000e\u001c)f]\u0012LgnZ'tON$\u0012!\u0015\u0005\u0006k\u0002!\tA^\u0001\u0013Q\u0006\u001c\b+\u001a8eS:<W*Z:tC\u001e,7/F\u0001x!\ti\u00010\u0003\u0002z\u001d\t9!i\\8mK\u0006t\u0007")
/* loaded from: input_file:io/gearpump/streaming/task/SendLater.class */
public class SendLater {
    private final Express express;
    private final SerializerPool serializerPool;
    private final ActorRef sender;
    private Map<Object, Queue<TaskMessage>> buffer = Predef$.MODULE$.Map().empty();

    private Map<Object, Queue<TaskMessage>> buffer() {
        return this.buffer;
    }

    private void buffer_$eq(Map<Object, Queue<TaskMessage>> map) {
        this.buffer = map;
    }

    public void addMessage(long j, TaskMessage taskMessage) {
        Queue queue = (Queue) buffer().getOrElse(BoxesRunTime.boxToLong(j), new SendLater$$anonfun$1(this));
        queue.enqueue(Predef$.MODULE$.wrapRefArray(new TaskMessage[]{taskMessage}));
        Map<Object, Queue<TaskMessage>> buffer = buffer();
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Predef$ predef$ = Predef$.MODULE$;
        buffer_$eq(buffer.$plus(new Tuple2(BoxesRunTime.boxToLong(j), queue)));
    }

    public void io$gearpump$streaming$task$SendLater$$sendPendingMessages(long j) {
        Option lookupLocalActor = this.express.lookupLocalActor(j);
        if (lookupLocalActor.isDefined()) {
            flushPendingMessages((ActorRef) lookupLocalActor.get(), j);
        } else {
            flushPendingMessages((HostPort) this.express.lookupRemoteAddress(j).get(), j);
        }
    }

    public void flushPendingMessages(ActorRef actorRef, long j) {
        Queue queue = (Queue) buffer().getOrElse(BoxesRunTime.boxToLong(j), new SendLater$$anonfun$2(this));
        while (queue.nonEmpty()) {
            actorRef.tell(this.serializerPool.get(Thread.currentThread().getId()).deserialize(((TaskMessage) queue.dequeue()).message()), this.sender);
        }
    }

    public void flushPendingMessages(HostPort hostPort, long j) {
        Queue queue = (Queue) buffer().getOrElse(BoxesRunTime.boxToLong(j), new SendLater$$anonfun$3(this));
        while (queue.nonEmpty()) {
            this.express.transport((TaskMessage) queue.dequeue(), hostPort);
        }
    }

    public void sendAllPendingMsgs() {
        buffer().keySet().foreach(new SendLater$$anonfun$sendAllPendingMsgs$1(this));
        buffer_$eq(Predef$.MODULE$.Map().empty());
    }

    public boolean hasPendingMessages() {
        return buffer().nonEmpty();
    }

    public SendLater(Express express, SerializerPool serializerPool, ActorRef actorRef) {
        this.express = express;
        this.serializerPool = serializerPool;
        this.sender = actorRef;
    }
}
