package org.deeplearning4j.iterativereduce.actor.multilayer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.contrib.pattern.ClusterClient;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.routing.RoundRobinPool;
import java.io.Serializable;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.deeplearning4j.datasets.DataSet;
import org.deeplearning4j.datasets.iterator.DataSetIterator;
import org.deeplearning4j.iterativereduce.actor.core.ClusterListener;
import org.deeplearning4j.iterativereduce.actor.core.ModelSaver;
import org.deeplearning4j.iterativereduce.actor.core.actor.BatchActor;
import org.deeplearning4j.iterativereduce.actor.core.actor.ModelSavingActor;
import org.deeplearning4j.iterativereduce.actor.util.ActorRefUtils;
import org.deeplearning4j.iterativereduce.tracker.statetracker.StateTracker;
import org.deeplearning4j.iterativereduce.tracker.statetracker.hazelcast.HazelCastStateTracker;
import org.deeplearning4j.nn.BaseMultiLayerNetwork;
import org.deeplearning4j.scaleout.conf.Conf;
import org.deeplearning4j.scaleout.conf.DeepLearningConfigurable;
import org.deeplearning4j.scaleout.iterativereduce.multi.UpdateableImpl;
import org.jblas.DoubleMatrix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/deeplearning4j/iterativereduce/actor/multilayer/ActorNetworkRunner.class */
public class ActorNetworkRunner implements DeepLearningConfigurable, Serializable {
    private static final long serialVersionUID = -4385335922485305364L;
    private transient ActorSystem system;
    private Integer epochs;
    private ActorRef mediator;
    private BaseMultiLayerNetwork startingNetwork;
    private static Logger log = LoggerFactory.getLogger(ActorNetworkRunner.class);
    private static String systemName = "ClusterSystem";
    private String type;
    private Address masterAddress;
    private DataSetIterator iter;
    protected ActorRef masterActor;
    protected ModelSaver modelSaver;
    private transient ScheduledExecutorService exec;
    private transient StateTracker<UpdateableImpl> stateTracker;
    private Conf conf;
    private boolean finetune;
    private int stateTrackerPort;

    public ActorNetworkRunner(String str, DataSetIterator dataSetIterator, BaseMultiLayerNetwork baseMultiLayerNetwork) {
        this.type = "master";
        this.finetune = false;
        this.stateTrackerPort = -1;
        this.type = str;
        this.iter = dataSetIterator;
        this.startingNetwork = baseMultiLayerNetwork;
    }

    public ActorNetworkRunner(String str, DataSetIterator dataSetIterator) {
        this(str, dataSetIterator, null);
    }

    public ActorNetworkRunner(DataSetIterator dataSetIterator) {
        this("master", dataSetIterator, null);
    }

    public ActorNetworkRunner(DataSetIterator dataSetIterator, BaseMultiLayerNetwork baseMultiLayerNetwork) {
        this("master", dataSetIterator, baseMultiLayerNetwork);
    }

    public ActorNetworkRunner(String str, String str2) {
        this.type = "master";
        this.finetune = false;
        this.stateTrackerPort = -1;
        this.type = str;
        URI create = URI.create(str2);
        this.masterAddress = Address.apply(create.getScheme(), create.getUserInfo(), create.getHost(), create.getPort());
    }

    public ActorNetworkRunner() {
        this.type = "master";
        this.finetune = false;
        this.stateTrackerPort = -1;
    }

