package org.neo4j.gds.traversal;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.gds.Algorithm;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.core.concurrency.ExecutorServiceUtil;
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
import org.neo4j.gds.core.utils.TerminationFlag;
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
import org.neo4j.gds.core.utils.queue.QueueBasedSpliterator;
import org.neo4j.gds.ml.core.EmbeddingUtils;
import org.neo4j.gds.ml.core.samplers.RandomWalkSampler;

/* loaded from: input_file:org/neo4j/gds/traversal/RandomWalk.class */
public final class RandomWalk extends Algorithm<Stream<long[]>> {
    private static final long[] TOMBSTONE = new long[0];
    private final int concurrency;
    private final ExecutorService executorService;
    private final RandomWalkTaskSupplier taskSupplier;
    private final ExternalTerminationFlag externalTerminationFlag;
    private final BlockingQueue<long[]> walks;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/gds/traversal/RandomWalk$ExternalTerminationFlag.class */
    public static final class ExternalTerminationFlag implements TerminationFlag {
        private volatile boolean running = true;
        private final Algorithm<?> algo;

        ExternalTerminationFlag(Algorithm<?> algorithm) {
            this.algo = algorithm;
        }

        public boolean running() {
            return this.running && this.algo.getTerminationFlag().running();
        }

        void stop() {
            this.running = false;
        }
    }

    public static RandomWalk create(Graph graph, int i, WalkParameters walkParameters, List<Long> list, int i2, Optional<Long> optional, ProgressTracker progressTracker, ExecutorService executorService) {
        if (graph.hasRelationshipProperty()) {
            EmbeddingUtils.validateRelationshipWeightPropertyValue(graph, i, d -> {
                return d >= 0.0d;
            }, "RandomWalk only supports non-negative weights.", executorService);
        }
        return new RandomWalk(graph, i, executorService, walkParameters, list, i2, optional, progressTracker);
    }

    private RandomWalk(Graph graph, int i, ExecutorService executorService, WalkParameters walkParameters, List<Long> list, int i2, Optional<Long> optional, ProgressTracker progressTracker) {
        super(progressTracker);
        this.concurrency = i;
        this.executorService = executorService;
        this.walks = new ArrayBlockingQueue(i2);
        this.externalTerminationFlag = new ExternalTerminationFlag(this);
        long longValue = optional.orElseGet(() -> {
            return Long.valueOf(new Random().nextLong());
        }).longValue();
        RandomWalkSampler.CumulativeWeightSupplier cumulativeWeights = RandomWalkCompanion.cumulativeWeights(graph, i, executorService, progressTracker);
        NextNodeSupplier nextNodeSupplier = RandomWalkCompanion.nextNodeSupplier(graph, list);
        Objects.requireNonNull(graph);
        this.taskSupplier = new RandomWalkTaskSupplier(graph::concurrentCopy, nextNodeSupplier, cumulativeWeights, this.walks, walkParameters, longValue, progressTracker, this.externalTerminationFlag);
    }

    /* renamed from: compute, reason: merged with bridge method [inline-methods] */
    public Stream<long[]> m137compute() {
        this.progressTracker.beginSubTask("RandomWalk");
        startWalkers(() -> {
            this.progressTracker.endSubTask("RandomWalk");
        });
        return streamWalks(this.walks);
    }

    private void startWalkers(Runnable runnable) {
        List list = (List) IntStream.range(0, this.concurrency).mapToObj(i -> {
            return this.taskSupplier.get();
        }).collect(Collectors.toList());
        CompletableFuture.runAsync(() -> {
            runTasks(list);
        }, ExecutorServiceUtil.DEFAULT_SINGLE_THREAD_POOL).whenComplete((r3, th) -> {
            runnable.run();
        });
    }

    private void runTasks(Iterable<? extends Runnable> iterable) {
        this.progressTracker.beginSubTask("create walks");
        RunWithConcurrency.builder().executor(this.executorService).concurrency(this.concurrency).tasks(iterable).terminationFlag(this.externalTerminationFlag).mayInterruptIfRunning(true).run();
        this.progressTracker.endSubTask("create walks");
        boolean z = false;
        while (!z) {
            try {
                if (!this.externalTerminationFlag.running()) {
                    break;
                } else {
                    z = this.walks.offer(TOMBSTONE, 100L, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private Stream<long[]> streamWalks(BlockingQueue<long[]> blockingQueue) {
        Stream stream = StreamSupport.stream(new QueueBasedSpliterator(blockingQueue, TOMBSTONE, this.externalTerminationFlag, 100), false);
        ExternalTerminationFlag externalTerminationFlag = this.externalTerminationFlag;
        Objects.requireNonNull(externalTerminationFlag);
        return (Stream) stream.onClose(externalTerminationFlag::stop);
    }
}
