package org.neo4j.internal.batchimport.staging;

import java.lang.Thread;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.executor.ProcessorScheduler;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.io.pagecache.PageSwapper;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.PinEvent;
import org.neo4j.test.rule.OtherThreadRule;

/* loaded from: input_file:org/neo4j/internal/batchimport/staging/ProcessorStepTest.class */
class ProcessorStepTest {
    private final OtherThreadRule t2 = new OtherThreadRule();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ProcessorStepTest$BlockingProcessorStep.class */
    public static class BlockingProcessorStep<T> extends ProcessorStep<T> {
        private final CountDownLatch latch;

        BlockingProcessorStep(StageControl stageControl, Configuration configuration, int i, CountDownLatch countDownLatch) {
            super(stageControl, "test", configuration, i, PageCacheTracer.NULL, new StatsProvider[0]);
            this.latch = countDownLatch;
        }

        protected void process(T t, BatchSender batchSender, CursorContext cursorContext) throws Throwable {
            this.latch.await();
        }
    }

    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ProcessorStepTest$MyProcessorStep.class */
    private static class MyProcessorStep extends ProcessorStep<Integer> {
        private final AtomicInteger nextExpected;

        private MyProcessorStep(StageControl stageControl, int i) {
            this(stageControl, i, PageCacheTracer.NULL);
        }

        private MyProcessorStep(StageControl stageControl, int i, PageCacheTracer pageCacheTracer) {
            super(stageControl, "test", Configuration.DEFAULT, i, pageCacheTracer, new StatsProvider[0]);
            this.nextExpected = new AtomicInteger();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void process(Integer num, BatchSender batchSender, CursorContext cursorContext) {
            PinEvent beginPin = cursorContext.getCursorTracer().beginPin(false, 1L, (PageSwapper) null);
            beginPin.hit();
            beginPin.done();
            this.nextExpected.incrementAndGet();
        }
    }

    ProcessorStepTest() {
    }

    @BeforeEach
    void setUp() {
        this.t2.init("processor-step");
    }

    @AfterEach
    void tearDown() {
        this.t2.close();
    }

