package org.neo4j.kernel.impl.transaction.state.storeview;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.neo4j.configuration.Config;
import org.neo4j.configuration.GraphDatabaseInternalSettings;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.staging.ExecutionSupervisors;
import org.neo4j.internal.batchimport.staging.ProcessorAssignmentStrategies;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.context.CursorContextFactory;
import org.neo4j.io.pagecache.context.EmptyVersionContextSupplier;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.kernel.impl.api.index.PropertyScanConsumer;
import org.neo4j.kernel.impl.api.index.StoreScan;
import org.neo4j.kernel.impl.api.index.TokenScanConsumer;
import org.neo4j.kernel.impl.scheduler.JobSchedulerFactory;
import org.neo4j.kernel.impl.transaction.state.storeview.PropertyAwareEntityStoreScan;
import org.neo4j.lock.Lock;
import org.neo4j.lock.LockService;
import org.neo4j.memory.EmptyMemoryTracker;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.PropertySelection;
import org.neo4j.storageengine.api.StorageNodeCursor;
import org.neo4j.storageengine.api.StubStorageCursors;
import org.neo4j.storageengine.api.cursor.StoreCursors;
import org.neo4j.test.DoubleLatch;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values;

/* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest.class */
class StoreScanStageTest {
    private static final int WORKERS = 8;
    private static final int LABEL = 1;
    private static final String KEY = "key";
    private static final int NUMBER_OF_BATCHES = 4;
    private static final CursorContextFactory CONTEXT_FACTORY = new CursorContextFactory(PageCacheTracer.NULL, EmptyVersionContextSupplier.EMPTY);
    private final Config dbConfig = Config.defaults(GraphDatabaseInternalSettings.index_population_workers, Integer.valueOf(WORKERS));
    private final Configuration config = new Configuration() { // from class: org.neo4j.kernel.impl.transaction.state.storeview.StoreScanStageTest.1
        public int maxNumberOfWorkerThreads() {
            return StoreScanStageTest.WORKERS;
        }

        public int batchSize() {
            return 10;
        }
    };
    private final JobScheduler jobScheduler = JobSchedulerFactory.createInitialisedScheduler();

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$AbortingExternalUpdatesCheck.class */
    private static class AbortingExternalUpdatesCheck implements StoreScan.ExternalUpdatesCheck {
        private final int abortAfterBatch;
        private final AtomicBoolean continueScanning;
        private int callCount;

        AbortingExternalUpdatesCheck(int i, AtomicBoolean atomicBoolean) {
            this.abortAfterBatch = i;
            this.continueScanning = atomicBoolean;
        }

        public boolean needToApplyExternalUpdates() {
            int i = this.callCount;
            this.callCount = i + StoreScanStageTest.LABEL;
            if (i != this.abortAfterBatch) {
                return false;
            }
            this.continueScanning.set(false);
            return false;
        }