    public Address startBackend(Address address, String str, Conf conf, DataSetIterator dataSetIterator, StateTracker<UpdateableImpl> stateTracker) {
        ActorRefUtils.addShutDownForSystem(this.system);
        this.system.actorOf(Props.create(ClusterListener.class, new Object[0]));
        ActorRef actorOf = this.system.actorOf(Props.create(BatchActor.class, new Object[]{dataSetIterator, stateTracker, conf}), "batch");
        log.info("Started batch actor");
        Props create = this.startingNetwork != null ? Props.create(MasterActor.class, new Object[]{conf, actorOf, this.startingNetwork, stateTracker}) : Props.create(MasterActor.class, new Object[]{conf, actorOf, stateTracker});
        Address selfAddress = address == null ? Cluster.get(this.system).selfAddress() : address;
        conf.setMasterUrl(selfAddress.toString());
        if (this.exec == null) {
            this.exec = Executors.newScheduledThreadPool(2);
        }
        Cluster.get(this.system).join(selfAddress);
        this.exec.schedule(new Runnable() { // from class: org.deeplearning4j.iterativereduce.actor.multilayer.ActorNetworkRunner.1
            @Override // java.lang.Runnable
            public void run() {
                Cluster.get(ActorNetworkRunner.this.system).publishCurrentClusterState();
            }
        }, 10L, TimeUnit.SECONDS);
        this.masterActor = this.system.actorOf(ClusterSingletonManager.defaultProps(create, "master", PoisonPill.getInstance(), "master"));
        log.info("Started master with address " + selfAddress.toString());
        conf.setMasterAbsPath(ActorRefUtils.absPath(this.masterActor, this.system));
        log.info("Set master abs path " + conf.getMasterAbsPath());
        return selfAddress;
    }

    public void finetune() {
        this.finetune = true;
        if (this.startingNetwork == null) {
            throw new IllegalStateException("No network to finetune!");
        }
    }

