package org.apache.iotdb.db.pipe.consensus.deletion.persist;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.utils.MmapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.class */
/**
 * A {@link DeletionBuffer} that queues {@link DeletionResource}s ordered by progress index,
 * serializes them in batches into a direct {@link ByteBuffer} on a single persist thread, and
 * appends each batch to a deletion log file under {@code baseDirectory}.
 *
 * <p>A batch is flushed either when no new deletion arrives within the configured delay (the
 * current file is fsynced and kept open) or when the current file is considered full (the file is
 * fsynced, closed, and a new file named after the max progress index is opened).
 */
public class PageCacheDeletionBuffer implements DeletionBuffer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PageCacheDeletionBuffer.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

    /** Fraction of {@link #DAL_BUFFER_SIZE} after which the current file is rolled. */
    private static final double FSYNC_BUFFER_RATIO = 0.95d;

    /** Upper bound {@link #close()} waits for outstanding deletions to be flushed. */
    private static final long MAX_WAIT_CLOSE_TIME_IN_MS = 10000;

    /** Initial capacity handed to the priority queue holding not-yet-serialized deletions. */
    private static final int QUEUE_CAPACITY = config.getDeletionAheadLogBufferQueueCapacity();

    /** Size in bytes of the serialize buffer; mutable only so tests can shrink it. */
    public static int DAL_BUFFER_SIZE = config.getWalBufferSize() / 3;

    private final String dataRegionId;
    private final String baseDirectory;
    private final ExecutorService persistThread;

    private volatile ByteBuffer serializeBuffer;
    private volatile File logFile;
    private volatile FileOutputStream logStream;
    private volatile FileChannel logChannel;

    /** Deletions waiting to be serialized, smallest progress index first. */
    private final BlockingQueue<DeletionResource> deletionResources =
            new PriorityBlockingQueue<>(
                    QUEUE_CAPACITY,
                    (left, right) -> {
                        if (left.getProgressIndex().equals(right.getProgressIndex())) {
                            return 0;
                        }
                        return left.getProgressIndex().isAfter(right.getProgressIndex()) ? 1 : -1;
                    });

    /** Guards reads of / resets to the serialize buffer position. */
    private final Lock buffersLock = new ReentrantLock();

    /** Bytes serialized for the file currently being written. */
    private final AtomicInteger totalSize = new AtomicInteger(0);

    /** Deletions serialized in the running task whose persist outcome is not yet reported. */
    private final List<DeletionResource> pendingDeletionsInOneTask = new CopyOnWriteArrayList<>();

    private volatile boolean isClosed = false;
    private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;

    /**
     * One batch of work for the persist thread. Each task re-submits a fresh task when it finishes
     * (normally or exceptionally) as long as the buffer has not been closed.
     */
    private class PersistTask implements Runnable {
        /** Bytes this task added to {@code totalSize}; used to roll the counter back on failure. */
        private final AtomicInteger currentTaskBatchSize = new AtomicInteger(0);

        @Override
        public void run() {
            try {
                persistDeletion();
            } catch (IOException e) {
                LOGGER.warn("Deletion persist: Cannot write to {}, may cause data inconsistency.", logFile, e);
                pendingDeletionsInOneTask.forEach(deletion -> deletion.onPersistFailed(e));
                rollbackFileAttribute(currentTaskBatchSize.get());
                // The failed batch has been reported; drop it so the next task neither writes into a
                // flipped buffer nor reports success for deletions that already failed.
                resetTaskAttribute();
            } finally {
                if (!isClosed) {
                    persistThread.submit(new PersistTask());
                }
            }
        }

        /**
         * Copies the serialized form of {@code deletionResource} into the serialize buffer.
         *
         * @return {@code false} when the buffer has too little room left (nothing is copied),
         *     {@code true} otherwise
         */
        private boolean serializeDeletionToBatchBuffer(DeletionResource deletionResource) {
            LOGGER.debug("Deletion persist-{}: serialize deletion resource {}", dataRegionId, deletionResource);
            ByteBuffer serialized = deletionResource.serialize();
            int serializedSize = serialized.position();
            if (serializedSize > serializeBuffer.remaining()) {
                return false;
            }
            serializeBuffer.put(serialized.array());
            totalSize.addAndGet(serializedSize);
            currentTaskBatchSize.addAndGet(serializedSize);
            return true;
        }

        /** Records a serialized deletion as pending and folds its progress index into the file max. */
        private void trackPendingDeletion(DeletionResource deletionResource) {
            pendingDeletionsInOneTask.add(deletionResource);
            maxProgressIndexInCurrentFile =
                    maxProgressIndexInCurrentFile.updateToMinimumEqualOrIsAfterProgressIndex(
                            deletionResource.getProgressIndex());
        }

        /** Writes the batch, closes the current file, and opens the next one. */
        private void rollToNextLoggingFile() throws IOException {
            appendCurrentBatch();
            closeCurrentLoggingFile();
            resetTaskAttribute();
            switchLoggingFile();
        }

        /**
         * Blocks for the first deletion, then keeps draining the queue until either the poll times
         * out (batch is written and fsynced) or the file is full (file is rolled).
         *
         * @throws IOException if writing, syncing, closing, or opening a log file fails
         */
        private void persistDeletion() throws IOException {
            final DeletionResource first;
            try {
                first = deletionResources.take();
            } catch (InterruptedException e) {
                LOGGER.warn("Interrupted when waiting for taking DeletionResource from blocking queue to serialize.");
                Thread.currentThread().interrupt();
                return;
            }
            // The return value is deliberately ignored here, as in the original flow.
            serializeDeletionToBatchBuffer(first);
            trackPendingDeletion(first);

            while (totalSize.get() < DAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
                DeletionResource next = null;
                try {
                    next = deletionResources.poll(config.getWalAsyncModeFsyncDelayInMs(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOGGER.warn("Interrupted when waiting for taking DeletionResource from blocking queue to serialize.");
                    Thread.currentThread().interrupt();
                }
                if (next == null) {
                    // Nothing arrived in time (or we were interrupted): flush what we have.
                    // NOTE(review): with the interrupt flag set, the channel write below may fail
                    // with ClosedByInterruptException - confirm this is acceptable on shutdown.
                    appendCurrentBatch();
                    fsyncCurrentLoggingFile();
                    resetTaskAttribute();
                    return;
                }
                if (!serializeDeletionToBatchBuffer(next)) {
                    // No room left: hand the deletion back to the queue and roll the file.
                    deletionResources.add(next);
                    rollToNextLoggingFile();
                    return;
                }
                trackPendingDeletion(next);
            }
            if (totalSize.get() > 0) {
                rollToNextLoggingFile();
            }
        }
    }

    /**
     * Allocates the serialize buffer and creates the single persist thread; no file is opened and
     * no task is scheduled until {@link #start()}.
     *
     * @param dataRegionId id used in log messages and in the persist thread name
     * @param baseDirectory directory in which deletion log files are created
     */
    public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
        this.dataRegionId = dataRegionId;
        this.baseDirectory = baseDirectory;
        allocateBuffers();
        this.persistThread = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.PIPE_CONSENSUS_DELETION_SERIALIZE.getName() + "(group-" + dataRegionId + ")");
    }

    /**
     * Opens the initial log file and then schedules the first persist task.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened
     */
    @Override
    public void start() {
        try {
            // Open the file before any task can run, so the persist thread never sees a null channel.
            openLoggingFile(new File(this.baseDirectory, String.format("_%d-%d%s", 0, 0, DeletionResourceManager.DELETION_FILE_SUFFIX)));
            LOGGER.info("Deletion persist-{}: starting to persist, current writing: {}", this.dataRegionId, this.logFile);
        } catch (IOException e) {
            LOGGER.warn("Deletion persist: Cannot create file {}, please check your file system manually.", this.logFile, e);
            throw new RuntimeException(e);
        }
        this.persistThread.submit(new PersistTask());
    }

    /**
     * @return {@code true} when the queue is empty and the serialize buffer is absent or at
     *     position 0
     */
    @Override
    public boolean isAllDeletionFlushed() {
        this.buffersLock.lock();
        try {
            ByteBuffer buffer = this.serializeBuffer;
            return this.deletionResources.isEmpty() && (buffer == null || buffer.position() == 0);
        } finally {
            this.buffersLock.unlock();
        }
    }

    /** Allocates the direct serialize buffer; on {@link OutOfMemoryError} closes and rethrows. */
    private void allocateBuffers() {
        try {
            this.serializeBuffer = ByteBuffer.allocateDirect(DAL_BUFFER_SIZE);
        } catch (OutOfMemoryError e) {
            LOGGER.error("Fail to allocate deletionBuffer-group-{}'s buffer because out of memory.", this.dataRegionId, e);
            close();
            throw e;
        }
    }

    /** Enqueues a deletion for persistence; logs an error and drops it if the buffer is closed. */
    @Override
    public void registerDeletionResource(DeletionResource deletionResource) {
        if (this.isClosed) {
            LOGGER.error("Fail to register DeletionResource into deletionBuffer-{} because this buffer is closed.", this.dataRegionId);
            return;
        }
        this.deletionResources.add(deletionResource);
    }

    /**
     * Opens {@code file} for appending, makes it the current log file, and writes the magic header
     * when the file is empty.
     */
    private void openLoggingFile(File file) throws IOException {
        this.logFile = file;
        this.logStream = new FileOutputStream(file, true);
        this.logChannel = this.logStream.getChannel();
        if (!file.exists() || file.length() == 0) {
            this.logChannel.write(ByteBuffer.wrap(DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8)));
        }
    }

    /** Flips the serialize buffer and writes its content to the current log channel. */
    public void appendCurrentBatch() throws IOException {
        this.serializeBuffer.flip();
        this.logChannel.write(this.serializeBuffer);
    }

    /** Forces the current log file to disk and reports success to every pending deletion. */
    public void fsyncCurrentLoggingFile() throws IOException {
        LOGGER.info("Deletion persist-{}: current batch fsync due to timeout", this.dataRegionId);
        this.logChannel.force(false);
        this.pendingDeletionsInOneTask.forEach(DeletionResource::onPersistSucceed);
    }

    /**
     * Forces the current log file to disk, closes it, and reports success to every pending
     * deletion. Closing alone does not sync, so the explicit force keeps this path as durable as
     * {@link #fsyncCurrentLoggingFile()}.
     */
    public void closeCurrentLoggingFile() throws IOException {
        LOGGER.info("Deletion persist-{}: current file has been closed", this.dataRegionId);
        try {
            this.logChannel.force(false);
        } finally {
            this.logStream.close();
            this.logChannel.close();
        }
        this.pendingDeletionsInOneTask.forEach(DeletionResource::onPersistSucceed);
    }

    /** Forgets the pending deletions of the finished batch and empties the serialize buffer. */
    public void resetTaskAttribute() {
        this.pendingDeletionsInOneTask.clear();
        clearBuffer();
    }

    /** Resets the per-file byte counter and max progress index. */
    private void resetFileAttribute() {
        this.totalSize.set(0);
        this.maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
    }

    /**
     * Subtracts the bytes of a failed batch from the per-file byte counter.
     *
     * @param i number of bytes to subtract
     */
    public void rollbackFileAttribute(int i) {
        this.totalSize.addAndGet(-i);
    }

    /** Clears the serialize buffer under {@code buffersLock}; a released buffer is skipped. */
    private void clearBuffer() {
        this.buffersLock.lock();
        try {
            ByteBuffer buffer = this.serializeBuffer;
            if (buffer != null) {
                buffer.clear();
            }
        } finally {
            this.buffersLock.unlock();
        }
    }

    /**
     * Opens a new log file named after the max progress index of the file just finished, and
     * always resets the per-file attributes afterwards.
     *
     * @throws IOException if the progress index is not a {@link SimpleProgressIndex} or the new
     *     file cannot be opened
     */
    public void switchLoggingFile() throws IOException {
        try {
            ProgressIndex current = ReplicateProgressDataNodeManager.extractLocalSimpleProgressIndex(this.maxProgressIndexInCurrentFile);
            if (!(current instanceof SimpleProgressIndex)) {
                throw new IOException("Invalid deletion progress index: " + current);
            }
            SimpleProgressIndex simpleIndex = (SimpleProgressIndex) current;
            openLoggingFile(new File(this.baseDirectory, String.format("_%d-%d%s", simpleIndex.getRebootTimes(), simpleIndex.getMemTableFlushOrderId(), DeletionResourceManager.DELETION_FILE_SUFFIX)));
            LOGGER.info("Deletion persist-{}: switching to a new file, current writing: {}", this.dataRegionId, this.logFile);
        } finally {
            resetFileAttribute();
        }
    }

    /**
     * Marks the buffer closed, waits (bounded) for outstanding deletions, stops the persist thread,
     * closes the open log file, and releases the serialize buffer.
     */
    @Override
    public void close() {
        this.isClosed = true;
        waitUntilFlushAllDeletionsOrTimeOut();
        if (this.persistThread != null) {
            this.persistThread.shutdownNow();
            try {
                if (!this.persistThread.awaitTermination(30L, TimeUnit.SECONDS)) {
                    LOGGER.warn("persistThread did not terminate within {}s", 30);
                }
            } catch (InterruptedException e) {
                LOGGER.warn("DAL Thread {} still doesn't exit after 30s", this.dataRegionId);
                Thread.currentThread().interrupt();
            }
        }
        closeOpenLogStream();
        MmapUtil.clean(this.serializeBuffer);
        this.serializeBuffer = null;
    }

    /** Closes the currently open log stream, if any, so its file descriptor is not leaked. */
    private void closeOpenLogStream() {
        FileOutputStream stream = this.logStream;
        if (stream == null) {
            // start() was never called (or failed before opening the file).
            return;
        }
        try {
            // Closing the stream also closes the channel obtained from it.
            stream.close();
        } catch (IOException e) {
            LOGGER.warn("Deletion persist-{}: failed to close {}", this.dataRegionId, this.logFile, e);
        }
    }

    /**
     * Polls {@link #isAllDeletionFlushed()} every 50 ms until it is true, the wait limit elapses,
     * or the calling thread is interrupted.
     */
    private void waitUntilFlushAllDeletionsOrTimeOut() {
        long startTime = System.currentTimeMillis();
        while (!isAllDeletionFlushed() && System.currentTimeMillis() - startTime < MAX_WAIT_CLOSE_TIME_IN_MS) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted when waiting for all deletions flushed.");
                Thread.currentThread().interrupt();
                // Stop waiting: with the flag restored every further sleep would throw at once.
                return;
            }
        }
    }

    /** Overrides the serialize buffer size used by buffers allocated afterwards. */
    @TestOnly
    public static void setDalBufferSize(int i) {
        DAL_BUFFER_SIZE = i;
    }
}
