package com.github.dexecutor.ignite;

import com.github.dexecutor.core.DexecutorState;
import com.github.dexecutor.core.ExecutionEngine;
import com.github.dexecutor.core.support.Preconditions;
import com.github.dexecutor.core.task.ExecutionResult;
import com.github.dexecutor.core.task.Task;
import com.github.dexecutor.core.task.TaskExecutionException;
import java.lang.Comparable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dexecutor/ignite/IgniteExecutionEngine.class */
public final class IgniteExecutionEngine<T extends Comparable<T>, R> implements ExecutionEngine<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(IgniteExecutionEngine.class);
    private IgniteCompute igniteCompute;
    private BlockingQueue<ExecutionResult<T, R>> completionQueue;
    private final DexecutorState<T, R> dexecutorState;

    public IgniteExecutionEngine(DexecutorState<T, R> dexecutorState, IgniteCompute igniteCompute) {
        this(dexecutorState, igniteCompute, new LinkedBlockingQueue());
    }

    public IgniteExecutionEngine(DexecutorState<T, R> dexecutorState, IgniteCompute igniteCompute, BlockingQueue<Future<ExecutionResult<T, R>>> blockingQueue) {
        Preconditions.checkNotNull(igniteCompute, "Executer Service should not be null");
        Preconditions.checkNotNull(blockingQueue, "BlockingQueue should not be null");
        this.dexecutorState = dexecutorState;
        this.igniteCompute = igniteCompute.withAsync();
        this.completionQueue = new LinkedBlockingQueue();
    }

    public void submit(Task<T, R> task) {
        logger.debug("Received Task {}", task.getId());
        this.igniteCompute.call(new SerializableCallable(task));
        this.igniteCompute.future().listen(newListener());
    }

    private IgniteInClosure<IgniteFuture<Object>> newListener() {
        return new IgniteInClosure<IgniteFuture<Object>>() { // from class: com.github.dexecutor.ignite.IgniteExecutionEngine.1
            private static final long serialVersionUID = 1;

            public void apply(IgniteFuture<Object> igniteFuture) {
                IgniteExecutionEngine.this.completionQueue.add((ExecutionResult) igniteFuture.get());
            }
        };
    }

    public ExecutionResult<T, R> processResult() throws TaskExecutionException {
        try {
            ExecutionResult<T, R> take = this.completionQueue.take();
            if (take.isSuccess()) {
                this.dexecutorState.removeErrored(take);
            } else {
                this.dexecutorState.addErrored(take);
            }
            return take;
        } catch (InterruptedException e) {
            throw new TaskExecutionException("Task interrupted");
        }
    }

    public boolean isDistributed() {
        return true;
    }

    public boolean isAnyTaskInError() {
        return this.dexecutorState.erroredCount() > 0;
    }
}