    public void setup(Conf conf) {
        this.system = ActorSystem.create(systemName);
        ActorRefUtils.addShutDownForSystem(this.system);
        this.mediator = DistributedPubSubExtension.get(this.system).mediator();
        this.epochs = Integer.valueOf(conf.getPretrainEpochs());
        if (!this.type.equals("master")) {
            Address parse = AddressFromURIString.parse(conf.getMasterUrl());
            Conf copy = conf.copy();
            Cluster.get(this.system).join(parse);
            try {
                String str = (String) parse.host().get();
                if (str == null) {
                    throw new IllegalArgumentException("No host set for worker");
                }
                int i = this.stateTrackerPort < 1 ? HazelCastStateTracker.DEFAULT_HAZELCAST_PORT : this.stateTrackerPort;
                this.stateTracker = new HazelCastStateTracker(str + ":" + i, "worker", i);
                startWorker(copy);
                this.system.scheduler().schedule(Duration.create(1L, TimeUnit.MINUTES), Duration.create(1L, TimeUnit.MINUTES), new Runnable() { // from class: org.deeplearning4j.iterativereduce.actor.multilayer.ActorNetworkRunner.3
                    @Override // java.lang.Runnable
                    public void run() {
                        ActorNetworkRunner.log.info("Current cluster members " + Cluster.get(ActorNetworkRunner.this.system).readView().members());
                    }
                }, this.system.dispatcher());
                log.info("Setup worker nodes");
            } catch (Exception e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        } else {
            if (this.iter == null) {
                throw new IllegalStateException("Unable to initialize no dataset to train");
            }
            log.info("Starting master");
            try {
                if (this.stateTrackerPort > 0) {
                    this.stateTracker = new HazelCastStateTracker(this.stateTrackerPort);
                } else {
                    this.stateTracker = new HazelCastStateTracker();
                }
                if (this.finetune) {
                    this.stateTracker.moveToFinetune();
                }
                this.masterAddress = startBackend(null, "master", conf, this.iter, this.stateTracker);
                Thread.sleep(60000L);
                if (this.startingNetwork != null) {
                    this.startingNetwork.setShouldBackProp(conf.isUseBackProp());
                    this.startingNetwork.setUseAdaGrad(conf.isUseAdaGrad());
                    this.startingNetwork.setUseRegularization(conf.isUseRegularization());
                }
                log.info("Starting model saver");
                if (this.modelSaver == null) {
                    this.system.actorOf(Props.create(ModelSavingActor.class, new Object[]{"model-saver"}));
                } else {
                    this.system.actorOf(Props.create(ModelSavingActor.class, new Object[]{this.modelSaver}));
                }
                conf.setMasterUrl(getMasterAddress().toString());
                conf.setMasterAbsPath(ActorRefUtils.absPath(this.masterActor, this.system));
                ActorRefUtils.registerConfWithZooKeeper(conf, this.system);
                this.system.scheduler().schedule(Duration.create(1L, TimeUnit.MINUTES), Duration.create(1L, TimeUnit.MINUTES), new Runnable() { // from class: org.deeplearning4j.iterativereduce.actor.multilayer.ActorNetworkRunner.2
                    @Override // java.lang.Runnable
                    public void run() {
                        if (ActorNetworkRunner.this.system.isTerminated()) {
                            return;
                        }
                        try {
                            ActorNetworkRunner.log.info("Current cluster members " + Cluster.get(ActorNetworkRunner.this.system).readView().members());
                        } catch (Exception e2) {
                            ActorNetworkRunner.log.warn("Tried reading cluster members during shutdown");
                        }
                    }
                }, this.system.dispatcher());
                log.info("Setup master with epochs " + this.epochs);
            } catch (Exception e2) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e2);
            }
        }
        this.conf = conf;
    }

    public void startWorker(Conf conf) {
        Address parse = AddressFromURIString.parse(conf.getMasterUrl());
        this.system.actorOf(Props.create(ClusterListener.class, new Object[0]));
        log.info("Attempting to join node " + parse);
        log.info("Starting workers");
        HashSet hashSet = new HashSet();
        hashSet.add(this.system.actorSelection(parse + "/user/"));
        RoundRobinPool roundRobinPool = new RoundRobinPool(Runtime.getRuntime().availableProcessors());
        ActorRef actorOf = this.system.actorOf(ClusterClient.defaultProps(hashSet), "clusterClient");
        try {
            log.info("Connecting hazelcast to host " + ((String) parse.host().get()));
            int numWorkers = this.stateTracker.numWorkers();
            if (numWorkers <= 1) {
                throw new IllegalStateException("Did not properly connect to cluster");
            }
            log.info("Joining cluster of size " + numWorkers);
            this.system.actorOf(roundRobinPool.props(WorkerActor.propsFor(actorOf, conf, this.stateTracker)), "worker");
            Cluster.get(this.system).join(parse);
            log.info("Worker joining cluster");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void train(List<DataSet> list) {
        log.info("Publishing to results for training");
        try {
            log.info("Waiting for cluster to go up...");
            Thread.sleep(30000L);
            log.info("Done waiting");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.info("Started pipeline");
        this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, list), this.mediator);
        log.info("Published results");
        while (!this.stateTracker.isDone()) {
            log.info("State tracker not done...blocking");
            try {
                Thread.sleep(15000L);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
        shutdown();
    }

    public void train() {
        int numWorkers = this.stateTracker.numWorkers() * this.conf.getSplit();
        if (this.iter.hasNext()) {
            train(this.iter.next(numWorkers));
        } else {
            log.warn("No data found");
        }
    }

    public void train(DataSet dataSet) {
        train(dataSet.asList());
    }

    public void train(DoubleMatrix doubleMatrix, DoubleMatrix doubleMatrix2) {
        train(new DataSet(doubleMatrix, doubleMatrix2));
    }

    public Address getMasterAddress() {
        return this.masterAddress;
    }

    public StateTracker<UpdateableImpl> getStateTracker() {
        return this.stateTracker;
    }

    public void setStateTracker(StateTracker<UpdateableImpl> stateTracker) {
        this.stateTracker = stateTracker;
    }

    public void shutdown() {
        try {
            this.system.shutdown();
        } catch (Exception e) {
        }
        try {
            if (this.stateTracker != null) {
                this.stateTracker.shutdown();
            }
        } catch (Exception e2) {
        }
    }

    public ModelSaver getModelSaver() {
        return this.modelSaver;
    }

    public void setModelSaver(ModelSaver modelSaver) {
        this.modelSaver = modelSaver;
    }

    public int getStateTrackerPort() {
        return this.stateTrackerPort;
    }

    public void setStateTrackerPort(int i) {
        this.stateTrackerPort = i;
    }
}
