package org.neo4j.unsafe.impl.batchimport.staging;

import java.lang.Thread;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.helpers.collection.IteratorWrapper;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.class */
public class TicketedProcessingTest {

    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest$StringJob.class */
    public static class StringJob {
        final String string;
        final CountDownLatch latch = new CountDownLatch(1);

        StringJob(String str) {
            this.string = str;
        }
    }

    @Test
    public void shouldReturnTicketsInOrder() throws Exception {
        final int i = 1000;
        ParkStrategy.Park park = new ParkStrategy.Park(2L, TimeUnit.MILLISECONDS);
        BiFunction biFunction = (num, r5) -> {
            if (ThreadLocalRandom.current().nextFloat() < 0.01f) {
                park.park(Thread.currentThread());
            }
            return Integer.valueOf(num.intValue() * 2);
        };
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        final TicketedProcessing ticketedProcessing = new TicketedProcessing("Doubler", availableProcessors, biFunction, () -> {
            return null;
        });
        Throwable th = null;
        try {
            ticketedProcessing.processors(availableProcessors - ticketedProcessing.processors(0));
            Future<RESULT> execute = this.t2.execute(new OtherThreadExecutor.WorkerCommand<Void, Void>() { // from class: org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessingTest.1
                @Override // org.neo4j.test.OtherThreadExecutor.WorkerCommand
                public Void doWork(Void r6) throws Exception {
                    for (int i2 = 0; i2 < i; i2++) {
                        Assert.assertNotNull((Integer) ticketedProcessing.next());
                        Assert.assertEquals(i2 * 2, r0.intValue());
                    }
                    Assert.assertNull(ticketedProcessing.next());
                    return null;
                }
            });
            for (int i2 = 0; i2 < 1000; i2++) {
                ticketedProcessing.submit(Integer.valueOf(i2));
            }
            execute.get();
        } finally {
            if (ticketedProcessing != null) {
                if (0 != 0) {
                    try {
                        ticketedProcessing.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    ticketedProcessing.close();
                }
            }
        }
    }

    @Test
    public void shouldNotBeAbleToSubmitTooFarAhead() throws Exception {
        final TicketedProcessing ticketedProcessing = new TicketedProcessing("Parser", 2, (stringJob, r3) -> {
            DoubleLatch.awaitLatch(stringJob.latch);
            return Integer.valueOf(Integer.parseInt(stringJob.string));
        }, () -> {
            return null;
        });
        Throwable th = null;
        try {
            ticketedProcessing.processors(1);
            StringJob stringJob2 = new StringJob("1");
            ticketedProcessing.submit(stringJob2);
            StringJob stringJob3 = new StringJob("2");
            ticketedProcessing.submit(stringJob3);
            final StringJob stringJob4 = new StringJob("3");
            stringJob4.latch.countDown();
            Future<RESULT> execute = this.t2.execute(new OtherThreadExecutor.WorkerCommand<Void, Void>() { // from class: org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessingTest.2
                @Override // org.neo4j.test.OtherThreadExecutor.WorkerCommand
                public Void doWork(Void r4) throws Exception {
                    ticketedProcessing.submit(stringJob4);
                    return null;
                }
            });
            this.t2.get().waitUntilThreadState(Thread.State.TIMED_WAITING, Thread.State.WAITING);
            stringJob2.latch.countDown();
            Assert.assertEquals(1L, ((Integer) ticketedProcessing.next()).intValue());
            execute.get();
            stringJob3.latch.countDown();
            Assert.assertEquals(2L, ((Integer) ticketedProcessing.next()).intValue());
            Assert.assertEquals(3L, ((Integer) ticketedProcessing.next()).intValue());
            if (ticketedProcessing != null) {
                if (0 == 0) {
                    ticketedProcessing.close();
                    return;
                }
                try {
                    ticketedProcessing.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (ticketedProcessing != null) {
                if (0 != 0) {
                    try {
                        ticketedProcessing.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    ticketedProcessing.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldNoticeSlurpPanic() throws Exception {
        IllegalStateException illegalStateException = new IllegalStateException("Consistently failing");
        TicketedProcessing ticketedProcessing = new TicketedProcessing("Parser", 2, (stringJob, r3) -> {
            return Integer.valueOf(Integer.parseInt(stringJob.string));
        }, () -> {
            return null;
        });
        Throwable th = null;
        try {
            try {
                ticketedProcessing.processors(1);
                Future slurp = ticketedProcessing.slurp(failingIterator(Iterators.iterator(new Supplier[]{() -> {
                    return new StringJob("1");
                }, () -> {
                    throw illegalStateException;
                }})), true);
                do {
                    try {
                    } catch (TaskExecutionPanicException e) {
                        Assert.assertSame(illegalStateException, e.getCause());
                    }
                } while (ticketedProcessing.next() != null);
                Assert.fail("Should have noticed the slurp failure");
                try {
                    slurp.get();
                    Assert.fail("Should have noticed the slurp failure");
                } catch (ExecutionException e2) {
                    Assert.assertSame(illegalStateException, e2.getCause());
                }
                if (ticketedProcessing != null) {
                    if (0 == 0) {
                        ticketedProcessing.close();
                        return;
                    }
                    try {
                        ticketedProcessing.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (ticketedProcessing != null) {
                if (th != null) {
                    try {
                        ticketedProcessing.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    ticketedProcessing.close();
                }
            }
            throw th4;
        }
    }

    private static Iterator<StringJob> failingIterator(Iterator<Supplier<StringJob>> it) {
        return new IteratorWrapper<StringJob, Supplier<StringJob>>(it) { // from class: org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessingTest.3
            /* JADX INFO: Access modifiers changed from: protected */
            public StringJob underlyingObjectToObject(Supplier<StringJob> supplier) {
                return supplier.get();
            }
        };
    }
}
