package replication;

import channels.Abort;
import channels.Abort$;
import channels.Connection;
import channels.LatentConnection;
import channels.MessageBuffer;
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec;
import de.rmgk.delay;
import rdts.base.Lattice$;
import rdts.base.LocalUid;
import rdts.base.Uid;
import rdts.time.Dots;
import rdts.time.Dots$;
import replication.ProtocolMessage;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: DeltaDissemination.scala */
/* loaded from: input_file:replication/DeltaDissemination.class */
public class DeltaDissemination<State> {
    private final LocalUid replicaId;
    private final Function1<State, BoxedUnit> receiveCallback;
    private final boolean immediateForward;
    private final Abort globalAbort = new Abort(Abort$.MODULE$.$lessinit$greater$default$1());
    private List<Connection<ProtocolMessage<State>>> connections = package$.MODULE$.Nil();
    private final Object lock = new Object() { // from class: replication.DeltaDissemination$$anon$1
    };
    private List<ProtocolMessage.Payload<State>> pastPayloads = package$.MODULE$.Nil();
    private Map<Uid, Dots> contexts = Predef$.MODULE$.Map().empty();

    public static <T> LatentConnection<ProtocolMessage<T>> jsoniterMessages(LatentConnection<MessageBuffer> latentConnection, JsonValueCodec<T> jsonValueCodec) {
        return DeltaDissemination$.MODULE$.jsoniterMessages(latentConnection, jsonValueCodec);
    }

    public DeltaDissemination(LocalUid localUid, Function1<State, BoxedUnit> function1, Option<Aead> option, boolean z) {
        this.replicaId = localUid;
        this.receiveCallback = function1;
        this.immediateForward = z;
    }

    public LocalUid replicaId() {
        return this.replicaId;
    }

    public final LocalUid given_LocalUid() {
        return replicaId();
    }

    public Abort globalAbort() {
        return this.globalAbort;
    }

    public List<Connection<ProtocolMessage<State>>> connections() {
        return this.connections;
    }

    public void connections_$eq(List<Connection<ProtocolMessage<State>>> list) {
        this.connections = list;
    }

