package org.apache.bookkeeper.bookie.storage.directentrylogger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.kubernetes.client.openapi.models.V1GitRepoVolumeSource;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.AbstractLogCompactor;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor;
import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
import org.apache.bookkeeper.bookie.storage.EntryLogIds;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.common.util.ExceptionMessageHelper;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
import org.apache.bookkeeper.slogger.Slogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.LedgerDirUtil;

/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.17.1.1.jar:org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.class */
/**
 * {@link EntryLogger} implementation that writes entry logs through {@link DirectWriter} and reads
 * them back through {@link DirectReader}, both driven by {@link NativeIO}.
 *
 * <p>At most one log is open for writing at a time ({@code curWriter}). When the current writer
 * reports that it should roll, it is finalized and closed on the flush executor and a new log is
 * started with the next id handed out by {@link EntryLogIds}.
 *
 * <p>Readers are cached per calling thread in a weight-bounded {@link Cache}; a reader is closed
 * when the cache removes it.
 *
 * <p>An entry location is a {@code long}: the upper 32 bits are the log id, the lower 32 bits are
 * the position passed to {@code LogReader#readEntryAt}.
 *
 * <p>{@code curWriter} and {@code pendingFlushes} are only read or replaced while holding the
 * monitor of this instance.
 */
public class DirectEntryLogger implements EntryLogger {
    /** Number of buffers the total write-buffer budget is divided into. */
    private static final int NUMBER_OF_WRITE_BUFFERS = 8;

    private final Slogger slog;
    private final File ledgerDir;
    private final EntryLogIds ids;
    private final ExecutorService writeExecutor;
    private final ExecutorService flushExecutor;
    private final long maxFileSize;
    private final DirectEntryLoggerStats stats;
    private final ByteBufAllocator allocator;
    private final BufferPool writeBuffers;
    /** Read buffer size after rounding with {@code Buffer.nextAlignment}. */
    private final int readBufferSize;
    private final int maxSaneEntrySize;
    private final NativeIO nativeIO;

    /** Reader cache of the calling thread, keyed by log id. */
    private final ThreadLocal<Cache<Integer, LogReader>> caches;
    /** Every per-thread cache created so far, so that {@link #close()} can invalidate all of them. */
    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
    /** Ids of logs opened for writing by this instance that have not been finalized yet. */
    private final Set<Integer> unflushedLogs = ConcurrentHashMap.newKeySet();

    /** Log currently open for writing, or {@code null} if none. */
    private WriterWithMetadata curWriter;
    /** Finalize-and-close operations queued since the last {@link #flush()}. */
    private List<Future<?>> pendingFlushes = new ArrayList<>();

