package org.neo4j.internal.batchimport.staging;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.executor.ProcessorScheduler;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.test.RandomSupport;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.RandomExtension;

@ExtendWith({RandomExtension.class})
/* loaded from: input_file:org/neo4j/internal/batchimport/staging/StageTest.class */
class StageTest {
    // Multiplier used by processContextResources() when computing the expected global counter.
    // NOTE(review): presumably meant to mirror the initial value of TestProcessContext.batches
    // (also 100, but written there as a literal) — confirm.
    private static final int TEST_BATCH_SIZE = 100;

    // Random source populated through @Inject (the class is extended with RandomExtension).
    // Read by ChaosMonkey's constructor and by shouldCloseOnPanic().
    @Inject
    private RandomSupport random;

    /**
     * Test helper that delays the calling thread by a per-instance amount and
     * occasionally fails the call with a {@link RuntimeException}.
     */
    private class ChaosMonkey {
        /** Threshold compared against a random float; below it {@link #makeChaos()} throws. */
        private static final double FAILURE_PROBABILITY = 0.01d;

        /** Sleep duration in milliseconds, chosen once when the instance is created. */
        private final int performance;

        private ChaosMonkey() {
            performance = StageTest.this.random.nextInt(0, 50);
        }

        /**
         * Sleeps for this instance's delay and then, with a small probability, throws.
         *
         * @throws RuntimeException when the random draw falls below the failure threshold
         */
        void makeChaos() {
            pause();
            boolean shouldFail = ThreadLocalRandom.current().nextFloat() < FAILURE_PROBABILITY;
            if (shouldFail) {
                throw new RuntimeException("Chaos monkey causing failure");
            }
        }

