package org.neo4j.kernel.impl.transaction.log;

import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.lang.StackWalker;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.neo4j.adversaries.ClassGuardedAdversary;
import org.neo4j.adversaries.CountingAdversary;
import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
import org.neo4j.io.fs.DelegatingStoreChannel;
import org.neo4j.io.fs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemLifecycleAdapter;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.kernel.impl.api.TestCommand;
import org.neo4j.kernel.impl.api.TestCommandReaderFactory;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.SimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.SimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.files.LogFile;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogFiles;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceWaitEvent;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.NullLog;
import org.neo4j.monitoring.DatabaseHealth;
import org.neo4j.monitoring.DatabasePanicEventGenerator;
import org.neo4j.monitoring.Health;
import org.neo4j.storageengine.api.StoreId;
import org.neo4j.storageengine.api.TransactionIdStore;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.Race;
import org.neo4j.test.ThreadTestUtils;
import org.neo4j.test.extension.EphemeralNeo4jLayoutExtension;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.LifeExtension;

@EphemeralNeo4jLayoutExtension
@ExtendWith({LifeExtension.class})
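/*
 * Concurrency tests for BatchingTransactionAppender: ordering and batching of log forces,
 * propagation of a panic to concurrently appending threads, and behaviour when an append
 * fails with an OutOfMemoryError.
 *
 * Decompiled from: input_file:org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppenderConcurrencyTest.class
 */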
public class BatchingTransactionAppenderConcurrencyTest {
    /** Upper bound, in milliseconds, passed to {@code ThreadTestUtils.awaitThreadState} by the tests. */
    private static final long MILLISECONDS_TO_WAIT = TimeUnit.MINUTES.toMillis(1);

    /**
     * Matches a stack trace that contains at least one frame whose method name is
     * {@code forceAfterAppend}.
     *
     * <p>The array is streamed as {@code StackTraceElement}s (not as {@code Object}s), which is what
     * makes {@code getMethodName()} available on each element.
     */
    private static final Predicate<StackTraceElement[]> IN_CORRECT_FORCE_AFTER_APPEND_METHOD =
            stackTrace -> Stream.of(stackTrace)
                    .anyMatch(frame -> frame.getMethodName().equals("forceAfterAppend"));

    /** Shared thread pool; created in {@code setUpExecutor()} and shut down in {@code tearDownExecutor()}. */
    private static ExecutorService executor;

    /** Lifecycle container, injected by the test extension; each test adds components and starts it. */
    @Inject
    private LifeSupport life;

    /** Database layout, injected by the test extension; used by the tests that build real log files. */
    @Inject
    private DatabaseLayout databaseLayout;

    private final LogAppendEvent logAppendEvent = LogAppendEvent.NULL;

    // Mocked log files / log file; setUp() wires them so that the writer is a CommandQueueChannel.
    private final LogFiles logFiles = Mockito.mock(TransactionLogFiles.class);
    private final LogFile logFile = Mockito.mock(LogFile.class);

    private final LogRotation logRotation = LogRotation.NO_ROTATION;
    private final TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache();
    private final TransactionIdStore transactionIdStore = new SimpleTransactionIdStore();
    private final SimpleLogVersionRepository logVersionRepository = new SimpleLogVersionRepository();
    private final Health databaseHealth = Mockito.mock(DatabaseHealth.class);

    /** Released once by every {@code CommandQueueChannel.flush()} call, before it enqueues its command. */
    private final Semaphore forceSemaphore = new Semaphore(0);

    /**
     * Records the channel operations performed by the appender. The capacity of 2 is deliberate:
     * a {@code put} blocks once two commands are pending, which the tests use to hold a force in progress.
     */
    private final BlockingQueue<ChannelCommand> channelCommandQueue = new LinkedBlockingQueue<>(2);

    /**
     * Commands recorded on the test's command queue, describing which channel operation took place.
     */
    private enum ChannelCommand {
        // Enqueued by CommandQueueChannel.prepareForFlush().
        emptyBufferIntoChannelAndClearIt,
        // Enqueued by CommandQueueChannel.flush().
        force,
        // Placeholder that tests enqueue up front to occupy one slot of the bounded queue.
        dummy
    }

    /**
     * In-memory channel that reports its flush-related calls to the enclosing test through
     * {@code channelCommandQueue}, and signals {@code forceSemaphore} whenever a flush begins.
     *
     * <p>Both operations use the blocking {@code put}, so they stall while the bounded queue is full.
     * If the waiting thread is interrupted, the interrupt status is restored before the failure is
     * reported, so code further up the stack can still observe the interruption.
     */
    class CommandQueueChannel extends InMemoryClosableChannel implements Flushable {

        /**
         * Records {@link ChannelCommand#emptyBufferIntoChannelAndClearIt} on the command queue.
         *
         * @return this channel, which is also the {@link Flushable} to call {@link #flush()} on
         * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted while enqueueing
         */
        public Flushable prepareForFlush() {
            try {
                channelCommandQueue.put(ChannelCommand.emptyBufferIntoChannelAndClearIt);
            } catch (InterruptedException e) {
                // Do not lose the interruption: re-assert it before translating the exception.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return this;
        }

        /**
         * Releases one permit on {@code forceSemaphore} and then records {@link ChannelCommand#force}.
         * The permit is released first so a test can detect that a flush has started even while the
         * {@code put} is still blocked on a full queue.
         *
         * @throws IOException wrapping the {@link InterruptedException} if interrupted while enqueueing
         */
        @Override // java.io.Flushable
        public void flush() throws IOException {
            try {
                forceSemaphore.release();
                channelCommandQueue.put(ChannelCommand.force);
            } catch (InterruptedException e) {
                // Do not lose the interruption: re-assert it before translating the exception.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /**
     * Ephemeral file system whose write channels throw an {@link OutOfMemoryError} from
     * {@code writeAll} for as long as {@link #shouldOOM} is {@code true}.
     */
    private static class OutOfMemoryAwareFileSystem extends EphemeralFileSystemAbstraction {
        /** Toggled by the test thread; volatile so writer threads see the change. */
        private volatile boolean shouldOOM;

        /**
         * Opens the file for writing via the superclass and wraps the resulting channel so that
         * {@code writeAll} consults {@link #shouldOOM} on every call.
         */
        public synchronized StoreChannel write(File file) throws IOException {
            final StoreChannel delegate = super.write(file);
            return new DelegatingStoreChannel(delegate) {
                public void writeAll(ByteBuffer src) throws IOException {
                    if (!OutOfMemoryAwareFileSystem.this.shouldOOM) {
                        super.writeAll(src);
                        return;
                    }
                    throw new OutOfMemoryError("Temporary buffer allocation failed");
                }
            };
        }
    }

    /**
     * Database health whose {@code panic} announces that it has been entered and then waits for a
     * go-ahead before delegating to the real implementation. This lets a test run other work while
     * a panic is in progress.
     */
    private static class SlowPanickingDatabaseHealth extends DatabaseHealth {
        /** Counted down as soon as {@code panic} is entered. */
        private final CountDownLatch panicLatch;
        /** Awaited inside {@code panic} before the superclass panic is performed. */
        private final CountDownLatch adversaryLatch;

        /**
         * @param panicLatch latch counted down when a panic starts
         * @param adversaryLatch latch the panicking thread waits on before completing the panic
         */
        SlowPanickingDatabaseHealth(CountDownLatch panicLatch, CountDownLatch adversaryLatch) {
            super(Mockito.mock(DatabasePanicEventGenerator.class), NullLog.getInstance());
            this.panicLatch = panicLatch;
            this.adversaryLatch = adversaryLatch;
        }

        /**
         * Signals {@code panicLatch}, blocks until {@code adversaryLatch} reaches zero, then performs
         * the superclass panic.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted while
         *         waiting; the thread's interrupt status is restored first and the superclass panic
         *         is not performed
         */
        public void panic(Throwable th) {
            panicLatch.countDown();
            try {
                adversaryLatch.await();
                super.panic(th);
            } catch (InterruptedException e) {
                // Do not lose the interruption: re-assert it before translating the exception.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /** Creates the cached thread pool shared by all tests in this class. */
    @BeforeAll
    static void setUpExecutor() {
        BatchingTransactionAppenderConcurrencyTest.executor = Executors.newCachedThreadPool();
    }

    /** Initiates shutdown of the shared thread pool and drops the reference to it. */
    @AfterAll
    static void tearDownExecutor() {
        final ExecutorService pool = executor;
        pool.shutdown();
        executor = null;
    }

    /**
     * Wires the mocks before each test: the mocked log files hand out the mocked log file, whose
     * writer is a fresh {@link CommandQueueChannel}.
     */
    @BeforeEach
    void setUp() {
        Mockito.when(logFiles.getLogFile()).thenReturn(logFile);

        final CommandQueueChannel writer = new CommandQueueChannel();
        Mockito.when(logFile.getWriter()).thenReturn(writer);
    }

    /**
     * A single {@code forceAfterAppend} must produce exactly one prepare-for-flush command followed
     * by one force command, and nothing more.
     */
    @Test
    void shouldForceLogChannel() throws Throwable {
        // given
        final BatchingTransactionAppender appender = life.add(createTransactionAppender());
        life.start();

        // when
        appender.forceAfterAppend(logAppendEvent);

        // then
        final ChannelCommand[] expectedCommands = {
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedCommands) {
            MatcherAssert.assertThat(channelCommandQueue.take(), Matchers.is(expected));
        }
        Assertions.assertTrue(channelCommandQueue.isEmpty());
    }

    /**
     * While one force is in progress, a second forcing thread must wait and only then perform its
     * own prepare-for-flush/force pair.
     *
     * <p>The first force is held in progress by pre-filling the capacity-2 command queue with a
     * dummy: the channel's {@code flush()} releases {@code forceSemaphore} and then blocks on
     * {@code put} until the test starts draining the queue.
     */
    @Test
    void shouldWaitForOngoingForceToCompleteBeforeForcingAgain() throws Throwable {
        // given: one slot of the bounded queue is already taken
        channelCommandQueue.put(ChannelCommand.dummy);
        final BatchingTransactionAppender appender =
                (BatchingTransactionAppender) life.add(createTransactionAppender());
        life.start();
        final Runnable forceTask = createForceAfterAppendRunnable(appender);

        // when: the first force has reached flush() ...
        final Future<?> firstForce = executor.submit(forceTask);
        forceSemaphore.acquire();

        // ... and a second forcing thread has gone into a timed wait
        final Thread secondForce = ThreadTestUtils.fork(forceTask);
        ThreadTestUtils.awaitThreadState(
                secondForce, MILLISECONDS_TO_WAIT, Thread.State.TIMED_WAITING, new Thread.State[0]);

        // then: draining the queue shows two complete, non-interleaved force cycles
        final ChannelCommand[] expectedCommands = {
            ChannelCommand.dummy,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedCommands) {
            MatcherAssert.assertThat(channelCommandQueue.take(), Matchers.is(expected));
        }

        firstForce.get();
        secondForce.join();
        Assertions.assertTrue(channelCommandQueue.isEmpty());
    }

    /**
     * Ten threads that request a force while another force is in progress must be served by a
     * single additional prepare-for-flush/force pair rather than one pair per thread.
     *
     * <p>As in the previous test, the first force is held inside {@code flush()} by pre-filling
     * the capacity-2 command queue with a dummy.
     */
    @Test
    void shouldBatchUpMultipleWaitingForceRequests() throws Throwable {
        // given: one slot of the bounded queue is already taken
        channelCommandQueue.put(ChannelCommand.dummy);
        final BatchingTransactionAppender appender =
                (BatchingTransactionAppender) life.add(createTransactionAppender());
        life.start();
        final Runnable forceTask = createForceAfterAppendRunnable(appender);

        // when: the first force has reached flush() ...
        final Future<?> firstForce = executor.submit(forceTask);
        forceSemaphore.acquire();

        // ... and ten more forcing threads are all in a timed wait inside forceAfterAppend
        final Thread[] waiters = new Thread[10];
        for (int index = 0; index < waiters.length; index++) {
            waiters[index] = ThreadTestUtils.fork(forceTask);
        }
        for (Thread waiter : waiters) {
            ThreadTestUtils.awaitThreadState(
                    waiter,
                    MILLISECONDS_TO_WAIT,
                    IN_CORRECT_FORCE_AFTER_APPEND_METHOD,
                    Thread.State.TIMED_WAITING,
                    new Thread.State[0]);
        }

        // then: only two force cycles are observed in total
        final ChannelCommand[] expectedCommands = {
            ChannelCommand.dummy,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedCommands) {
            MatcherAssert.assertThat(channelCommandQueue.take(), Matchers.is(expected));
        }

        firstForce.get();
        for (Thread waiter : waiters) {
            waiter.join();
        }
        Assertions.assertTrue(channelCommandQueue.isEmpty(), "Command queue: " + channelCommandQueue);
    }

    /**
     * With a file system adversary guarded to frames of {@code BatchingTransactionAppender.force},
     * every one of the concurrently appending threads must fail with an {@link IOException};
     * none may consider its transaction appended.
     */
    @Test
    void shouldHaveAllConcurrentAppendersSeePanic() throws Throwable {
        final int contestantCount = 10;

        // given: a file system whose adversary is restricted to BatchingTransactionAppender.force
        final CountingAdversary countingAdversary = new CountingAdversary(1, true);
        final ClassGuardedAdversary forceAdversary = new ClassGuardedAdversary(
                countingAdversary, failMethod(BatchingTransactionAppender.class, "force"));
        final AdversarialFileSystemAbstraction adversarialFs =
                new AdversarialFileSystemAbstraction(forceAdversary, new EphemeralFileSystemAbstraction());
        life.add(new FileSystemLifecycleAdapter(adversarialFs));

        final DatabaseHealth health =
                new DatabaseHealth((DatabasePanicEventGenerator) Mockito.mock(DatabasePanicEventGenerator.class),
                        NullLog.getInstance());
        final LogFiles realLogFiles = LogFilesBuilder.builder(databaseLayout, adversarialFs)
                .withLogVersionRepository(logVersionRepository)
                .withTransactionIdStore(transactionIdStore)
                .withLogEntryReader(new VersionAwareLogEntryReader(new TestCommandReaderFactory()))
                .withStoreId(StoreId.UNKNOWN)
                .build();
        life.add(realLogFiles);
        final BatchingTransactionAppender appender = life.add(new BatchingTransactionAppender(
                realLogFiles, logRotation, transactionMetadataCache, transactionIdStore, health));
        life.start();

        // Each thread checks in at the latch and waits for the others before its force wait begins.
        final CountDownLatch allAtForceWait = new CountDownLatch(contestantCount);
        final LogAppendEvent.Empty rendezvousEvent = new LogAppendEvent.Empty() {
            public LogForceWaitEvent beginLogForceWait() {
                allAtForceWait.countDown();
                DoubleLatch.awaitLatch(allAtForceWait);
                return super.beginLogForceWait();
            }
        };

        // when / then
        final Race race = new Race();
        for (int contestant = 0; contestant < contestantCount; contestant++) {
            race.addContestant(() -> {
                try {
                    appender.append(tx(), rendezvousEvent);
                    Assertions.fail("No transaction should be considered appended");
                } catch (IOException ignored) {
                    // Expected: every appender is supposed to fail with an IOException.
                }
            });
        }
        race.go();
    }

    /**
     * An append that dies with an {@link OutOfMemoryError} must leave the database panicked: a
     * concurrent append started while the panic is still in progress must fail with the
     * "critical error" {@link IOException}, and only the one transaction appended before the
     * failure may have a commit entry in the log.
     *
     * <p>Sequence: one successful append; the file system is switched to throw OOM and a second
     * append is submitted to the executor; once that append has entered {@code panic}
     * ({@code panicLatch}), OOM is switched off and a third append is made on the test thread,
     * which lets the pending panic complete ({@code adversaryLatch}) when its force wait begins.
     */
    @Test
    void databasePanicShouldHandleOutOfMemoryErrors() throws IOException, InterruptedException {
        // given
        final CountDownLatch panicLatch = new CountDownLatch(1);
        final CountDownLatch adversaryLatch = new CountDownLatch(1);
        final OutOfMemoryAwareFileSystem fs = new OutOfMemoryAwareFileSystem();
        life.add(new FileSystemLifecycleAdapter(fs));
        final SlowPanickingDatabaseHealth slowPanicHealth =
                new SlowPanickingDatabaseHealth(panicLatch, adversaryLatch);
        final LogFiles realLogFiles = LogFilesBuilder.builder(databaseLayout, fs)
                .withLogVersionRepository(logVersionRepository)
                .withTransactionIdStore(transactionIdStore)
                .withLogEntryReader(new VersionAwareLogEntryReader(new TestCommandReaderFactory()))
                .withStoreId(StoreId.UNKNOWN)
                .build();
        life.add(realLogFiles);
        final BatchingTransactionAppender appender = life.add(new BatchingTransactionAppender(
                realLogFiles, logRotation, transactionMetadataCache, transactionIdStore, slowPanicHealth));
        life.start();

        // One transaction is appended successfully before anything goes wrong.
        appender.append(tx(), LogAppendEvent.NULL);

        // when: a background append hits OOM and gets as far as entering panic()
        fs.shouldOOM = true;
        final Future<Long> failingAppend =
                executor.submit(() -> Long.valueOf(appender.append(tx(), LogAppendEvent.NULL)));
        panicLatch.await();
        fs.shouldOOM = false;

        // then: an append racing with the in-progress panic is rejected
        final IOException panicFailure = Assertions.assertThrows(
                IOException.class,
                () -> appender.append(tx(), new LogAppendEvent.Empty() {
                    public LogForceWaitEvent beginLogForceWait() {
                        // Let the background thread finish its panic.
                        adversaryLatch.countDown();
                        return super.beginLogForceWait();
                    }
                }),
                "Should have failed since database should have panicked");
        MatcherAssert.assertThat(
                panicFailure.getMessage(),
                Matchers.containsString("The database has encountered a critical error"));

        // ... and the background append reports the OutOfMemoryError as its cause
        final ExecutionException oomFailure = Assertions.assertThrows(
                ExecutionException.class,
                () -> failingAppend.get(),
                "Should have failed with OutOfMemoryError error");
        MatcherAssert.assertThat(oomFailure.getCause(), Matchers.instanceOf(OutOfMemoryError.class));

        // ... and a single log file holds exactly one commit entry
        MatcherAssert.assertThat(
                realLogFiles.getLowestLogVersion(), Matchers.is(realLogFiles.getHighestLogVersion()));
        MatcherAssert.assertThat(countCommitEntries(realLogFiles), Matchers.is(1L));
    }

    /**
     * Counts the {@link LogEntryCommit} entries in the highest-versioned log file.
     *
     * <p>The channel, read-ahead channel and cursor are all managed by try-with-resources, so each
     * of them is closed even when reading fails part-way through.
     *
     * @param logFiles the log files to read from
     * @return the number of commit entries found
     * @throws IOException if the log cannot be opened, read or closed
     */
    private static long countCommitEntries(LogFiles logFiles) throws IOException {
        final VersionAwareLogEntryReader entryReader =
                new VersionAwareLogEntryReader(new TestCommandReaderFactory());
        long commitCount = 0;
        try (PhysicalLogVersionedStoreChannel channel = logFiles.openForVersion(logFiles.getHighestLogVersion());
                ReadAheadLogChannel readAhead = new ReadAheadLogChannel(channel);
                LogEntryCursor cursor = new LogEntryCursor(entryReader, readAhead)) {
            while (cursor.next()) {
                if (cursor.get() instanceof LogEntryCommit) {
                    commitCount++;
                }
            }
        }
        return commitCount;
    }

    /**
     * Builds a minimal transaction to append: a single {@link TestCommand} with an empty
     * additional header and all-zero header values.
     *
     * @return a new transaction wrapping that representation
     */
    protected TransactionToApply tx() {
        final PhysicalTransactionRepresentation representation =
                new PhysicalTransactionRepresentation(Collections.singletonList(new TestCommand()));
        representation.setHeader(new byte[0], 0L, 0L, 0L, 0);
        return new TransactionToApply(representation);
    }

    /**
     * Creates a task that calls {@code forceAfterAppend} on the given appender with this test's
     * log append event, rethrowing any {@link IOException} as a {@link RuntimeException}.
     *
     * @param batchingTransactionAppender the appender to force
     * @return a task that can be run on any thread
     */
    private Runnable createForceAfterAppendRunnable(BatchingTransactionAppender batchingTransactionAppender) {
        final LogAppendEvent event = logAppendEvent;
        return () -> {
            try {
                batchingTransactionAppender.forceAfterAppend(event);
            } catch (IOException ioFailure) {
                throw new RuntimeException(ioFailure);
            }
        };
    }

    /**
     * Creates a predicate that accepts a stack frame only when it belongs to the given class
     * and the given method.
     *
     * @param cls class whose name the frame's class name must equal
     * @param str method name the frame's method name must equal
     * @return the frame predicate
     */
    private static Predicate<StackWalker.StackFrame> failMethod(Class<?> cls, String str) {
        return frame ->
                frame.getClassName().equals(cls.getName())
                        && frame.getMethodName().equals(str);
    }

    /**
     * Creates an appender backed by this test's mocked log files and database health, together with
     * the shared rotation, metadata cache and transaction id store fields.
     */
    private BatchingTransactionAppender createTransactionAppender() {
        return new BatchingTransactionAppender(
                logFiles,
                logRotation,
                transactionMetadataCache,
                transactionIdStore,
                databaseHealth);
    }
}