        public void applyExternalUpdates(long j) {
            throw new IllegalStateException("Should not be called");
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$ControlledExternalUpdatesCheck.class */
    private static class ControlledExternalUpdatesCheck implements StoreScan.ExternalUpdatesCheck {
        private final long expectedNodeId;
        private final int applyOnBatchIndex;
        private final AtomicInteger numBatchesProcessed;
        private int checkCallCount;
        private volatile int applyCallCount;

        ControlledExternalUpdatesCheck(int i, int i2, AtomicInteger atomicInteger, boolean z) {
            this.applyOnBatchIndex = i2;
            this.numBatchesProcessed = atomicInteger;
            this.expectedNodeId = z ? (i * i2) - StoreScanStageTest.LABEL : Long.MAX_VALUE;
        }

        public boolean needToApplyExternalUpdates() {
            int i = this.checkCallCount;
            this.checkCallCount = i + StoreScanStageTest.LABEL;
            return i == this.applyOnBatchIndex;
        }

        public void applyExternalUpdates(long j) {
            Assertions.assertThat(j).isEqualTo(this.expectedNodeId);
            Assertions.assertThat(this.numBatchesProcessed.get()).isEqualTo(this.applyOnBatchIndex);
            this.applyCallCount += StoreScanStageTest.LABEL;
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$ControlledLockFunction.class */
    private static class ControlledLockFunction implements LongFunction<Lock> {
        private final Set<Thread> seenThreads = ConcurrentHashMap.newKeySet();
        private final CountDownLatch latch = new CountDownLatch(2);

        private ControlledLockFunction() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.LongFunction
        public Lock apply(long j) {
            this.seenThreads.add(Thread.currentThread());
            this.latch.countDown();
            DoubleLatch.awaitLatch(this.latch);
            return null;
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$PropertyConsumer.class */
    private static class PropertyConsumer implements PropertyScanConsumer {
        private final Runnable action;

        PropertyConsumer(Runnable runnable) {
            this.action = runnable;
        }

        public PropertyScanConsumer.Batch newBatch() {
            return new PropertyScanConsumer.Batch() { // from class: org.neo4j.kernel.impl.transaction.state.storeview.StoreScanStageTest.PropertyConsumer.1
                public void addRecord(long j, long[] jArr, Map<Integer, Value> map) {
                }

                public void process() {
                    PropertyConsumer.this.action.run();
                }
            };
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$ThreadCapturingPropertyConsumer.class */
    private static class ThreadCapturingPropertyConsumer implements PropertyScanConsumer {
        private final Set<Thread> seenThreads = ConcurrentHashMap.newKeySet();

        private ThreadCapturingPropertyConsumer() {
        }

        public PropertyScanConsumer.Batch newBatch() {
            return new PropertyScanConsumer.Batch() { // from class: org.neo4j.kernel.impl.transaction.state.storeview.StoreScanStageTest.ThreadCapturingPropertyConsumer.1
                public void addRecord(long j, long[] jArr, Map<Integer, Value> map) {
                }

                public void process() {
                    ThreadCapturingPropertyConsumer.this.seenThreads.add(Thread.currentThread());
                }
            };
        }
    }

    /* loaded from: input_file:org/neo4j/kernel/impl/transaction/state/storeview/StoreScanStageTest$ThreadCapturingTokenConsumer.class */
    private static class ThreadCapturingTokenConsumer implements TokenScanConsumer {
        private final Set<Thread> seenThreads = ConcurrentHashMap.newKeySet();

        private ThreadCapturingTokenConsumer() {
        }

        public TokenScanConsumer.Batch newBatch() {
            return new TokenScanConsumer.Batch() { // from class: org.neo4j.kernel.impl.transaction.state.storeview.StoreScanStageTest.ThreadCapturingTokenConsumer.1
                public void addRecord(long j, long[] jArr) {
                }

                public void process() {
                    ThreadCapturingTokenConsumer.this.seenThreads.add(Thread.currentThread());
                }
            };
        }
    }

    StoreScanStageTest() {
    }

    @AfterEach
    void tearDown() throws Exception {
        this.jobScheduler.close();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "parallelWrite={0}")
    void shouldGenerateUpdatesInParallel(boolean z) {
        StubStorageCursors someData = someData();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL));
        ThreadCapturingPropertyConsumer threadCapturingPropertyConsumer = new ThreadCapturingPropertyConsumer();
        ThreadCapturingTokenConsumer threadCapturingTokenConsumer = new ThreadCapturingTokenConsumer();
        ControlledLockFunction controlledLockFunction = new ControlledLockFunction();
        runScan(new StoreScanStage(this.dbConfig, this.config, (cursorContext, storeCursors) -> {
            return cursorEntityIdIterator;
        }, StoreScan.NO_EXTERNAL_UPDATES, new AtomicBoolean(true), someData, cursorContext2 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, threadCapturingPropertyConsumer, threadCapturingTokenConsumer, new NodeCursorBehaviour(someData), controlledLockFunction, z, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, true));
        Assertions.assertThat(controlledLockFunction.seenThreads.size()).isGreaterThan(LABEL);
        if (z) {
            Assertions.assertThat(threadCapturingPropertyConsumer.seenThreads.size()).isGreaterThan(LABEL);
            Assertions.assertThat(threadCapturingTokenConsumer.seenThreads.size()).isGreaterThan(LABEL);
        } else {
            Assertions.assertThat(threadCapturingPropertyConsumer.seenThreads.size()).isEqualTo(LABEL);
            Assertions.assertThat(threadCapturingTokenConsumer.seenThreads.size()).isEqualTo(LABEL);
        }
    }

    @Test
    void shouldPanicAndExitStageOnWriteFailure() {
        StubStorageCursors someData = someData();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL));
        StoreScanStage storeScanStage = new StoreScanStage(this.dbConfig, this.config, (cursorContext, storeCursors) -> {
            return cursorEntityIdIterator;
        }, StoreScan.NO_EXTERNAL_UPDATES, new AtomicBoolean(true), someData, cursorContext2 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, new PropertyConsumer(() -> {
            throw new IllegalStateException("Failed to write");
        }), (TokenScanConsumer) null, new NodeCursorBehaviour(someData), j -> {
            return null;
        }, true, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, true);
        Assertions.assertThatThrownBy(() -> {
            runScan(storeScanStage);
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("Failed to write");
    }

    @Test
    void shouldApplyExternalUpdatesIfThereAreSuch() {
        StubStorageCursors someData = someData();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL));
        AtomicInteger atomicInteger = new AtomicInteger();
        ControlledExternalUpdatesCheck controlledExternalUpdatesCheck = new ControlledExternalUpdatesCheck(this.config.batchSize(), 2, atomicInteger, true);
        Objects.requireNonNull(atomicInteger);
        runScan(new StoreScanStage(this.dbConfig, this.config, (obj, obj2) -> {
            return cursorEntityIdIterator;
        }, controlledExternalUpdatesCheck, new AtomicBoolean(true), someData, obj3 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, new PropertyConsumer(atomicInteger::incrementAndGet), (TokenScanConsumer) null, new NodeCursorBehaviour(someData), j -> {
            return null;
        }, true, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, true));
        Assertions.assertThat(controlledExternalUpdatesCheck.applyCallCount).isEqualTo(LABEL);
    }

    @Test
    void shouldAbortScanOnStopped() {
        StubStorageCursors someData = someData();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL));
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AbortingExternalUpdatesCheck abortingExternalUpdatesCheck = new AbortingExternalUpdatesCheck(LABEL, atomicBoolean);
        Objects.requireNonNull(atomicInteger);
        runScan(new StoreScanStage(this.dbConfig, this.config, (obj, obj2) -> {
            return cursorEntityIdIterator;
        }, abortingExternalUpdatesCheck, atomicBoolean, someData, obj3 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, new PropertyConsumer(atomicInteger::incrementAndGet), (TokenScanConsumer) null, new NodeCursorBehaviour(someData), j -> {
            return null;
        }, true, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, true));
        Assertions.assertThat(atomicInteger.get()).isEqualTo(2);
    }

    @Test
    void shouldReportCorrectNumberOfEntitiesProcessed() {
        StubStorageCursors someData = someData();
        final AtomicReference atomicReference = new AtomicReference();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator<StorageNodeCursor> cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator<StorageNodeCursor>(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL)) { // from class: org.neo4j.kernel.impl.transaction.state.storeview.StoreScanStageTest.2
            private long manualCounter;

            protected boolean fetchNext() {
                Assertions.assertThat(((StoreScanStage) atomicReference.get()).numberOfIteratedEntities()).isEqualTo((this.manualCounter / StoreScanStageTest.this.config.batchSize()) * StoreScanStageTest.this.config.batchSize());
                this.manualCounter++;
                return super.fetchNext();
            }
        };
        StoreScanStage storeScanStage = new StoreScanStage(this.dbConfig, this.config, (obj, obj2) -> {
            return cursorEntityIdIterator;
        }, StoreScan.NO_EXTERNAL_UPDATES, new AtomicBoolean(true), someData, obj3 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, new ThreadCapturingPropertyConsumer(), new ThreadCapturingTokenConsumer(), new NodeCursorBehaviour(someData), j -> {
            return LockService.NO_LOCK;
        }, true, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, true);
        atomicReference.set(storeScanStage);
        runScan(storeScanStage);
        Assertions.assertThat(storeScanStage.numberOfIteratedEntities()).isEqualTo(this.config.batchSize() * 4);
    }