    @Test
    void shouldUpholdProcessOrderingGuarantee() throws Exception {
        MyProcessorStep myProcessorStep = new MyProcessorStep(new SimpleStageControl(), 0);
        try {
            myProcessorStep.start(1);
            myProcessorStep.processors(4);
            for (int i = 0; i < 10; i++) {
                myProcessorStep.receive(i, Integer.valueOf(i));
            }
            myProcessorStep.endOfUpstream();
            myProcessorStep.awaitCompleted();
            Assertions.assertEquals(10, myProcessorStep.nextExpected.get());
            myProcessorStep.close();
        } catch (Throwable th) {
            try {
                myProcessorStep.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void tracePageCacheAccessOnProcess() throws Exception {
        SimpleStageControl simpleStageControl = new SimpleStageControl();
        DefaultPageCacheTracer defaultPageCacheTracer = new DefaultPageCacheTracer();
        MyProcessorStep myProcessorStep = new MyProcessorStep(simpleStageControl, 0, defaultPageCacheTracer);
        try {
            myProcessorStep.start(1);
            for (int i = 0; i < 10; i++) {
                myProcessorStep.receive(i, Integer.valueOf(i));
            }
            myProcessorStep.endOfUpstream();
            myProcessorStep.awaitCompleted();
            Assertions.assertEquals(10, myProcessorStep.nextExpected.get());
            myProcessorStep.close();
            org.assertj.core.api.Assertions.assertThat(defaultPageCacheTracer.pins()).isEqualTo(10);
            org.assertj.core.api.Assertions.assertThat(defaultPageCacheTracer.unpins()).isEqualTo(10);
        } catch (Throwable th) {
            try {
                myProcessorStep.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldHaveTaskQueueSizeEqualToMaxNumberOfProcessors() throws Exception {
        SimpleStageControl simpleStageControl = new SimpleStageControl();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final int i = 5;
        BlockingProcessorStep blockingProcessorStep = new BlockingProcessorStep(simpleStageControl, new Configuration() { // from class: org.neo4j.internal.batchimport.staging.ProcessorStepTest.1
            public int maxNumberOfProcessors() {
                return i;
            }
        }, 2, countDownLatch);
        try {
            blockingProcessorStep.start(1);
            blockingProcessorStep.processors(1);
            for (int i2 = 0; i2 < 2 + 5; i2++) {
                blockingProcessorStep.receive(i2, (Object) null);
            }
            Future execute = this.t2.execute(receive(2, blockingProcessorStep));
            this.t2.get().waitUntilThreadState(new Thread.State[]{Thread.State.TIMED_WAITING});
            countDownLatch.countDown();
            execute.get();
            blockingProcessorStep.close();
        } catch (Throwable th) {
            try {
                blockingProcessorStep.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldRecycleDoneBatches() throws Exception {
        StageControl stageControl = (StageControl) Mockito.mock(StageControl.class);
        Mockito.when(stageControl.scheduler()).thenReturn(ProcessorScheduler.SPAWN_THREAD);
        MyProcessorStep myProcessorStep = new MyProcessorStep(stageControl, 0);
        try {
            myProcessorStep.start(1);
            for (int i = 0; i < 10; i++) {
                myProcessorStep.receive(i, Integer.valueOf(i));
            }
            myProcessorStep.endOfUpstream();
            myProcessorStep.awaitCompleted();
            ((StageControl) Mockito.verify(stageControl, Mockito.times(10))).recycle(ArgumentMatchers.any());
            myProcessorStep.close();
        } catch (Throwable th) {
            try {
                myProcessorStep.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenLast() throws InterruptedException {
        shouldBeAbleToPropagatePanicOnBlockedProcessors(2, 1);
    }

    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenNotLast() throws InterruptedException {
        shouldBeAbleToPropagatePanicOnBlockedProcessors(3, 1);
    }

    private void shouldBeAbleToPropagatePanicOnBlockedProcessors(int i, int i2) throws InterruptedException {
        final String str = "Failing just for fun";
        Configuration configuration = Configuration.DEFAULT;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        TrackingPanicMonitor trackingPanicMonitor = new TrackingPanicMonitor();
        Stage stage = new Stage("Test", "Part", configuration, 1, ProcessorScheduler.SPAWN_THREAD, trackingPanicMonitor);
        stage.add(intProducer(configuration, stage, configuration.maxNumberOfProcessors() * 2));
        Step step = null;
        for (int i3 = 0; i3 < i; i3++) {
            if (i2 == i3) {
                step = new BlockingProcessorStep<Integer>(stage.control(), configuration, 1, countDownLatch) { // from class: org.neo4j.internal.batchimport.staging.ProcessorStepTest.2
                    /* JADX INFO: Access modifiers changed from: protected */
                    @Override // org.neo4j.internal.batchimport.staging.ProcessorStepTest.BlockingProcessorStep
                    public void process(Integer num, BatchSender batchSender, CursorContext cursorContext) throws Throwable {
                        super.process((AnonymousClass2) num, batchSender, cursorContext);
                        throw new RuntimeException(str);
                    }
                };
                stage.add(step);
            } else {
                stage.add(intProcessor(configuration, stage));
            }
        }
        try {
            StageExecution execute = stage.execute();
            while (step.stats().stat(Keys.received_batches).asLong() < configuration.maxNumberOfProcessors() + 1) {
                Thread.sleep(10L);
            }
            countDownLatch.countDown();
            execute.awaitCompletion();
            Objects.requireNonNull(execute);
            Assertions.assertEquals("Failing just for fun", ((RuntimeException) Assertions.assertThrows(RuntimeException.class, execute::assertHealthy)).getMessage());
            stage.close();
            Assertions.assertTrue(trackingPanicMonitor.hasReceivedPanic());
        } catch (Throwable th) {
            stage.close();
            throw th;
        }
    }

    private static ProducerStep intProducer(Configuration configuration, Stage stage, final int i) {
        return new ProducerStep(stage.control(), configuration) { // from class: org.neo4j.internal.batchimport.staging.ProcessorStepTest.3
            protected void process() {
                for (int i2 = 0; i2 < i; i2++) {
                    sendDownstream(Integer.valueOf(i2));
                }
            }

            protected long position() {
                return 0L;
            }
        };
    }

    private static ProcessorStep<Integer> intProcessor(Configuration configuration, Stage stage) {
        return new ProcessorStep<Integer>(stage.control(), "processor", configuration, 1, PageCacheTracer.NULL, new StatsProvider[0]) { // from class: org.neo4j.internal.batchimport.staging.ProcessorStepTest.4
            /* JADX INFO: Access modifiers changed from: protected */
            public void process(Integer num, BatchSender batchSender, CursorContext cursorContext) {
                batchSender.send(num);
            }
        };
    }

    private static Callable<Void> receive(int i, ProcessorStep<Void> processorStep) {
        return () -> {
            processorStep.receive(i, (Object) null);
            return null;
        };
    }
}