        /** Sleeps for the configured delay; an interrupt only restores the thread's interrupt flag. */
        private void pause() {
            try {
                Thread.sleep(performance);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Processor step that asserts every received ticket equals the number of batches
     * received so far (0, 1, 2, ...), i.e. that batches arrive in ticket order.
     * Processing sleeps for a fixed time and then forwards the batch downstream unless
     * the step was created as the end of the line.
     */
    private static class ReceiveOrderAssertingStep extends ProcessorStep<Object> {
        /** Next ticket this step expects to receive; starts at zero. */
        private final AtomicLong lastTicket = new AtomicLong();
        /** Sleep duration, in milliseconds, applied to every processed batch. */
        private final long processingTime;
        /** When {@code true} the batch is not forwarded downstream. */
        private final boolean endOfLine;

        /**
         * @param control stage control handed to the super constructor
         * @param name step name, also used in assertion failure messages via {@code name()}
         * @param config configuration handed to the super constructor
         * @param processingTime milliseconds to sleep for each processed batch
         * @param endOfLine whether this step swallows batches instead of sending them on
         */
        ReceiveOrderAssertingStep(
                StageControl control, String name, Configuration config, long processingTime, boolean endOfLine) {
            // NOTE(review): the literal 1 is presumably the max processor count — confirm against ProcessorStep.
            super(control, name, config, 1, PageCacheTracer.NULL, new StatsProvider[0]);
            this.processingTime = processingTime;
            this.endOfLine = endOfLine;
        }

        /**
         * Asserts the ticket is the next expected one, then delegates to the super implementation.
         */
        public long receive(long ticket, Object batch) {
            long expectedTicket = lastTicket.getAndIncrement();
            // Supplier form: the failure message is only built if the assertion actually fails.
            Assertions.assertEquals(expectedTicket, ticket, () -> "For " + batch + " in " + name());
            return super.receive(ticket, batch);
        }

        /**
         * Sleeps for the configured processing time and forwards the batch unless this is the last step.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
         */
        protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
            try {
                Thread.sleep(processingTime);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (!endOfLine) {
                sender.send(batch);
            }
        }
    }

    /**
     * Process context used by {@code processContextResources()}: carries a countdown of
     * remaining batches and a local counter, and on {@link #close()} adds that counter to
     * a shared accumulator and counts down a shared latch.
     */
    public static class TestProcessContext implements ProcessContext {
        /** Remaining batches this context may hand out; starts at 100. */
        private final AtomicLong batches;
        /** Shared total that receives this context's counter on close. */
        private final AtomicLong globalCounterAccumulator;
        /** Shared latch counted down once per closed context. */
        private final CountDownLatch processedBatches;
        /** Local counter, incremented by the code that uses this context. */
        long counter;

        /**
         * @param atomicLong shared accumulator updated on {@link #close()}
         * @param countDownLatch shared latch counted down on {@link #close()}
         */
        TestProcessContext(AtomicLong atomicLong, CountDownLatch countDownLatch) {
            batches = new AtomicLong(100);
            globalCounterAccumulator = atomicLong;
            processedBatches = countDownLatch;
        }

        /** Publishes the local counter to the shared accumulator, then signals the latch. */
        public void close() {
            globalCounterAccumulator.addAndGet(counter);
            processedBatches.countDown();
        }
    }

    // Package-private no-argument constructor; performs no initialization of its own.
    StageTest() {
    }

    /**
     * Builds a stage of one producer, three intermediate order-asserting steps with different
     * processing times, and one final order-asserting step. It runs the stage with a single
     * processor per step and verifies every step reports exactly the expected number of done batches.
     * The ordering itself is asserted inside {@link ReceiveOrderAssertingStep}.
     */
    @Test
    void shouldReceiveBatchesInOrder() {
        final long expectedBatches = 1000L;
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT) {
            public int batchSize() {
                return 10;
            }
        };
        // Total number of items the producer hands out before signalling the end with null.
        final long totalItems = expectedBatches * config.batchSize();
        // NOTE(review): the literal 1 is presumably an ordering-guarantee flag — confirm against Stage.
        Stage stage = new Stage("Test stage", (String) null, config, 1);
        try {
            stage.add(new PullingProducerStep<ProcessContext>(stage.control(), config) {
                private final Object theObject = new Object();
                /** Number of items handed out so far. */
                private long produced;

                protected Object nextBatchOrNull(long ticket, int batchSize, ProcessContext processContext) {
                    if (produced >= totalItems) {
                        return null;
                    }
                    Object[] batch = new Object[batchSize];
                    Arrays.fill(batch, theObject);
                    produced += batchSize;
                    return batch;
                }

                protected long position() {
                    return 0L;
                }
            });
            // Intermediate steps sleep 0, 1 and 2 ms per batch respectively.
            for (int i = 0; i < 3; i++) {
                stage.add(new ReceiveOrderAssertingStep(stage.control(), "Step" + i, config, i, false));
            }
            stage.add(new ReceiveOrderAssertingStep(stage.control(), "Final step", config, 0L, true));

            StageExecution execution = stage.execute();
            for (Step<?> step : execution.steps()) {
                step.processors(1);
            }
            new ExecutionSupervisor(ExecutionMonitor.INVISIBLE).supervise(execution);

            for (Step<?> step : execution.steps()) {
                Assertions.assertEquals(
                        expectedBatches, step.stats().stat(Keys.done_batches).asLong(), "For " + step);
            }
        } finally {
            // Close the stage even when supervision or an assertion fails.
            stage.close();
        }
    }

    /**
     * Verifies that process contexts created by a producer step are closed: every
     * {@link TestProcessContext} adds its counter to a shared total and counts down a latch
     * on close, and the test waits for the latch and checks the total.
     *
     * @throws InterruptedException if interrupted while waiting for the latch
     */
    @Test
    void processContextResources() throws InterruptedException {
        // Number of receive rounds this test triggers by hand on every step.
        final int manualReceives = 1000;
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT);
        final AtomicLong globalCounter = new AtomicLong();
        // NOTE(review): the +1 presumably accounts for a context created by stage.execute() itself — confirm.
        final CountDownLatch closedContexts = new CountDownLatch(manualReceives + 1);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Stage stage = new Stage("Test stage", (String) null, config, 1, (runnable, name) -> {
                executor.submit(runnable);
            }, StageExecution.DEFAULT_PANIC_MONITOR);
            try {
                stage.add(new PullingProducerStep<TestProcessContext>(stage.control(), config) {
                    protected Object nextBatchOrNull(long ticket, int batchSize, TestProcessContext context) {
                        // Each context hands out batches until its own countdown is exhausted.
                        if (context.batches.decrementAndGet() < 0) {
                            return null;
                        }
                        context.counter++;
                        Object[] batch = new Object[batchSize];
                        Arrays.fill(batch, new Object());
                        return batch;
                    }

                    // Must keep the name processContext() so it overrides the producer hook;
                    // under any other name the step never creates a TestProcessContext.
                    protected TestProcessContext processContext() {
                        return new TestProcessContext(globalCounter, closedContexts);
                    }

                    protected long position() {
                        return 0L;
                    }
                });
                stage.add(new ProcessorStep<Object>(
                        stage.control(), "consumer", config, 1, PageCacheTracer.NULL, new StatsProvider[0]) {
                    protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                        // Consumes the batch without doing anything.
                    }
                });

                StageExecution execution = stage.execute();
                int processors = Runtime.getRuntime().availableProcessors();
                for (Step<?> step : execution.steps()) {
                    step.processors(processors);
                }
                for (int ticket = 0; ticket < manualReceives; ticket++) {
                    for (Step<?> step : execution.steps()) {
                        step.receive(ticket, null);
                    }
                }
                new ExecutionSupervisor(ExecutionMonitor.INVISIBLE).supervise(execution);

                Assertions.assertTrue(closedContexts.await(5L, TimeUnit.MINUTES));
                Assertions.assertEquals((manualReceives + 1) * TEST_BATCH_SIZE, globalCounter.get());
            } finally {
                // Close the stage even when supervision or an assertion fails.
                stage.close();
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Runs a four-step stage (producer, processor, forked processor, consumer) in which every
     * step calls {@link ChaosMonkey#makeChaos()} for each unit of work. Expects supervision to
     * fail with a {@link RuntimeException} and the panic monitor to have recorded a panic whose
     * message mentions the chaos monkey.
     */
    @Test
    void shouldCloseOnPanic() {
        final Configuration config = Configuration.DEFAULT;
        TrackingPanicMonitor panicMonitor = new TrackingPanicMonitor();
        // Randomly picks between the two values 1 and 0 for the stage's fourth constructor argument.
        int orderingGuarantees = this.random.nextBoolean() ? 1 : 0;
        Stage stage = new Stage(
                "test close on panic", null, config, orderingGuarantees, ProcessorScheduler.SPAWN_THREAD, panicMonitor) {
            {
                add(new PullingProducerStep<ProcessContext>(control(), config) {
                    private volatile long ticket;
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected Object nextBatchOrNull(long ticket, int batchSize, ProcessContext processContext) {
                        chaosMonkey.makeChaos();
                        this.ticket = ticket;
                        return new int[batchSize];
                    }

                    protected long position() {
                        return ticket;
                    }
                });
                add(new ProcessorStep<Object>(
                        control(), "processor", config, 2, PageCacheTracer.NULL, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                        chaosMonkey.makeChaos();
                        sender.send(batch);
                    }
                });
                add(new ForkedProcessorStep<Object>(control(), "forked processor", config, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void forkedProcess(int id, int processors, Object batch) {
                        chaosMonkey.makeChaos();
                    }
                });
                add(new ProcessorStep<Object>(
                        control(), "consumer", config, 1, PageCacheTracer.NULL, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                        chaosMonkey.makeChaos();
                    }
                });
            }
        };

        Assertions.assertThrows(
                RuntimeException.class, () -> ExecutionSupervisors.superviseDynamicExecution(stage));

        Assertions.assertTrue(panicMonitor.hasReceivedPanic());
        String panicMessage = panicMonitor.getReceivedPanic().getMessage();
        Assertions.assertTrue(panicMessage.contains("Chaos monkey"));
    }
}
