package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/StageTest.class */
public class StageTest {

    /* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/StageTest$ReceiveOrderAssertingStep.class */
    private static class ReceiveOrderAssertingStep extends ProcessorStep<Object> {
        private final AtomicLong lastTicket;
        private final long processingTime;
        private final boolean endOfLine;

        ReceiveOrderAssertingStep(StageControl stageControl, String str, Configuration configuration, long j, boolean z) {
            super(stageControl, str, configuration, 1, new StatsProvider[0]);
            this.lastTicket = new AtomicLong();
            this.processingTime = j;
            this.endOfLine = z;
        }

        public long receive(long j, Object obj) {
            Assert.assertEquals("For " + obj + " in " + name(), this.lastTicket.getAndIncrement(), j);
            return super.receive(j, obj);
        }

        protected void process(Object obj, BatchSender batchSender) {
            try {
                Thread.sleep(this.processingTime);
                if (this.endOfLine) {
                    return;
                }
                batchSender.send(obj);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    public void shouldReceiveBatchesInOrder() throws Exception {
        Configuration.Overridden overridden = new Configuration.Overridden(Configuration.DEFAULT) { // from class: org.neo4j.unsafe.impl.batchimport.staging.StageTest.1
            public int batchSize() {
                return 10;
            }
        };
        Stage stage = new Stage("Test stage", overridden, 1);
        final long batchSize = 1000 * overridden.batchSize();
        stage.add(new ProducerStep(stage.control(), "Producer", overridden) { // from class: org.neo4j.unsafe.impl.batchimport.staging.StageTest.2
            private final Object theObject = new Object();
            private long i;

            protected Object nextBatchOrNull(long j, int i) {
                if (this.i >= batchSize) {
                    return null;
                }
                Object[] objArr = new Object[i];
                Arrays.fill(objArr, this.theObject);
                this.i += i;
                return objArr;
            }
        });
        for (int i = 0; i < 3; i++) {
            stage.add(new ReceiveOrderAssertingStep(stage.control(), "Step" + i, overridden, i, false));
        }
        stage.add(new ReceiveOrderAssertingStep(stage.control(), "Final step", overridden, 0L, true));
        StageExecution execute = stage.execute();
        Iterator it = execute.steps().iterator();
        while (it.hasNext()) {
            ((Step) it.next()).processors(1);
        }
        new ExecutionSupervisor(ExecutionMonitors.invisible()).supervise(execute);
        for (Step step : execute.steps()) {
            Assert.assertEquals("For " + step, 1000L, step.stats().stat(Keys.done_batches).asLong());
        }
        stage.close();
    }
}
