package com.googlecode.jazure.sdk.job;

import com.google.inject.Inject;
import com.googlecode.jazure.sdk.aggregator.Aggregator;
import com.googlecode.jazure.sdk.aggregator.CompletionStrategy;
import com.googlecode.jazure.sdk.aggregator.CorrelatedTasksAggregatingHandler;
import com.googlecode.jazure.sdk.aggregator.CorrelationStrategy;
import com.googlecode.jazure.sdk.core.ProjectConfiguration;
import com.googlecode.jazure.sdk.endpoint.QueueStorageEndpoint;
import com.googlecode.jazure.sdk.event.EventPublisher;
import com.googlecode.jazure.sdk.job.JobConfig;
import com.googlecode.jazure.sdk.job.exception.JobNotRunningException;
import com.googlecode.jazure.sdk.lifecycle.LifeCycleWrapper;
import com.googlecode.jazure.sdk.lifecycle.LifeCycledRunnable;
import com.googlecode.jazure.sdk.lifecycle.LifeCycles;
import com.googlecode.jazure.sdk.schedule.Scheduler;
import com.googlecode.jazure.sdk.schedule.Schedulers;
import com.googlecode.jazure.sdk.schedule.SimpleRepeatTriggers;
import com.googlecode.jazure.sdk.task.Task;
import com.googlecode.jazure.sdk.task.TaskInvocation;
import com.googlecode.jazure.sdk.task.TaskInvocations;
import com.googlecode.jazure.sdk.task.tracker.TaskInvocationSaveEvent;
import com.googlecode.jazure.sdk.task.tracker.TaskInvocationUpdateEvent;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/googlecode/jazure/sdk/job/AbstractJob.class */
public abstract class AbstractJob<T extends JobConfig> implements Job<T> {
    protected final Aggregator<T> aggregator;
    protected final String taskQueue;
    protected final String resultQueue;

    @Inject
    protected EventPublisher eventPublisher;

    @Inject
    protected ProjectConfiguration pc;

    @Inject
    protected QueueStorageEndpoint endpoint;
    protected Scheduler senderScheduler;
    protected Scheduler receiverScheduler;
    protected Scheduler aggregatorScheduler;
    protected CorrelatedTasksAggregatingHandler<T> correlatedTasksAggregatingHandler;
    protected Logger logger = LoggerFactory.getLogger(getClass());
    protected Queue<TaskInvocation> loaded = new ConcurrentLinkedQueue();
    protected Queue<TaskInvocation> received = new ConcurrentLinkedQueue();
    protected LifeCycleWrapper wrapper = LifeCycles.wrapped();

    public AbstractJob(Aggregator<T> aggregator, CorrelationStrategy correlationStrategy, CompletionStrategy completionStrategy, String str, String str2) {
        this.aggregator = aggregator;
        this.correlatedTasksAggregatingHandler = new CorrelatedTasksAggregatingHandler<>(aggregator, correlationStrategy, completionStrategy);
        this.taskQueue = str;
        this.resultQueue = str2;
    }

    protected abstract void startLoader();

    protected abstract void stopLoader();

