package org.deeplearning4j.iterativereduce.actor.core.actor;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.deeplearning4j.datasets.iterator.DataSetIterator;
import org.deeplearning4j.iterativereduce.actor.core.FinetuneMessage;
import org.deeplearning4j.iterativereduce.actor.core.ResetMessage;
import org.deeplearning4j.scaleout.iterativereduce.multi.UpdateableImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/deeplearning4j/iterativereduce/actor/core/actor/BatchActor.class */
public class BatchActor extends UntypedActor {
    private DataSetIterator iter;
    private final ActorRef mediator = DistributedPubSubExtension.get(getContext().system()).mediator();
    private int numTimesReset;
    private static Logger log = LoggerFactory.getLogger(BatchActor.class);
    private int maxReset;
    private ScheduledExecutorService iterChecker;
    public static final String FINETUNE = "finetune";

    /* loaded from: input_file:org/deeplearning4j/iterativereduce/actor/core/actor/BatchActor$BatchActorFactory.class */
    public static class BatchActorFactory implements Creator<BatchActor> {
        private static final long serialVersionUID = -2260113511909990862L;
        private DataSetIterator iter;
        private int maxReset = 1;

        public BatchActorFactory(DataSetIterator dataSetIterator, int i) {
            if (dataSetIterator == null) {
                throw new IllegalArgumentException("Iter can't be null");
            }
            this.iter = dataSetIterator;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public BatchActor m1create() throws Exception {
            return new BatchActor(this.iter, this.maxReset);
        }
    }

    public BatchActor(DataSetIterator dataSetIterator, int i) {
        this.maxReset = 1;
        this.iter = dataSetIterator;
        this.maxReset = i;
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(org.deeplearning4j.iterativereduce.actor.multilayer.MasterActor.SHUTDOWN, getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(FINETUNE, getSelf()), getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Publish(DoneReaper.REAPER, getSelf()), this.mediator);
        this.iterChecker = Executors.newScheduledThreadPool(1);
        this.iterChecker.scheduleAtFixedRate(new Runnable() { // from class: org.deeplearning4j.iterativereduce.actor.core.actor.BatchActor.1
            @Override // java.lang.Runnable
            public void run() {
                if (BatchActor.this.maxReset == BatchActor.this.numTimesReset) {
                    BatchActor.log.info("Shutting down via batch actor and max resets");
                    try {
                        BatchActor.this.iterChecker.awaitTermination(60L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, 10L, 60L, TimeUnit.SECONDS);
    }

    public void onReceive(Object obj) throws Exception {
        if (obj instanceof DistributedPubSubMediator.SubscribeAck) {
            log.info("Susbcribed");
        } else if (obj instanceof ResetMessage) {
            this.iter.reset();
            this.numTimesReset++;
        } else if (obj instanceof FinetuneMessage) {
            UpdateableImpl updateable = ((FinetuneMessage) obj).getUpdateable();
            final UpdateableImpl clone = SerializationUtils.clone(updateable);
            Futures.future(new Callable<UpdateableImpl>() { // from class: org.deeplearning4j.iterativereduce.actor.core.actor.BatchActor.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public UpdateableImpl call() throws Exception {
                    BatchActor.this.mediator.tell(new DistributedPubSubMediator.Publish(ModelSavingActor.SAVE, clone), BatchActor.this.mediator);
                    return clone;
                }
            }, context().dispatcher()).onComplete(new OnComplete<UpdateableImpl>() { // from class: org.deeplearning4j.iterativereduce.actor.core.actor.BatchActor.3
                public void onComplete(Throwable th, UpdateableImpl updateableImpl) throws Throwable {
                    if (th != null) {
                        throw th;
                    }
                }
            }, context().dispatcher());
            this.mediator.tell(new DistributedPubSubMediator.Publish(org.deeplearning4j.iterativereduce.actor.multilayer.MasterActor.BROADCAST, updateable), this.mediator);
            try {
                Thread.sleep(15000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.iter.hasNext()) {
                this.mediator.tell(new DistributedPubSubMediator.Publish(org.deeplearning4j.iterativereduce.actor.multilayer.MasterActor.MASTER, this.iter.next()), this.mediator);
            }
        } else if (this.iter.hasNext()) {
            this.mediator.tell(new DistributedPubSubMediator.Publish(org.deeplearning4j.iterativereduce.actor.multilayer.MasterActor.MASTER, this.iter.next()), this.mediator);
        } else {
            unhandled(obj);
        }
        this.mediator.tell(new DistributedPubSubMediator.Publish(DoneReaper.REAPER, this.iter), this.mediator);
    }

    public DataSetIterator getIter() {
        return this.iter;
    }

    public int getNumTimesReset() {
        return this.numTimesReset;
    }
}