    /**
     * Creates an entry logger working on {@code ledgerDir}.
     *
     * @param ledgerDir directory in which log files are created and looked up
     * @param ids source of ids for newly created logs
     * @param nativeIO native I/O facade handed to the buffer pool, writers and readers
     * @param allocator allocator used for header buffers, readers and scans
     * @param writeExecutor executor handed to every {@link DirectWriter}
     * @param flushExecutor executor on which flush and finalize-and-close tasks run
     * @param maxFileSize size limit used to decide when the current log must roll
     * @param maxSaneEntrySize limit handed to every reader
     * @param totalWriteBufferSize write-buffer budget; split into {@value #NUMBER_OF_WRITE_BUFFERS}
     *     aligned buffers
     * @param totalReadBufferSize read-buffer budget; divided by {@code numReadThreads} to obtain
     *     the maximum weight of each per-thread reader cache
     * @param requestedReadBufferSize read buffer size; readers use the aligned value, while the
     *     unaligned value is the weight of one cached reader
     * @param numReadThreads number of reading threads the read budget is shared between
     * @param readerExpirySeconds seconds after the last access at which a cached reader expires
     * @param slogParent parent structured logger
     * @param statsLogger stats logger wrapped by {@link DirectEntryLoggerStats}
     * @throws IOException declared for failures while setting up I/O resources
     */
    public DirectEntryLogger(File ledgerDir,
                             EntryLogIds ids,
                             NativeIO nativeIO,
                             ByteBufAllocator allocator,
                             ExecutorService writeExecutor,
                             ExecutorService flushExecutor,
                             long maxFileSize,
                             int maxSaneEntrySize,
                             long totalWriteBufferSize,
                             long totalReadBufferSize,
                             int requestedReadBufferSize,
                             int numReadThreads,
                             int readerExpirySeconds,
                             Slogger slogParent,
                             StatsLogger statsLogger) throws IOException {
        this.ledgerDir = ledgerDir;
        this.flushExecutor = flushExecutor;
        this.writeExecutor = writeExecutor;
        this.nativeIO = nativeIO;
        this.maxFileSize = maxFileSize;
        this.maxSaneEntrySize = maxSaneEntrySize;
        this.ids = ids;
        this.allocator = allocator;
        this.readBufferSize = Buffer.nextAlignment(requestedReadBufferSize);
        this.slog = slogParent.kv("directory", ledgerDir).ctx(DirectEntryLogger.class);
        this.stats = new DirectEntryLoggerStats(statsLogger);

        int singleWriteBufferSize =
            Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
        this.writeBuffers =
            new BufferPool(nativeIO, allocator, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);

        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
        if (perThreadBufferSize < requestedReadBufferSize) {
            // A cache whose maximum weight is below the weight of a single reader cannot keep any.
            this.slog
                .kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
                .kv("totalReadBufferSize", totalReadBufferSize)
                .kv("totalNumReadThreads", numReadThreads)
                .kv("readBufferSize", requestedReadBufferSize)
                .kv("perThreadBufferSize", perThreadBufferSize)
                .error(Events.ENTRYLOGGER_MISCONFIGURED);
        }

        long maxCachedReadersPerThread = perThreadBufferSize / requestedReadBufferSize;
        this.slog
            .kv("maxFileSize", maxFileSize)
            .kv("maxSaneEntrySize", maxSaneEntrySize)
            .kv("totalWriteBufferSize", totalWriteBufferSize)
            .kv("singleWriteBufferSize", singleWriteBufferSize)
            .kv("totalReadBufferSize", totalReadBufferSize)
            .kv("readBufferSize", requestedReadBufferSize)
            .kv("perThreadBufferSize", perThreadBufferSize)
            .kv("maxCachedReadersPerThread", maxCachedReadersPerThread)
            .kv("maxCachedReaders", maxCachedReadersPerThread * numReadThreads)
            .info(Events.ENTRYLOGGER_CREATED);

        this.caches = ThreadLocal.withInitial(() -> {
            Cache<Integer, LogReader> cache = CacheBuilder.newBuilder()
                .maximumWeight(perThreadBufferSize)
                // The type witness fixes the key/value types for the rest of the builder chain.
                .<Integer, LogReader>weigher((logId, reader) -> requestedReadBufferSize)
                .removalListener(notification -> {
                    try {
                        notification.getValue().close();
                        this.stats.getCloseReaderCounter().inc();
                    } catch (IOException e) {
                        this.slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
                    }
                })
                .expireAfterAccess(readerExpirySeconds, TimeUnit.SECONDS)
                .concurrencyLevel(1)
                .build();
            this.allCaches.add(cache);
            return cache;
        });
    }

    /**
     * Appends an entry to the current log, rolling to a new log first when the current writer
     * says so or when no log is open.
     *
     * @param ledgerId ledger the entry belongs to
     * @param buf entry payload
     * @return the value returned by the writer for the appended entry
     * @throws IOException if creating the new log or appending fails
     */
    @Override
    public long addEntry(long ledgerId, ByteBuf buf) throws IOException {
        long startNanos = System.nanoTime();
        long location;
        synchronized (this) {
            if (curWriter != null && curWriter.shouldRoll(buf, maxFileSize)) {
                flushAndCloseCurrent();
                curWriter = null;
            }
            if (curWriter == null) {
                int newLogId = ids.nextId();
                curWriter = new WriterWithMetadata(
                    newDirectWriter(newLogId), new EntryLogMetadata(newLogId), allocator);
                slog.kv("newLogId", newLogId).info(Events.LOG_ROLL);
            }
            location = curWriter.addEntry(ledgerId, buf);
        }
        stats.getAddEntryStats()
            .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        return location;
    }

    /**
     * Reads the entry stored at {@code entryLocation} without validating which ledger/entry it is.
     */
    @Override
    public ByteBuf readEntry(long entryLocation) throws IOException, Bookie.NoEntryException {
        return internalReadEntry(-1L, -1L, entryLocation, false);
    }

