package org.deeplearning4j.iterativereduce.actor.multilayer;

import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.japi.Creator;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.math3.random.MersenneTwister;
import org.deeplearning4j.berkeley.Pair;
import org.deeplearning4j.iterativereduce.actor.core.UpdateMessage;
import org.deeplearning4j.iterativereduce.actor.core.actor.DoneReaper;
import org.deeplearning4j.iterativereduce.actor.core.api.EpochDoneListener;
import org.deeplearning4j.iterativereduce.akka.DeepLearningAccumulator;
import org.deeplearning4j.nn.BaseMultiLayerNetwork;
import org.deeplearning4j.scaleout.conf.Conf;
import org.deeplearning4j.scaleout.iterativereduce.multi.UpdateableImpl;
import org.jblas.DoubleMatrix;

/* loaded from: input_file:org/deeplearning4j/iterativereduce/actor/multilayer/MasterActor.class */
public class MasterActor extends org.deeplearning4j.iterativereduce.actor.core.actor.MasterActor<UpdateableImpl> {

    /* loaded from: input_file:org/deeplearning4j/iterativereduce/actor/multilayer/MasterActor$MasterActorFactory.class */
    public static class MasterActorFactory implements Creator<MasterActor> {
        private Conf conf;
        private ActorRef batchActor;
        private static final long serialVersionUID = 1932205634961409897L;

        public MasterActorFactory(Conf conf, ActorRef actorRef) {
            this.conf = conf;
            this.batchActor = actorRef;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public MasterActor m9create() throws Exception {
            return new MasterActor(this.conf, this.batchActor);
        }
    }

    public MasterActor(Conf conf, ActorRef actorRef) {
        super(conf, actorRef);
        this.mediator.tell(new DistributedPubSubMediator.Publish(DoneReaper.REAPER, getSelf()), this.mediator);
    }

    public static Props propsFor(Conf conf, ActorRef actorRef) {
        return Props.create(new MasterActorFactory(conf, actorRef));
    }

    @Override // org.deeplearning4j.iterativereduce.actor.core.actor.MasterActor
    public UpdateableImpl compute(Collection<UpdateableImpl> collection, Collection<UpdateableImpl> collection2) {
        DeepLearningAccumulator deepLearningAccumulator = new DeepLearningAccumulator();
        Iterator<UpdateableImpl> it = collection.iterator();
        while (it.hasNext()) {
            deepLearningAccumulator.accumulate(it.next().get());
        }
        this.masterResults.set(deepLearningAccumulator.averaged());
        return this.masterResults;
    }

    @Override // org.deeplearning4j.iterativereduce.actor.core.actor.MasterActor
    public void setup(Conf conf) {
        this.masterResults = new UpdateableImpl(new BaseMultiLayerNetwork.Builder().numberOfInputs(conf.getnIn()).numberOfOutPuts(conf.getnOut()).withClazz(conf.getMultiLayerClazz()).hiddenLayerSizes(conf.getLayerSizes()).withRng(new MersenneTwister(conf.getSeed())).build());
        Conf copy = conf.copy();
        Address selfAddress = Cluster.get(context().system()).selfAddress();
        this.log.info("Starting worker");
        ActorRef startWorker = ActorNetworkRunner.startWorker(selfAddress, copy);
        this.mediator.tell(new DistributedPubSubMediator.Publish(MASTER, Integer.valueOf(conf.getPretrainEpochs())), this.mediator);
        this.mediator.tell(new DistributedPubSubMediator.Publish(DoneReaper.REAPER, startWorker), this.mediator);
    }

    @Override // org.deeplearning4j.iterativereduce.actor.core.actor.MasterActor
    public void onReceive(Object obj) throws Exception {
        if (obj instanceof DistributedPubSubMediator.SubscribeAck) {
            this.log.info("Subscribed " + ((DistributedPubSubMediator.SubscribeAck) obj).toString());
            return;
        }
        if (obj instanceof EpochDoneListener) {
            this.listener = (EpochDoneListener) obj;
            this.log.info("Set listener");
            return;
        }
        if (obj instanceof UpdateableImpl) {
            UpdateableImpl updateableImpl = (UpdateableImpl) obj;
            this.updates.add(updateableImpl);
            this.log.info("Num updates so far " + this.updates.size() + " and partition size is " + this.partition);
            if (this.updates.size() >= this.partition) {
                this.masterResults = compute(this.updates, this.updates);
                if (this.listener != null) {
                    this.listener.epochComplete(this.masterResults);
                }
                this.epochsComplete++;
                this.batchActor.tell(updateableImpl, getSelf());
                this.updates.clear();
                return;
            }
            return;
        }
        if (obj instanceof UpdateMessage) {
            this.mediator.tell(new DistributedPubSubMediator.Publish(BROADCAST, obj), getSelf());
            return;
        }
        if (!(obj instanceof List) && !(obj instanceof Pair)) {
            unhandled(obj);
            return;
        }
        if (obj instanceof List) {
            List<Pair<DoubleMatrix, DoubleMatrix>> list = (List) obj;
            splitListIntoRows(list);
            sendToWorkers(list);
        } else if (obj instanceof Pair) {
            Pair pair = (Pair) obj;
            List rowsAsList = ((DoubleMatrix) pair.getFirst()).rowsAsList();
            List rowsAsList2 = ((DoubleMatrix) pair.getSecond()).rowsAsList();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < rowsAsList.size(); i++) {
                arrayList.add(new Pair<>(rowsAsList.get(i), rowsAsList2.get(i)));
            }
            sendToWorkers(arrayList);
        }
    }

    @Override // org.deeplearning4j.iterativereduce.actor.core.actor.MasterActor
    public void complete(DataOutputStream dataOutputStream) {
        this.masterResults.get().write(dataOutputStream);
    }
}