    @Override // com.googlecode.jazure.sdk.lifecycle.LifeCycle
    public void start() {
        this.wrapper.start(new Runnable() { // from class: com.googlecode.jazure.sdk.job.AbstractJob.1
            @Override // java.lang.Runnable
            public void run() {
                AbstractJob.this.initializeSchedulers();
                AbstractJob.this.startLoader();
                AbstractJob.this.scheduleSend();
                AbstractJob.this.scheduleReceive();
                AbstractJob.this.scheduleAggregate();
                AbstractJob.this.eventPublisher.publishEvent(new JobStartedEvent(AbstractJob.this));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeSchedulers() {
        this.senderScheduler = Schedulers.newFixRateScheduler(1, threadNamePrefix("CloudSender"));
        this.receiverScheduler = Schedulers.newFixRateScheduler(1, threadNamePrefix("CloudReceiver"));
        this.aggregatorScheduler = Schedulers.newFixRateScheduler(1, threadNamePrefix("Aggregator"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String threadNamePrefix(String str) {
        return "JAzure - " + str + " - " + getJobConfig().getId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleSend() {
        this.senderScheduler.schedule(decorate(new Runnable() { // from class: com.googlecode.jazure.sdk.job.AbstractJob.2
            @Override // java.lang.Runnable
            public void run() {
                TaskInvocation poll = AbstractJob.this.loaded.poll();
                if (poll == null) {
                    return;
                }
                AbstractJob.this.sendTask(poll);
            }
        }), SimpleRepeatTriggers.create(this.pc.getBusPollInterval()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleLoaded(Collection<Task> collection) {
        for (TaskInvocation taskInvocation : TaskInvocations.createPending(this.pc, getJobConfig(), collection, this.taskQueue, this.resultQueue)) {
            this.eventPublisher.publishEvent(new TaskInvocationSaveEvent(taskInvocation));
            this.loaded.add(taskInvocation);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTask(TaskInvocation taskInvocation) {
        this.endpoint.send(taskInvocation);
        this.eventPublisher.publishEvent(new TaskInvocationUpdateEvent(taskInvocation.executing()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleReceive() {
        this.receiverScheduler.schedule(decorate(new Runnable() { // from class: com.googlecode.jazure.sdk.job.AbstractJob.3
            @Override // java.lang.Runnable
            public void run() {
                TaskInvocation receive = AbstractJob.this.endpoint.receive(AbstractJob.this.resultQueue);
                if (receive == null) {
                    return;
                }
                AbstractJob.this.received.add(receive);
            }
        }), SimpleRepeatTriggers.create(this.pc.getBusPollInterval()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleAggregate() {
        this.aggregatorScheduler.schedule(decorate(new Runnable() { // from class: com.googlecode.jazure.sdk.job.AbstractJob.4
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.lang.Runnable
            public void run() {
                TaskInvocation poll = AbstractJob.this.received.poll();
                if (poll == null) {
                    return;
                }
                AbstractJob.this.eventPublisher.publishEvent(new TaskInvocationUpdateEvent(poll.completed()));
                if (AbstractJob.this.aggregator.aggregateSupported(poll)) {
                    AbstractJob.this.aggregator.aggregate(poll);
                }
                if (AbstractJob.this.aggregator.aggregateCorrelatedSupported(AbstractJob.this.getJobConfig())) {
                    AbstractJob.this.correlatedTasksAggregatingHandler.processCorrelatated(AbstractJob.this.getJobConfig(), poll);
                }
            }
        }), SimpleRepeatTriggers.create(this.pc.getBusPollInterval()));
    }

    @Override // com.googlecode.jazure.sdk.lifecycle.LifeCycle
    public void stop() {
        this.wrapper.stop(new Runnable() { // from class: com.googlecode.jazure.sdk.job.AbstractJob.5
            @Override // java.lang.Runnable
            public void run() {
                AbstractJob.this.stopLoader();
                AbstractJob.this.senderScheduler.shutdown();
                AbstractJob.this.receiverScheduler.shutdown();
                AbstractJob.this.aggregatorScheduler.shutdown();
                AbstractJob.this.destorySchedulers();
                AbstractJob.this.eventPublisher.publishEvent(new JobStoppedEvent(AbstractJob.this));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void destorySchedulers() {
        this.senderScheduler = null;
        this.receiverScheduler = null;
        this.aggregatorScheduler = null;
    }

    @Override // com.googlecode.jazure.sdk.lifecycle.LifeCycle
    public boolean isRunning() {
        return this.wrapper.isRunning();
    }

    @Override // com.googlecode.jazure.sdk.job.Job
    public Aggregator<T> getAggregator() {
        return this.aggregator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LifeCycledRunnable decorate(Runnable runnable) {
        return LifeCycles.run(runnable, this);
    }

    @Override // com.googlecode.jazure.sdk.job.Job
    public Job<T> executeTask(TaskInvocation taskInvocation) throws JobNotRunningException {
        if (!isRunning()) {
            throw new JobNotRunningException("Job [" + getJobConfig().getId() + "] is not running");
        }
        sendTask(taskInvocation);
        return this;
    }
}