    /**
     * Reads the entry stored at {@code entryLocation} and verifies that it carries the given
     * ledger id and entry id.
     */
    @Override
    public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
            throws IOException, Bookie.NoEntryException {
        return internalReadEntry(ledgerId, entryId, entryLocation, true);
    }

    /**
     * Returns the calling thread's cached reader for {@code logId}, opening one if needed.
     *
     * @throws IOException if the reader cannot be opened or the cached reader is already closed
     */
    private LogReader getReader(int logId) throws IOException {
        Cache<Integer, LogReader> cache = caches.get();
        LogReader reader;
        try {
            reader = cache.get(logId, () -> {
                stats.getOpenReaderCounter().inc();
                return newDirectReader(logId);
            });
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();
            }
            throw new IOException(
                ExceptionMessageHelper.exMsg("Error loading reader in cache").kv("logId", logId).toString(), e);
        }
        if (reader.isClosed()) {
            stats.getCachedReadersServedClosedCounter().inc();
            throw new IOException(
                ExceptionMessageHelper.exMsg("Cached reader already closed").kv("logId", logId).toString());
        }
        return reader;
    }

    /**
     * Reads the entry at {@code location}, optionally checking the ledger id (bytes 0-7) and entry
     * id (bytes 8-15) found at the start of the entry.
     *
     * @throws Bookie.NoEntryException if the reader signals end of file for that position
     * @throws IOException on other read failures or when validation fails
     */
    private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validate)
            throws IOException, Bookie.NoEntryException {
        int logId = (int) (location >> 32);
        int position = (int) location; // the narrowing cast keeps the lower 32 bits
        long startNanos = System.nanoTime();
        try {
            ByteBuf entry = getReader(logId).readEntryAt(position);
            if (validate) {
                long foundLedger = entry.getLong(0);
                long foundEntry = entry.getLong(8);
                if (foundLedger != ledgerId || foundEntry != entryId) {
                    // The buffer is not handed to the caller, so it must be released here.
                    ReferenceCountUtil.release(entry);
                    throw new IOException(ExceptionMessageHelper.exMsg("Bad location")
                        .kv("location", location)
                        .kv("expectedLedger", ledgerId)
                        .kv("expectedEntry", entryId)
                        .kv("foundLedger", foundLedger)
                        .kv("foundEntry", foundEntry)
                        .toString());
                }
            }
            stats.getReadEntryStats()
                .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            return entry;
        } catch (EOFException e) {
            stats.getReadEntryStats()
                .registerFailedEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            throw new Bookie.NoEntryException(
                ExceptionMessageHelper.exMsg("Entry location doesn't exist").kv("location", location).toString(),
                ledgerId, entryId);
        }
    }

    /**
     * Flushes the current writer and waits for it and for every close operation queued since the
     * previous flush.
     *
     * @throws IOException if any of the awaited operations failed or the wait was interrupted
     */
    @Override
    public void flush() throws IOException {
        long startNanos = System.nanoTime();
        Future<?> currentFlush = flushCurrent();

        List<Future<?>> outstanding;
        synchronized (this) {
            outstanding = pendingFlushes;
            pendingFlushes = new ArrayList<>();
        }
        outstanding.add(currentFlush);

        for (Future<?> future : outstanding) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interruped while flushing", e);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException) e.getCause();
                }
                throw new IOException("Exception flushing writer", e);
            }
        }
        stats.getFlushStats()
            .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Submits a flush of the current writer to the flush executor.
     *
     * @return the future of the submitted flush, or an already completed future if no log is open
     */
    private Future<?> flushCurrent() throws IOException {
        WriterWithMetadata writer;
        synchronized (this) {
            writer = curWriter;
        }
        if (writer == null) {
            return CompletableFuture.completedFuture(null);
        }
        return flushExecutor.submit(() -> {
            long startNanos = System.nanoTime();
            try {
                writer.flush();
                stats.getWriterFlushStats()
                    .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
                return null;
            } catch (Throwable t) {
                stats.getWriterFlushStats()
                    .registerFailedEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
                throw t;
            }
        });
    }

    /**
     * Detaches the current writer and finalizes/closes it on the flush executor. The outcome is
     * recorded in a future added to {@code pendingFlushes}, which the next {@link #flush()} awaits.
     */
    private void flushAndCloseCurrent() throws IOException {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        WriterWithMetadata writer;
        synchronized (this) {
            writer = curWriter;
            curWriter = null;
            pendingFlushes.add(closed);
        }
        if (writer == null) {
            closed.complete(null);
            return;
        }
        flushExecutor.execute(() -> {
            long startNanos = System.nanoTime();
            try {
                writer.finalizeAndClose();
                stats.getWriterFlushStats()
                    .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
                unflushedLogs.remove(writer.logId());
                closed.complete(null);
            } catch (Throwable t) {
                stats.getWriterFlushStats()
                    .registerFailedEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
                closed.completeExceptionally(t);
            }
        });
    }

    /**
     * Closes the current log, waits for outstanding flushes, invalidates every reader cache and
     * closes the write buffer pool.
     */
    @Override
    public void close() throws IOException {
        flushAndCloseCurrent();
        flush();
        for (Cache<?, ?> cache : allCaches) {
            cache.invalidateAll();
        }
        writeBuffers.close();
    }

    /**
     * Lists the ids of the logs found in the ledger directory, leaving out those this instance
     * still tracks as unflushed.
     */
    @Override
    public Collection<Long> getFlushedLogIds() {
        return LedgerDirUtil.logIdsInDirectory(ledgerDir).stream()
            .filter(logId -> !unflushedLogs.contains(logId))
            .map(Integer::longValue)
            .collect(Collectors.toList());
    }

    /**
     * Deletes the file of the given log.
     *
     * @return {@code true} if the file was deleted
     * @throws IllegalArgumentException if {@code entryLogId} is not below {@link Integer#MAX_VALUE}
     */
    @Override
    public boolean removeEntryLog(long entryLogId) {
        File file = logFile(ledgerDir, checkedLogId(entryLogId));
        boolean deleted = file.delete();
        slog.kv("file", file).kv("logId", entryLogId).kv("result", deleted).info(Events.LOG_DELETED);
        return deleted;
    }

    /**
     * Scans the given log with a freshly opened reader, which is closed when the scan ends.
     *
     * @throws IllegalArgumentException if {@code entryLogId} is not below {@link Integer#MAX_VALUE}
     * @throws IOException if opening, scanning or closing the reader fails
     */
    @Override
    public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
        try (LogReader reader = newDirectReader(checkedLogId(entryLogId))) {
            LogReaderScan.scan(allocator, reader, scanner);
        }
    }

    /**
     * Tells whether the file of the given log exists in the ledger directory.
     *
     * @throws IllegalArgumentException if {@code entryLogId} is not below {@link Integer#MAX_VALUE}
     */
    @Override
    public boolean logExists(long entryLogId) {
        return logFile(ledgerDir, checkedLogId(entryLogId)).exists();
    }

    /**
     * Returns the metadata of the given log, read from the log via {@link #readEntryLogIndex(long)}
     * or, if that fails with an {@link IOException}, rebuilt by scanning the whole log.
     */
    @Override
    public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler)
            throws IOException {
        try {
            return readEntryLogIndex(entryLogId);
        } catch (IOException e) {
            slog.kv("entryLogId", entryLogId).kv("reason", e.getMessage()).info(Events.READ_METADATA_FALLBACK);
            return scanEntryLogMetadata(entryLogId, throttler);
        }
    }

    /**
     * Reads the metadata of the given log through {@code LogMetadata.read}, using a freshly opened
     * reader that is closed before returning.
     *
     * @throws IllegalArgumentException if {@code entryLogId} is not below {@link Integer#MAX_VALUE}
     */
    @VisibleForTesting
    EntryLogMetadata readEntryLogIndex(long entryLogId) throws IOException {
        try (LogReader reader = newDirectReader(checkedLogId(entryLogId))) {
            return LogMetadata.read(reader);
        }
    }

    /**
     * Builds the metadata of the given log by scanning every entry with a non-negative ledger id
     * and adding its size plus 4 bytes to the ledger's total.
     *
     * @param throttler if non-null, asked to acquire each entry's readable bytes before counting it
     */
    @VisibleForTesting
    EntryLogMetadata scanEntryLogMetadata(long entryLogId, final AbstractLogCompactor.Throttler throttler)
            throws IOException {
        final EntryLogMetadata metadata = new EntryLogMetadata(entryLogId);
        scanEntryLog(entryLogId, new EntryLogScanner() {
            @Override
            public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
                if (throttler != null) {
                    throttler.acquire(entry.readableBytes());
                }
                // NOTE(review): the extra 4 bytes presumably account for a per-entry size prefix.
                metadata.addLedgerSize(ledgerId, entry.readableBytes() + 4);
            }

            @Override
            public boolean accept(long ledgerId) {
                return ledgerId >= 0;
            }
        });
        return metadata;
    }

    /** Opens a new, uncached reader on the file of the given log. */
    @VisibleForTesting
    LogReader newDirectReader(int logId) throws IOException {
        return new DirectReader(logId, logFilename(ledgerDir, logId), allocator, nativeIO,
            readBufferSize, maxSaneEntrySize, stats.getReadBlockStats());
    }

    /**
     * Creates the writer for a new log, marks the log as unflushed, writes an empty header at
     * offset 0 and positions the writer right after the header buffer.
     */
    private LogWriter newDirectWriter(int newId) throws IOException {
        unflushedLogs.add(newId);
        DirectWriter writer = new DirectWriter(newId, logFilename(ledgerDir, newId), maxFileSize,
            writeExecutor, writeBuffers, nativeIO, slog);
        ByteBuf header = allocator.buffer(4096);
        try {
            Header.writeEmptyHeader(header);
            writer.writeAt(0L, header);
            writer.position(header.capacity());
            return writer;
        } finally {
            ReferenceCountUtil.release(header);
        }
    }

    /**
     * Validates that an entry log id is below {@link Integer#MAX_VALUE} and narrows it to an int.
     *
     * @throws IllegalArgumentException if the id is too large
     */
    private static int checkedLogId(long entryLogId) {
        // Guava's Preconditions only substitutes %s placeholders; %d would be printed verbatim.
        Preconditions.checkArgument(entryLogId < Integer.MAX_VALUE,
            "Entry log id must be an int [%s]", entryLogId);
        return (int) entryLogId;
    }

    /**
     * Returns the file of the given log inside {@code directory}: the id in hexadecimal followed
     * by {@link EntryLogger#LOG_FILE_SUFFIX}.
     */
    public static File logFile(File directory, int logId) {
        return new File(directory, Long.toHexString(logId) + EntryLogger.LOG_FILE_SUFFIX);
    }

    /** Returns {@link #logFile(File, int)} as a path string. */
    public static String logFilename(File directory, int logId) {
        return logFile(directory, logId).toString();
    }

    /**
     * Creates a compaction log for the given log, allocating a fresh id for it.
     *
     * @param srcLogId id of the log being compacted; narrowed to an int without a range check
     */
    @Override
    public CompactionEntryLog newCompactionLog(long srcLogId) throws IOException {
        return DirectCompactionEntryLog.newLog((int) srcLogId, ids.nextId(), ledgerDir, maxFileSize,
            writeExecutor, writeBuffers, nativeIO, allocator, slog);
    }

    /**
     * Walks the ledger directory: files ending with
     * {@link TransactionalEntryLogCompactor#COMPACTING_SUFFIX} are deleted (a failure is only
     * logged), and every file matching {@link LedgerDirUtil#COMPACTED_FILE_PATTERN} is turned into
     * a recovered compaction log.
     *
     * @return the recovered compaction logs; empty if the directory is missing or cannot be listed
     */
    @Override
    public Collection<CompactionEntryLog> incompleteCompactionLogs() {
        List<CompactionEntryLog> logs = new ArrayList<>();
        if (!ledgerDir.exists() || !ledgerDir.isDirectory()) {
            return logs;
        }
        File[] files = ledgerDir.listFiles();
        if (files == null) {
            return logs;
        }
        for (File file : files) {
            if (file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTING_SUFFIX)) {
                try {
                    Files.deleteIfExists(file.toPath());
                } catch (IOException e) {
                    slog.kv("file", file).warn(Events.COMPACTION_DELETE_FAILURE);
                }
            }

            Matcher matcher = LedgerDirUtil.COMPACTED_FILE_PATTERN.matcher(file.getName());
            if (matcher.matches()) {
                // NOTE(review): presumably (source, destination) like newLog - confirm against recoverLog.
                int idFromSecondGroup = Integer.parseUnsignedInt(matcher.group(2), 16);
                int idFromFirstGroup = Integer.parseUnsignedInt(matcher.group(1), 16);
                logs.add(DirectCompactionEntryLog.recoverLog(idFromSecondGroup, idFromFirstGroup, ledgerDir,
                    readBufferSize, maxSaneEntrySize, nativeIO, allocator, stats.getReadBlockStats(), slog));
            }
        }
        return logs;
    }
}