    @Test
    void shouldProvideMaxIdIfCannotDetermineCutOffPoint() {
        StubStorageCursors someData = someData();
        PropertyAwareEntityStoreScan.CursorEntityIdIterator cursorEntityIdIterator = new PropertyAwareEntityStoreScan.CursorEntityIdIterator(someData.allocateNodeCursor(CursorContext.NULL_CONTEXT, StoreCursors.NULL));
        AtomicInteger atomicInteger = new AtomicInteger();
        ControlledExternalUpdatesCheck controlledExternalUpdatesCheck = new ControlledExternalUpdatesCheck(this.config.batchSize(), 2, atomicInteger, false);
        Objects.requireNonNull(atomicInteger);
        runScan(new StoreScanStage(this.dbConfig, this.config, (cursorContext, storeCursors) -> {
            return cursorEntityIdIterator;
        }, controlledExternalUpdatesCheck, new AtomicBoolean(true), someData, cursorContext2 -> {
            return StoreCursors.NULL;
        }, new int[]{LABEL}, PropertySelection.ALL_PROPERTIES, new PropertyConsumer(atomicInteger::incrementAndGet), (TokenScanConsumer) null, new NodeCursorBehaviour(someData), j -> {
            return null;
        }, true, this.jobScheduler, CONTEXT_FACTORY, EmptyMemoryTracker.INSTANCE, false));
        Assertions.assertThat(controlledExternalUpdatesCheck.applyCallCount).isEqualTo(LABEL);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void runScan(StoreScanStage<StorageNodeCursor> storeScanStage) {
        ExecutionSupervisors.superviseDynamicExecution(ProcessorAssignmentStrategies.saturateSpecificStep(LABEL), storeScanStage);
    }

    private StubStorageCursors someData() {
        StubStorageCursors stubStorageCursors = new StubStorageCursors();
        for (int i = 0; i < this.config.batchSize() * NUMBER_OF_BATCHES; i += LABEL) {
            stubStorageCursors.withNode(i).labels(new long[]{1}).properties(new Object[]{KEY, Values.stringValue("name_" + i)});
        }
        return stubStorageCursors;
    }
}
