package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RawBytes;
import org.neo4j.index.internal.gbptree.SimpleByteArrayLayout;
import org.neo4j.kernel.api.index.IndexSample;
import org.neo4j.kernel.impl.index.schema.BlockStorage;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.OtherThreadExtension;
import org.neo4j.test.extension.RandomExtension;
import org.neo4j.test.rule.OtherThreadRule;
import org.neo4j.test.rule.RandomRule;

@ExtendWith({RandomExtension.class, OtherThreadExtension.class})
/* loaded from: input_file:org/neo4j/kernel/impl/index/schema/BlockEntryStreamMergerTest.class */
class BlockEntryStreamMergerTest {
    private static final int QUEUE_SIZE = 5;
    private static final int BATCH_SIZE = 10;

    @Inject
    private RandomRule random;

    @Inject
    private OtherThreadRule t2;
    private final Layout<RawBytes, RawBytes> layout = new SimpleByteArrayLayout();
    private final List<BlockEntry<RawBytes, RawBytes>> allData = new ArrayList();

    BlockEntryStreamMergerTest() {
    }

    @Test
    void shouldMergePartsIntoOneWithoutSampling() throws Exception {
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData), this.layout, (Comparator) null, BlockStorage.Cancellation.NOT_CANCELLABLE, BATCH_SIZE, QUEUE_SIZE);
        try {
            this.t2.execute(blockEntryStreamMerger);
            BlockEntryMergerTestUtils.assertMergedPartStream(this.allData, blockEntryStreamMerger);
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldMergePartsIntoOneWithSampling() throws Exception {
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData), this.layout, this.layout, BlockStorage.Cancellation.NOT_CANCELLABLE, BATCH_SIZE, QUEUE_SIZE);
        try {
            Future execute = this.t2.execute(blockEntryStreamMerger);
            BlockEntryMergerTestUtils.assertMergedPartStream(this.allData, blockEntryStreamMerger);
            execute.get();
            IndexSample buildIndexSample = blockEntryStreamMerger.buildIndexSample();
            Assertions.assertThat(buildIndexSample.sampleSize()).isEqualTo(this.allData.size());
            Assertions.assertThat(buildIndexSample.indexSize()).isEqualTo(this.allData.size());
            Assertions.assertThat(buildIndexSample.uniqueValues()).isEqualTo(countUniqueKeys(this.allData));
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldStopMergingWhenHalted() throws Exception {
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, randomRule -> {
            return 50;
        }), this.layout, (Comparator) null, BlockStorage.Cancellation.NOT_CANCELLABLE, BATCH_SIZE, QUEUE_SIZE);
        try {
            Future execute = this.t2.execute(blockEntryStreamMerger);
            this.t2.get().waitUntilWaiting(waitDetails -> {
                return waitDetails.isAt(BlockEntryStreamMerger.class, "call");
            });
            blockEntryStreamMerger.halt();
            execute.get();
            Assertions.assertThat(countEntries(blockEntryStreamMerger)).isEqualTo(50);
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldStopMergingWhenCancelled() throws Exception {
        List<BlockEntryCursor<RawBytes, RawBytes>> buildParts = BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, randomRule -> {
            return 50;
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Layout<RawBytes, RawBytes> layout = this.layout;
        Objects.requireNonNull(atomicBoolean);
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(buildParts, layout, (Comparator) null, atomicBoolean::get, BATCH_SIZE, QUEUE_SIZE);
        try {
            Future execute = this.t2.execute(blockEntryStreamMerger);
            this.t2.get().waitUntilWaiting(waitDetails -> {
                return waitDetails.isAt(BlockEntryStreamMerger.class, "call");
            });
            atomicBoolean.set(true);
            execute.get();
            Assertions.assertThat(countEntries(blockEntryStreamMerger)).isEqualTo(50);
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldStopReaderFromAwaitingMoreBatchesWhenHalted() throws Exception {
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, randomRule -> {
            return 50;
        }), this.layout, (Comparator) null, BlockStorage.Cancellation.NOT_CANCELLABLE, BATCH_SIZE, QUEUE_SIZE);
        try {
            OtherThreadRule otherThreadRule = this.t2;
            Objects.requireNonNull(blockEntryStreamMerger);
            Future execute = otherThreadRule.execute(blockEntryStreamMerger::next);
            this.t2.get().waitUntilWaiting(waitDetails -> {
                return waitDetails.isAt(BlockEntryStreamMerger.class, "next");
            });
            blockEntryStreamMerger.halt();
            Assertions.assertThat((Boolean) execute.get()).isFalse();
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void shouldStopReaderFromAwaitingMoreBatchesWhenCancelled() throws Exception {
        BlockEntryStreamMerger blockEntryStreamMerger = new BlockEntryStreamMerger(BlockEntryMergerTestUtils.buildParts(this.random, this.layout, this.allData, 4, randomRule -> {
            return 50;
        }), this.layout, (Comparator) null, BlockStorage.Cancellation.NOT_CANCELLABLE, BATCH_SIZE, QUEUE_SIZE);
        try {
            OtherThreadRule otherThreadRule = this.t2;
            Objects.requireNonNull(blockEntryStreamMerger);
            Future execute = otherThreadRule.execute(blockEntryStreamMerger::next);
            this.t2.get().waitUntilWaiting(waitDetails -> {
                return waitDetails.isAt(BlockEntryStreamMerger.class, "next");
            });
            blockEntryStreamMerger.halt();
            Assertions.assertThat((Boolean) execute.get()).isFalse();
            blockEntryStreamMerger.close();
        } catch (Throwable th) {
            try {
                blockEntryStreamMerger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static int countEntries(BlockEntryStreamMerger<RawBytes, RawBytes> blockEntryStreamMerger) throws IOException {
        int i = 0;
        while (blockEntryStreamMerger.next()) {
            i++;
        }
        return i;
    }

    private long countUniqueKeys(List<BlockEntry<RawBytes, RawBytes>> list) {
        TreeSet treeSet = new TreeSet((Comparator) this.layout);
        list.forEach(blockEntry -> {
            treeSet.add((RawBytes) blockEntry.key());
        });
        return treeSet.size();
    }
}