    public delay.Callback<Object> debugCallbackAndRemoveCon(Connection<ProtocolMessage<State>> connection) {
        return r6 -> {
            if (r6 instanceof Success) {
                ((Success) r6).value();
                return;
            }
            if (!(r6 instanceof Failure)) {
                throw new MatchError(r6);
            }
            Throwable exception = ((Failure) r6).exception();
            ?? lock = lock();
            synchronized (lock) {
                connections_$eq(connections().filter(connection2 -> {
                    return connection2 != null ? !connection2.equals(connection) : connection != null;
                }));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            exception.printStackTrace();
        };
    }

    public void requestData() {
        connections().foreach(connection -> {
            ((Function1) connection.send(ProtocolMessage$Request$.MODULE$.apply(replicaId().uid(), selfContext())).handleInCtx().apply(BoxedUnit.UNIT)).apply(debugCallbackAndRemoveCon(connection));
        });
    }

    public void pingAll() {
        connections().foreach(connection -> {
            ((Function1) connection.send(ProtocolMessage$Ping$.MODULE$.apply(System.nanoTime())).handleInCtx().apply(given_LocalUid())).apply(debugCallbackAndRemoveCon(connection));
        });
    }

    public void addLatentConnection(LatentConnection<MessageBuffer> latentConnection, JsonValueCodec<State> jsonValueCodec) {
        addLatentConnection(DeltaDissemination$.MODULE$.jsoniterMessages(latentConnection, jsonValueCodec));
    }

    public void addLatentConnection(LatentConnection<ProtocolMessage<State>> latentConnection) {
        ((Function1) latentConnection.prepare(connection -> {
            return r6 -> {
                if (r6 instanceof Success) {
                    handleMessage((ProtocolMessage) ((Success) r6).value(), connection);
                } else {
                    if (!(r6 instanceof Failure)) {
                        throw new MatchError(r6);
                    }
                    ((Failure) r6).exception().printStackTrace();
                }
            };
        }).handleInCtx().apply(globalAbort())).apply(r6 -> {
            if (!(r6 instanceof Success)) {
                if (!(r6 instanceof Failure)) {
                    throw new MatchError(r6);
                }
                Throwable exception = ((Failure) r6).exception();
                Predef$.MODULE$.println("exception during connection activation");
                exception.printStackTrace();
                return;
            }
            Connection<ProtocolMessage<State>> connection2 = (Connection) ((Success) r6).value();
            ?? lock = lock();
            synchronized (lock) {
                connections_$eq(connections().$colon$colon(connection2));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            ((Function1) connection2.send(ProtocolMessage$Request$.MODULE$.apply(replicaId().uid(), selfContext())).handleInCtx().apply(BoxedUnit.UNIT)).apply(debugCallbackAndRemoveCon(connection2));
        });
    }

    public Object lock() {
        return this.lock;
    }

    public List<ProtocolMessage.Payload<State>> allPayloads() {
        return this.pastPayloads;
    }

    private void rememberPayload(ProtocolMessage.Payload<State> payload) {
        this.pastPayloads = this.pastPayloads.$colon$colon(payload);
    }

    public Dots selfContext() {
        return (Dots) this.contexts.getOrElse(replicaId().uid(), DeltaDissemination::selfContext$$anonfun$1);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    public void applyDelta(State state) {
        ProtocolMessage.Payload<State> apply;
        ?? lock = lock();
        synchronized (lock) {
            apply = ProtocolMessage$Payload$.MODULE$.apply(replicaId().uid(), Dots$.MODULE$.single(selfContext().nextDot(replicaId().uid())), (Dots) state);
            updateContext(replicaId().uid(), apply.dots());
            rememberPayload(apply);
        }
        disseminate(apply, disseminate$default$2());
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    public void updateContext(Uid uid, Dots dots) {
        ?? lock = lock();
        synchronized (lock) {
            this.contexts = this.contexts.updatedWith(uid, option -> {
                return (Option) Lattice$.MODULE$.optionLattice(Dots$.MODULE$.contextLattice()).merge(option, Some$.MODULE$.apply(dots));
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    /* JADX WARN: Type inference failed for: r0v27, types: [java.lang.Throwable, java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v49, types: [java.lang.Throwable, java.lang.Object] */
    public void handleMessage(ProtocolMessage<State> protocolMessage, Connection<ProtocolMessage<State>> connection) {
        List<ProtocolMessage.Payload<State>> list;
        if (protocolMessage instanceof ProtocolMessage.Ping) {
            ((Function1) connection.send(ProtocolMessage$Pong$.MODULE$.apply(ProtocolMessage$Ping$.MODULE$.unapply((ProtocolMessage.Ping) protocolMessage)._1())).handleInCtx().apply(given_LocalUid())).apply(debugCallbackAndRemoveCon(connection));
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Pong) {
            Predef$.MODULE$.println("ping took " + (Predef$.MODULE$.long2Long(System.nanoTime() - ProtocolMessage$Pong$.MODULE$.unapply((ProtocolMessage.Pong) protocolMessage)._1()).doubleValue() / 1000000) + "ms");
            return;
        }
        if (protocolMessage instanceof ProtocolMessage.Request) {
            ProtocolMessage.Request unapply = ProtocolMessage$Request$.MODULE$.unapply((ProtocolMessage.Request) protocolMessage);
            Uid _1 = unapply._1();
            Dots _2 = unapply._2();
            ?? lock = lock();
            synchronized (lock) {
                list = this.pastPayloads;
            }
            list.filterNot(payload -> {
                return payload.dots().lessThanEquals(_2);
            }).foreach(payload2 -> {
                ((Function1) connection.send(payload2.addSender(replicaId().uid())).handleInCtx().apply(BoxedUnit.UNIT)).apply(debugCallbackAndRemoveCon(connection));
            });
            updateContext(_1, (Dots) Dots$.MODULE$.contextLattice().merge(selfContext(), _2));
            return;
        }
        if (!(protocolMessage instanceof ProtocolMessage.Payload)) {
            throw new MatchError(protocolMessage);
        }
        ProtocolMessage.Payload<State> payload3 = (ProtocolMessage.Payload) protocolMessage;
        ProtocolMessage.Payload unapply2 = ProtocolMessage$Payload$.MODULE$.unapply(payload3);
        Set<Uid> _12 = unapply2._1();
        Dots _22 = unapply2._2();
        Object _3 = unapply2._3();
        if (_22.lessThanEquals(selfContext())) {
            return;
        }
        ?? lock2 = lock();
        synchronized (lock2) {
            _12.foreach(uid -> {
                updateContext(uid, _22);
            });
            updateContext(replicaId().uid(), _22);
            rememberPayload(payload3);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        this.receiveCallback.apply(_3);
        if (this.immediateForward) {
            disseminate(payload3, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Connection[]{connection})));
        }
    }

    public void disseminate(ProtocolMessage.Payload<State> payload, Set<Connection<ProtocolMessage<State>>> set) {
        connections().filterNot(connection -> {
            return set.contains(connection);
        }).foreach(connection2 -> {
            ((Function1) connection2.send(payload).handleInCtx().apply(BoxedUnit.UNIT)).apply(debugCallbackAndRemoveCon(connection2));
        });
    }

    public Set<Connection<ProtocolMessage<State>>> disseminate$default$2() {
        return Predef$.MODULE$.Set().empty();
    }

    private static final Dots selfContext$$anonfun$1() {
        return Dots$.MODULE$.empty();
    }
}
