package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter;
import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.helper.SubStringFunctionHelper;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.class */
public class PipeConsensusReceiver {
    /** Class-wide logger; the nested helper classes of this receiver log through it as well. */
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class);

    /** Node configuration, fetched once from the descriptor singleton when this class is initialized. */
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();

    /**
     * Timeout, in milliseconds, of one wait on the request executor's condition: one sixth of the
     * configured connection timeout multiplied by the configured pipeline size.
     */
    private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS = (IOTDB_CONFIG.getConnectionTimeoutInMS() / 6) * IOTDB_CONFIG.getIotConsensusV2PipelineSize();

    /** Longest time, in milliseconds, spent waiting for one still-used ts-file writer before it is closed anyway. */
    private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000;

    /** Sleep, in milliseconds, between two polls of the ts-file writer pool. */
    private static final long RETRY_WAIT_TIME = 500;

    private final RequestExecutor requestExecutor;
    private final PipeConsensus pipeConsensus;
    private final ConsensusGroupId consensusGroupId;
    private final ConsensusPipeName consensusPipeName;
    private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;

    /** Handle of the periodic zombie-writer check; assigned by the writer pool's constructor. */
    private Future<?> tsFileWriterCheckerFuture;

    private final FolderManager folderManager;

    /** Single-threaded scheduler on which the periodic zombie-writer check runs. */
    private final ScheduledExecutorService scheduledTsFileWriterCheckerPool = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName());

    /** Receive directories of this receiver; more than one entry makes writers roll their path when returned. */
    private final List<String> receiveDirs = new ArrayList<>();

    /** Set once the receiver is closed; requests arriving afterwards are answered with a "closed" response. */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /** Fair read/write lock; its write side is held while the request executor clears its state. */
    private final ReadWriteLock tsFilePieceReadWriteLock = new ReentrantReadWriteLock(true);

    private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus.PipeConsensusReceiver$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver$1.class */
    /**
     * Compiler-synthesised lookup table that maps {@link PipeConsensusRequestType} ordinals to dense case
     * labels (1..8). It is presumably consumed by a {@code switch} over the request type that lies outside
     * this excerpt.
     *
     * <p>Entries left at 0 have no case label. Each assignment is wrapped in its own
     * {@code try/catch (NoSuchFieldError)} so that an enum constant missing from the
     * {@code PipeConsensusRequestType} class actually loaded at run time only leaves its own entry unset
     * instead of failing class initialization.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by enum ordinal; sized to the number of constants present at run time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType = new int[PipeConsensusRequestType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TS_FILE_PIECE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Constant absent at run time: intentionally ignored, the entry stays 0.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TS_FILE_SEAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_DELETION.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TABLET_BINARY.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TABLET_INSERT_NODE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
                // Intentionally ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.TRANSFER_TABLET_BATCH.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
                // Intentionally ignored, see above.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver$PipeConsensusTsFileWriter.class */
    public class PipeConsensusTsFileWriter {
        private final ConsensusPipeName consensusPipeName;
        private final int index;
        private File localWritingDir;
        private File writingFile;
        private RandomAccessFile writingFileWriter;
        private volatile boolean isUsed = false;
        private volatile TCommitId commitIdOfCorrespondingHolderEvent;
        private long lastUsedTs;

        public PipeConsensusTsFileWriter(int i, ConsensusPipeName consensusPipeName) {
            this.index = i;
            this.consensusPipeName = consensusPipeName;
        }

        public void rollToNextWritingPath() throws IOException, DiskSpaceInsufficientException {
            try {
                String receiverFileBaseDir = PipeConsensusReceiver.this.getReceiverFileBaseDir();
                if (Objects.isNull(receiverFileBaseDir)) {
                    PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver file base directory, because folderManager is null. May because the disk is full.", this.consensusPipeName.toString());
                    throw new DiskSpaceInsufficientException(PipeConsensusReceiver.this.receiveDirs);
                }
                String str = receiverFileBaseDir + File.separator + this.index;
                this.localWritingDir = new File(str);
                PipeConsensusReceiver.this.deleteFileOrDirectoryIfExists(this.localWritingDir, String.format("TsFileWriter-%s roll to new dir and delete last writing dir", Integer.valueOf(this.index)));
                if (this.localWritingDir.mkdirs()) {
                    PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}", new Object[]{this.consensusPipeName, Integer.valueOf(this.index), str});
                } else {
                    PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver tsFileWriter-{} file dir {}. May because authority or dir already exists etc.", new Object[]{this.consensusPipeName, Integer.valueOf(this.index), this.localWritingDir.getPath()});
                    throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir %s. May because authority or dir already exists etc.", this.consensusPipeName, Integer.valueOf(this.index), this.localWritingDir.getPath()));
                }
            } catch (Exception e) {
                PipeConsensusReceiver.LOGGER.warn("Failed to init pipeConsensus receiver file folder manager because all disks of folders are full.", e);
                throw e;
            }
        }

        public File getLocalWritingDir() {
            return this.localWritingDir;
        }

        public File getWritingFile() {
            return this.writingFile;
        }

        public void setWritingFile(File file) {
            this.writingFile = file;
            if (file == null) {
                PipeConsensusReceiver.LOGGER.info("PipeConsensus-{}: TsFileWriter-{} set null writing file", this.consensusPipeName.toString(), Integer.valueOf(this.index));
            }
        }

        public RandomAccessFile getWritingFileWriter() {
            return this.writingFileWriter;
        }

        public void setWritingFileWriter(RandomAccessFile randomAccessFile) {
            this.writingFileWriter = randomAccessFile;
            if (randomAccessFile == null) {
                PipeConsensusReceiver.LOGGER.info("PipeConsensus-{}: TsFileWriter-{} set null writing file writer", this.consensusPipeName.toString(), Integer.valueOf(this.index));
            }
        }

        public TCommitId getCommitIdOfCorrespondingHolderEvent() {
            return this.commitIdOfCorrespondingHolderEvent;
        }

        public void setCommitIdOfCorrespondingHolderEvent(TCommitId tCommitId) {
            this.commitIdOfCorrespondingHolderEvent = tCommitId;
        }

        public boolean isUsed() {
            return this.isUsed;
        }

        public void setUsed(boolean z) {
            this.isUsed = z;
            if (this.isUsed) {
                this.lastUsedTs = System.currentTimeMillis();
            }
        }

        public void returnSelf(ConsensusPipeName consensusPipeName) throws DiskSpaceInsufficientException, IOException {
            if (PipeConsensusReceiver.this.receiveDirs.size() > 1) {
                rollToNextWritingPath();
            }
            this.commitIdOfCorrespondingHolderEvent = null;
            this.isUsed = false;
            PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: tsFileWriter-{} returned self", consensusPipeName.toString(), Integer.valueOf(this.index));
        }

        public void closeSelf(ConsensusPipeName consensusPipeName) {
            if (this.writingFileWriter != null) {
                try {
                    this.writingFileWriter.close();
                    PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file writer was closed.", consensusPipeName.toString(), Integer.valueOf(this.index));
                } catch (Exception e) {
                    PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing file writer error.", new Object[]{consensusPipeName, Integer.valueOf(this.index), e});
                }
                setWritingFileWriter(null);
            } else if (PipeConsensusReceiver.LOGGER.isDebugEnabled()) {
                PipeConsensusReceiver.LOGGER.debug("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file writer is null. No need to close.", consensusPipeName.toString(), Integer.valueOf(this.index));
            }
            if (this.writingFile != null) {
                PipeConsensusReceiver.this.deleteFileOrDirectoryIfExists(this.writingFile, String.format("TsFileWriter-%s exit: delete writing file", Integer.valueOf(this.index)));
                setWritingFile(null);
            } else if (PipeConsensusReceiver.LOGGER.isDebugEnabled()) {
                PipeConsensusReceiver.LOGGER.debug("PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is null. No need to delete.", consensusPipeName.toString());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver$PipeConsensusTsFileWriterPool.class */
    /**
     * Fixed-size pool of {@link PipeConsensusTsFileWriter} slots, one per configured pipeline position.
     *
     * <p>Creating the pool also schedules a periodic check that reclaims writers which have been marked
     * as used for longer than the configured zombie threshold.
     */
    public class PipeConsensusTsFileWriterPool {
        // Serializes the assignment of idle writers to new events.
        private final Lock lock = new ReentrantLock();
        private final List<PipeConsensusTsFileWriter> pipeConsensusTsFileWriterPool = new ArrayList<>();
        private final ConsensusPipeName consensusPipeName;

        /**
         * Creates one writer per pipeline slot, rolls each to its working directory and registers the
         * periodic zombie-writer check on the receiver's scheduler.
         *
         * @param consensusPipeName pipe this pool serves, used in log messages
         * @throws DiskSpaceInsufficientException if a writer cannot obtain a receiver base directory
         * @throws IOException if a writer's working directory cannot be created
         */
        public PipeConsensusTsFileWriterPool(ConsensusPipeName consensusPipeName) throws DiskSpaceInsufficientException, IOException {
            this.consensusPipeName = consensusPipeName;
            for (int i = 0; i < PipeConsensusReceiver.IOTDB_CONFIG.getIotConsensusV2PipelineSize(); i++) {
                PipeConsensusTsFileWriter writer = new PipeConsensusTsFileWriter(i, consensusPipeName);
                writer.rollToNextWritingPath();
                this.pipeConsensusTsFileWriterPool.add(writer);
            }
            PipeConsensusReceiver.this.tsFileWriterCheckerFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(PipeConsensusReceiver.this.scheduledTsFileWriterCheckerPool, this::checkZombieTsFileWriter, 0L, PipeConsensusReceiver.IOTDB_CONFIG.getTsFileWriterCheckInterval(), TimeUnit.MILLISECONDS);
            // The interval is handed to the scheduler as milliseconds, so it is reported in that unit.
            PipeConsensusReceiver.LOGGER.info("Register {} with interval in milliseconds {} successfully.", ThreadName.PIPE_CONSENSUS_TSFILE_WRITER_CHECKER.getName(), PipeConsensusReceiver.IOTDB_CONFIG.getTsFileWriterCheckInterval());
        }

        /**
         * Returns the writer already bound to {@code tCommitId}; if there is none, waits for an idle writer,
         * binds it to {@code tCommitId} and returns it.
         *
         * <p>The lookup of an already bound writer is lock-free. Assigning an idle writer happens under
         * the pool lock, polling every {@code RETRY_WAIT_TIME} milliseconds while all writers are in use.
         *
         * @param tCommitId commit id of the event that needs a writer
         * @return the writer bound to {@code tCommitId}, never null
         * @throws IllegalStateException if the calling thread is interrupted while waiting for an idle
         *     writer; the thread's interrupt flag is restored before throwing
         */
        public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId tCommitId) {
            Optional<PipeConsensusTsFileWriter> bound = this.pipeConsensusTsFileWriterPool.stream()
                    .filter(writer -> writer.isUsed() && Objects.equals(tCommitId, writer.getCommitIdOfCorrespondingHolderEvent()))
                    .findFirst();
            if (bound.isPresent()) {
                return bound.get();
            }

            this.lock.lock();
            try {
                while (true) {
                    Optional<PipeConsensusTsFileWriter> idle = this.pipeConsensusTsFileWriterPool.stream()
                            .filter(writer -> !writer.isUsed())
                            .findFirst();
                    if (idle.isPresent()) {
                        PipeConsensusTsFileWriter writer = idle.get();
                        writer.setUsed(true);
                        writer.setCommitIdOfCorrespondingHolderEvent(tCommitId);
                        return writer;
                    }
                    // Sleep only when nothing is idle; a writer that was found is handed out immediately.
                    try {
                        Thread.sleep(PipeConsensusReceiver.RETRY_WAIT_TIME);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        PipeConsensusReceiver.LOGGER.warn("PipeConsensus{}: receiver thread get interrupted when waiting for borrowing tsFileWriter.", this.consensusPipeName);
                        // There is no writer to return; fail explicitly. The lock is released exactly once, by the finally block.
                        throw new IllegalStateException(String.format("PipeConsensus%s: interrupted while waiting for an idle tsFileWriter for event %s", this.consensusPipeName, tCommitId), e);
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Periodic task: closes and returns every writer that is marked as used and whose last-used
         * timestamp is at least the configured zombie threshold in the past. Failures are logged per
         * writer and do not stop the scan.
         */
        private void checkZombieTsFileWriter() {
            this.pipeConsensusTsFileWriterPool.stream().filter(PipeConsensusTsFileWriter::isUsed).forEach(writer -> {
                if (System.currentTimeMillis() - writer.lastUsedTs >= PipeConsensusReceiver.IOTDB_CONFIG.getTsFileWriterZombieThreshold()) {
                    try {
                        writer.closeSelf(this.consensusPipeName);
                        writer.returnSelf(this.consensusPipeName);
                        PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: tsfile writer-{} is cleaned up because no new requests were received for too long.", this.consensusPipeName, writer.index);
                    } catch (IOException | DiskSpaceInsufficientException e) {
                        PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: receiver watch dog failed to return tsFileWriter-{}.", this.consensusPipeName.toString(), writer.index, e);
                    }
                }
            });
        }

        /**
         * Closes and returns every writer of the pool. A writer that is still in use is given up to
         * {@code CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS} milliseconds to be released first; after that,
         * or as soon as the calling thread is interrupted, it is closed regardless.
         *
         * @param consensusPipeName pipe name used in log messages
         */
        public void releaseAllWriters(ConsensusPipeName consensusPipeName) {
            this.pipeConsensusTsFileWriterPool.forEach(writer -> {
                long waitStart = System.currentTimeMillis();
                while (System.currentTimeMillis() - waitStart < PipeConsensusReceiver.CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS && writer.isUsed()) {
                    try {
                        Thread.sleep(PipeConsensusReceiver.RETRY_WAIT_TIME);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: receiver thread get interrupted when exiting.", consensusPipeName.toString());
                        // With the interrupt flag set every further sleep() would throw immediately,
                        // turning this loop into a busy spin that floods the log. Stop waiting.
                        break;
                    }
                }
                try {
                    writer.closeSelf(consensusPipeName);
                    writer.returnSelf(consensusPipeName);
                } catch (IOException | DiskSpaceInsufficientException e) {
                    PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: receiver thread failed to return tsFileWriter-{} when exiting.", consensusPipeName.toString(), writer.index, e);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver$RequestExecutor.class */
    public class RequestExecutor {
        /** Restart source named in "deprecated request" responses caused by an older data-node reboot count. */
        private static final String THIS_NODE = "this node";

        /** Restart source named in "deprecated request" responses caused by an older pipe-task restart count. */
        private static final String PIPE_TASK = "pipe task";

        private final PipeConsensusReceiverMetrics metric;
        private final PipeConsensusTsFileWriterPool tsFileWriterPool;

        /** Incremented in onRequest, decremented in onSuccess; the WAL counterpart of {@link #tsFileEventCount}. */
        private final AtomicInteger WALEventCount = new AtomicInteger(0);

        /** Incremented in onRequest, decremented in onSuccess; the ts-file counterpart of {@link #WALEventCount}. */
        private final AtomicInteger tsFileEventCount = new AtomicInteger(0);

        /** Replicate index of the request that was applied successfully most recently; 0 after a reboot reset. */
        private volatile long onSyncedReplicateIndex = 0;

        /** Highest data-node reboot count seen in a request so far. */
        private volatile int connectorRebootTimes = 0;

        /** Highest pipe-task restart count seen for the current reboot count. */
        private volatile int pipeTaskRestartTimes = 0;

        /**
         * Requests waiting for execution, ordered by data-node reboot count and then by replicate index.
         * Method references are used so that the element type is inferred as {@code RequestMeta} across
         * the chained comparator calls.
         */
        private final TreeSet<RequestMeta> reqExecutionOrderBuffer = new TreeSet<>(
                Comparator.comparingInt(RequestMeta::getDataNodeRebootTimes)
                        .thenComparingLong(RequestMeta::getReplicateIndex));

        /** Guards the request buffer and the ordering decisions made in onRequest. */
        private final Lock lock = new ReentrantLock();

        /** Signalled whenever a request leaves onRequest, so waiting requests re-check whether it is their turn. */
        private final Condition condition = this.lock.newCondition();

        /**
         * @param pipeConsensusReceiverMetrics sink for the timers recorded while dispatching and applying requests
         * @param pipeConsensusTsFileWriterPool pool whose writers are released when the executor is cleared
         */
        public RequestExecutor(PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics, PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool) {
            this.metric = pipeConsensusReceiverMetrics;
            this.tsFileWriterPool = pipeConsensusTsFileWriterPool;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Admits a request into the ordering buffer and, unless it is a ts-file piece, blocks until it is
         * this request's turn and then applies it through {@code loadEvent}.
         *
         * <p>A request is applied when it is the first entry of the buffer and one of the following holds:
         * <ul>
         *   <li>its replicate index directly follows the last synced replicate index;</li>
         *   <li>the buffer holds at least as many requests as the configured pipeline size;</li>
         *   <li>a wait on the condition timed out while the buffer is below the pipeline size.</li>
         * </ul>
         *
         * <p>Every exit path signals all waiters and releases the executor lock (single finally block).
         *
         * @param tPipeConsensusTransferReq the request to admit
         * @param isTsFilePiece when true the request is only registered and {@code null} is returned
         *     without applying anything here. NOTE(review): the name is inferred from the ts-file event
         *     counter this flag drives; confirm against the callers.
         * @param isTsFileSeal forwarded to onSuccess, where it selects the ts-file counter and timer instead
         *     of the WAL ones. NOTE(review): name inferred, confirm against the callers.
         * @return {@code null} for a registered ts-file piece; otherwise the response of {@code loadEvent},
         *     or a "closed", "deprecated" or "shut down" response when the request is not applied
         */
        public TPipeConsensusTransferResp onRequest(TPipeConsensusTransferReq tPipeConsensusTransferReq, boolean isTsFilePiece, boolean isTsFileSeal) {
            long startAcquireLockNanos = System.nanoTime();
            this.lock.lock();
            try {
                if (PipeConsensusReceiver.this.isClosed.get()) {
                    return PipeConsensusReceiverAgent.closedResp(PipeConsensusReceiver.this.consensusPipeName.toString(), tPipeConsensusTransferReq.getCommitId());
                }
                long startDispatchNanos = System.nanoTime();
                this.metric.recordAcquireExecutorLockTimer(startDispatchNanos - startAcquireLockNanos);
                TCommitId commitId = tPipeConsensusTransferReq.getCommitId();
                RequestMeta requestMeta = new RequestMeta(commitId);
                PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: start to receive no.{} event", PipeConsensusReceiver.this.consensusPipeName, commitId);

                // Requests stamped with an older reboot / restart count than already seen are rejected.
                if (commitId.getDataNodeRebootTimes() < this.connectorRebootTimes) {
                    return deprecatedResp(THIS_NODE);
                }
                if (commitId.getDataNodeRebootTimes() == this.connectorRebootTimes && commitId.getPipeTaskRestartTimes() < this.pipeTaskRestartTimes) {
                    return deprecatedResp(PIPE_TASK);
                }
                // A newer count resets the receiver state before the request is admitted.
                if (commitId.getDataNodeRebootTimes() > this.connectorRebootTimes) {
                    resetWithNewestRebootTime(commitId.getDataNodeRebootTimes());
                }
                if (commitId.getPipeTaskRestartTimes() > this.pipeTaskRestartTimes) {
                    resetWithNewestRestartTime(commitId.getPipeTaskRestartTimes());
                }

                // A ts-file event is counted once, by the first of its pieces to enter the buffer.
                if (isTsFilePiece && !this.reqExecutionOrderBuffer.contains(requestMeta)) {
                    this.tsFileEventCount.incrementAndGet();
                }
                if (!isTsFileSeal && !isTsFilePiece) {
                    this.WALEventCount.incrementAndGet();
                }
                this.reqExecutionOrderBuffer.add(requestMeta);

                if (isTsFilePiece) {
                    // Pieces are only registered; nothing is applied for them in this method.
                    long startApplyNanos = System.nanoTime();
                    this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                    requestMeta.setStartApplyNanos(startApplyNanos);
                    return null;
                }

                // The buffer is full and this request is not first: wake the waiters so the first one can run.
                if (this.reqExecutionOrderBuffer.size() >= PipeConsensusReceiver.IOTDB_CONFIG.getIotConsensusV2PipelineSize() && !this.reqExecutionOrderBuffer.first().equals(requestMeta)) {
                    this.condition.signalAll();
                }

                while (true) {
                    if (this.reqExecutionOrderBuffer.first().equals(requestMeta) && commitId.getReplicateIndex() == this.onSyncedReplicateIndex + 1) {
                        return loadEventAndSettle(tPipeConsensusTransferReq, requestMeta, commitId, startDispatchNanos, isTsFileSeal);
                    }
                    if (this.reqExecutionOrderBuffer.size() >= PipeConsensusReceiver.IOTDB_CONFIG.getIotConsensusV2PipelineSize() && this.reqExecutionOrderBuffer.first().equals(requestMeta)) {
                        PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: no.{} event get executed because receiver buffer's len >= pipeline, current receiver syncIndex {}, current buffer len {}", PipeConsensusReceiver.this.consensusPipeName, commitId, this.onSyncedReplicateIndex, this.reqExecutionOrderBuffer.size());
                        return loadEventAndSettle(tPipeConsensusTransferReq, requestMeta, commitId, startDispatchNanos, isTsFileSeal);
                    }
                    try {
                        boolean timedOut = !this.condition.await(PipeConsensusReceiver.PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);
                        if (PipeConsensusReceiver.this.isClosed.get()) {
                            return PipeConsensusReceiverAgent.closedResp(PipeConsensusReceiver.this.consensusPipeName.toString(), tPipeConsensusTransferReq.getCommitId());
                        }
                        // The buffer was cleared while this request was waiting.
                        if (!this.reqExecutionOrderBuffer.contains(requestMeta)) {
                            return deprecatedResp(String.format("%s or %s", THIS_NODE, PIPE_TASK));
                        }
                        // The buffer still contains this request (checked above), so first() is safe here.
                        if (timedOut && this.reqExecutionOrderBuffer.size() < PipeConsensusReceiver.IOTDB_CONFIG.getIotConsensusV2PipelineSize() && this.reqExecutionOrderBuffer.first().equals(requestMeta)) {
                            PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: no.{} event get executed after awaiting timeout, current receiver syncIndex: {}", PipeConsensusReceiver.this.consensusPipeName, commitId, this.onSyncedReplicateIndex);
                            return loadEventAndSettle(tPipeConsensusTransferReq, requestMeta, commitId, startDispatchNanos, isTsFileSeal);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        PipeConsensusReceiver.LOGGER.warn("PipeConsensus-PipeName-{}: current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ", PipeConsensusReceiver.this.consensusPipeName, commitId.getReplicateIndex(), e);
                        return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.SHUT_DOWN_ERROR, "RPC processor is interrupted by shutdown hook when wait on condition!"));
                    }
                }
            } finally {
                // Runs on every return and on every exception: wake the waiters, then release the lock.
                this.condition.signalAll();
                this.lock.unlock();
            }
        }

        /**
         * Applies one request whose turn has come: records how long it waited for dispatch, stamps the
         * start of its application, runs {@code loadEvent} and, on a success status, settles the
         * bookkeeping through onSuccess. Must be called with the executor lock held.
         */
        private TPipeConsensusTransferResp loadEventAndSettle(TPipeConsensusTransferReq request, RequestMeta requestMeta, TCommitId commitId, long startDispatchNanos, boolean isTsFileSeal) {
            long startApplyNanos = System.nanoTime();
            this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
            requestMeta.setStartApplyNanos(startApplyNanos);
            TPipeConsensusTransferResp resp = PipeConsensusReceiver.this.loadEvent(request);
            if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                onSuccess(commitId, isTsFileSeal);
            }
            return resp;
        }

        /**
         * Handles a request carrying a newer data-node reboot count: clears the executor state including
         * the synced replicate index (receiver base directories are kept), then adopts the new reboot
         * count and restarts the pipe-task restart count at 0.
         *
         * @param i the newer data-node reboot count taken from the request
         */
        private void resetWithNewestRebootTime(int i) {
            PipeConsensusReceiver.LOGGER.info(
                    "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.",
                    PipeConsensusReceiver.this.consensusPipeName);
            // resetSyncIndex = true, cleanBaseDir = false
            clear(true, false);
            connectorRebootTimes = i;
            pipeTaskRestartTimes = 0;
        }

        /**
         * Handles a request carrying a newer pipe-task restart count: clears the executor state while
         * keeping the synced replicate index and the receiver base directories, then adopts the new
         * restart count.
         *
         * @param i the newer pipe-task restart count taken from the request
         */
        private void resetWithNewestRestartTime(int i) {
            PipeConsensusReceiver.LOGGER.info(
                    "PipeConsensus-PipeName-{}: receiver detected an newer pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver will reset all its data.",
                    PipeConsensusReceiver.this.consensusPipeName);
            // resetSyncIndex = false, cleanBaseDir = false
            clear(false, false);
            pipeTaskRestartTimes = i;
        }

        /**
         * Bookkeeping after an event has been applied successfully: removes the head of the
         * execution-order buffer, advances the synced replicate index and updates the matching
         * event counter and latency timer.
         *
         * @param tCommitId commit id of the event that has just been applied
         * @param z {@code true} to account the event against the tsFile counter/timer,
         *     {@code false} to account it against the WAL counter/timer
         */
        private void onSuccess(TCommitId tCommitId, boolean z) {
            PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: process no.{} event successfully!", PipeConsensusReceiver.this.consensusPipeName, tCommitId);
            // NOTE(review): the head of the buffer is dereferenced below without a null check;
            // presumably callers guarantee the buffer is non-empty here - confirm.
            final RequestMeta finishedMeta = reqExecutionOrderBuffer.pollFirst();
            onSyncedReplicateIndex = tCommitId.getReplicateIndex();
            if (!z) {
                WALEventCount.decrementAndGet();
                metric.recordReceiveWALTimer(System.nanoTime() - finishedMeta.getStartApplyNanos());
                return;
            }
            tsFileEventCount.decrementAndGet();
            metric.recordReceiveTsFileTimer(System.nanoTime() - finishedMeta.getStartApplyNanos());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Clears the executor's buffered requests and releases all tsFile writers while holding the
         * write side of the receiver's tsFile-piece read/write lock.
         *
         * @param z when {@code true}, the synced replicate index is reset to 0
         * @param z2 when {@code true}, the receiver base directories are cleared as well
         */
        public void clear(boolean z, boolean z2) {
            final Lock pieceWriteLock = PipeConsensusReceiver.this.tsFilePieceReadWriteLock.writeLock();
            pieceWriteLock.lock();
            try {
                reqExecutionOrderBuffer.clear();
                tsFileWriterPool.releaseAllWriters(PipeConsensusReceiver.this.consensusPipeName);
                if (z) {
                    onSyncedReplicateIndex = 0L;
                }
                if (z2) {
                    PipeConsensusReceiver.this.clearAllReceiverBaseDir();
                }
            } finally {
                pieceWriteLock.unlock();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Marks the enclosing receiver as closed. The flag is flipped while holding the executor
         * lock, so the change happens only when no other thread holds that lock.
         */
        public void tryClose() {
            lock.lock();
            try {
                PipeConsensusReceiver.this.isClosed.set(true);
            } finally {
                lock.unlock();
            }
        }

        /**
         * Builds the response for a request that is considered deprecated and logs the event.
         *
         * @param str name of the component whose restart may have made the request stale; it is
         *     interpolated into both the status message and the log line
         * @return a response carrying a {@code PIPE_CONSENSUS_DEPRECATED_REQUEST} status
         */
        private TPipeConsensusTransferResp deprecatedResp(String str) {
            final String detail = String.format("PipeConsensus receiver received a deprecated request, which may be sent before %s restarts. Consider to discard it", str);
            final TSStatus deprecatedStatus = new TSStatus(RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, detail));
            PipeConsensusReceiver.LOGGER.info("PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before {} restarts. Consider to discard it", PipeConsensusReceiver.this.consensusPipeName, str);
            return new TPipeConsensusTransferResp(deprecatedStatus);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver$RequestMeta.class */
    /**
     * Per-request bookkeeping record: wraps the request's commit id and remembers when the
     * request started being applied. Equality and hashing are based on the commit id only.
     */
    public static class RequestMeta {
        private final TCommitId commitId;
        // 0 means "not recorded yet"; see setStartApplyNanos / getStartApplyNanos.
        private long startApplyNanos = 0;

        /**
         * @param tCommitId commit id of the request this record describes
         */
        public RequestMeta(TCommitId tCommitId) {
            this.commitId = tCommitId;
        }

        /** @return the data node reboot count carried by the commit id */
        public int getDataNodeRebootTimes() {
            return commitId.getDataNodeRebootTimes();
        }

        /** @return the replicate index carried by the commit id */
        public long getReplicateIndex() {
            return commitId.getReplicateIndex();
        }

        /**
         * Records the apply start time. Only takes effect while the stored value is still 0, so
         * later calls do not overwrite an already recorded non-zero timestamp.
         *
         * @param j start time in nanoseconds
         */
        public void setStartApplyNanos(long j) {
            if (startApplyNanos != 0) {
                return;
            }
            startApplyNanos = j;
        }

        /**
         * @return the recorded apply start time, or the current {@link System#nanoTime()} when no
         *     start time has been recorded
         */
        public long getStartApplyNanos() {
            if (startApplyNanos == 0) {
                return System.nanoTime();
            }
            return startApplyNanos;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj != null && obj.getClass() == getClass()) {
                final RequestMeta other = (RequestMeta) obj;
                return commitId.equals(other.commitId);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(commitId);
        }
    }

    /**
     * Creates a receiver for one consensus pipe.
     *
     * <p>Initialisation happens in two independent steps, each with its own error report: first
     * the tsFile buffer folders are initiated, then the folder manager, the tsFile writer pool and
     * the request executor are created and the receiver metrics are registered. Keeping the two
     * steps in separate try blocks ensures a failure in the second step is logged and wrapped
     * exactly once, with the message that belongs to it.
     *
     * @param pipeConsensus the consensus layer used to look up the group implementation
     * @param consensusGroupId the consensus group this receiver serves
     * @param consensusPipeName the name of the consensus pipe this receiver belongs to
     * @throws RuntimeException wrapping the original exception if either step fails
     */
    public PipeConsensusReceiver(PipeConsensus pipeConsensus, ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) {
        this.pipeConsensus = pipeConsensus;
        this.consensusGroupId = consensusGroupId;
        this.consensusPipeName = consensusPipeName;

        // Step 1: prepare the receiver's file buffer folders.
        try {
            initiateTsFileBufferFolder(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs()));
        } catch (Exception e) {
            // The trailing throwable keeps the stack trace in the log next to the message.
            LOGGER.error("Fail to initiate file buffer folder, Error msg: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // Step 2: build the components that depend on the folders prepared above.
        try {
            this.folderManager = new FolderManager(this.receiveDirs, DirectoryStrategyType.SEQUENCE_STRATEGY);
            this.pipeConsensusTsFileWriterPool = new PipeConsensusTsFileWriterPool(consensusPipeName);
            this.requestExecutor = new RequestExecutor(this.pipeConsensusReceiverMetrics, this.pipeConsensusTsFileWriterPool);
            MetricService.getInstance().addMetricSet(this.pipeConsensusReceiverMetrics);
        } catch (Exception e) {
            LOGGER.error("Fail to create pipeConsensus receiver file folders allocation strategy because all disks of folders are full.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Entry point for an incoming transfer request.
     *
     * <p>The request is first pre-checked; a non-null pre-check response is returned as-is.
     * Requests with an unknown type are answered with a {@code PIPE_TYPE_ERROR} status. All other
     * requests are dispatched by type, and the total handling time is recorded in the receiver
     * metrics before the response is returned.
     *
     * @param tPipeConsensusTransferReq the incoming request
     * @return the response to send back to the sender
     */
    public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq tPipeConsensusTransferReq) {
        final long receiveStartNanos = System.nanoTime();

        final TPipeConsensusTransferResp rejection = preCheckForReceiver(tPipeConsensusTransferReq);
        if (rejection != null) {
            return rejection;
        }

        final short rawType = tPipeConsensusTransferReq.getType();
        if (PipeConsensusRequestType.isValidatedRequestType(rawType)) {
            final TPipeConsensusTransferResp resp;
            // The case labels are indices of the compiler-generated switch map for
            // PipeConsensusRequestType, not raw request type codes.
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.valueOf(rawType).ordinal()]) {
                case 1:
                case 2:
                    // The executor's return value is deliberately unused here; the response comes
                    // from processing the request directly via loadEvent.
                    this.requestExecutor.onRequest(tPipeConsensusTransferReq, true, false);
                    resp = loadEvent(tPipeConsensusTransferReq);
                    break;
                case 3:
                case 4:
                    resp = this.requestExecutor.onRequest(tPipeConsensusTransferReq, false, true);
                    break;
                default:
                    // Every remaining request type goes through the executor with both flags off.
                    resp = this.requestExecutor.onRequest(tPipeConsensusTransferReq, false, false);
                    break;
            }
            this.pipeConsensusReceiverMetrics.recordReceiveEventTimer(System.nanoTime() - receiveStartNanos);
            return resp;
        }

        final TSStatus unknownTypeStatus = RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, String.format("PipeConsensus Unknown PipeRequestType %s.", rawType));
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("PipeConsensus Unknown PipeRequestType, response status = {}.", unknownTypeStatus);
        }
        return new TPipeConsensusTransferResp(unknownTypeStatus);
    }

    /**
     * Checks whether this node is able to accept the request at all.
     *
     * @param tPipeConsensusTransferReq the incoming request
     * @return {@code null} when the request may proceed; otherwise an error response: an
     *     {@code INTERNAL_SERVER_ERROR} status when no consensus implementation exists for the
     *     request's group, or a {@code SYSTEM_READ_ONLY} status when that implementation is
     *     read-only
     */
    private TPipeConsensusTransferResp preCheckForReceiver(TPipeConsensusTransferReq tPipeConsensusTransferReq) {
        final ConsensusGroupId requestedGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tPipeConsensusTransferReq.getConsensusGroupId());
        final PipeConsensusServerImpl impl = this.pipeConsensus.getImpl(requestedGroupId);

        if (impl == null) {
            final String message = String.format("PipeConsensus-PipeName-%s: unexpected consensusGroupId %s", this.consensusPipeName, requestedGroupId);
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(message);
            }
            return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), message));
        }

        if (impl.isReadOnly()) {
            final String message = String.format("PipeConsensus-PipeName-%s: fail to receive because system is read-only.", this.consensusPipeName);
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(message);
            }
            return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode(), message));
        }

        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decodes the request into its concrete request class and hands it to the matching handler.
     *
     * <p>Returns a "closed" response when the receiver is already closed, a
     * {@code PIPE_CONSENSUS_TYPE_ERROR} response for unknown or not-yet-implemented request types,
     * and a {@code PIPE_ERROR} response when decoding or handling throws.
     *
     * @param tPipeConsensusTransferReq the incoming request
     * @return the handler's response, or one of the error responses described above
     */
    public TPipeConsensusTransferResp loadEvent(TPipeConsensusTransferReq tPipeConsensusTransferReq) {
        if (this.isClosed.get()) {
            return PipeConsensusReceiverAgent.closedResp(this.consensusPipeName.toString(), tPipeConsensusTransferReq.getCommitId());
        }
        try {
            final short rawType = tPipeConsensusTransferReq.getType();
            if (PipeConsensusRequestType.isValidatedRequestType(rawType)) {
                // The case labels are indices of the compiler-generated switch map for
                // PipeConsensusRequestType, not raw request type codes.
                switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$commons$pipe$connector$payload$pipeconsensus$request$PipeConsensusRequestType[PipeConsensusRequestType.valueOf(rawType).ordinal()]) {
                    case 1:
                        return handleTransferFilePiece(PipeConsensusTsFilePieceReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq), true);
                    case 2:
                        return handleTransferFilePiece(PipeConsensusTsFilePieceReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq), false);
                    case 3:
                        return handleTransferFileSeal(PipeConsensusTsFileSealReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq));
                    case 4:
                        return handleTransferFileSealWithMods(PipeConsensusTsFileSealWithModReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq));
                    case 5:
                        return handleTransferDeletion(PipeConsensusDeleteNodeReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq));
                    case 6:
                        return handleTransferTabletBinary(PipeConsensusTabletBinaryReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq));
                    case 7:
                        return handleTransferTabletInsertNode(PipeConsensusTabletInsertNodeReq.fromTPipeConsensusTransferReq(tPipeConsensusTransferReq));
                    case 8:
                        // Not implemented: log and fall through to the "unknown type" response.
                        LOGGER.info("PipeConsensus transfer batch hasn't been implemented yet.");
                        break;
                    default:
                        break;
                }
            }
            final TSStatus unknownTypeStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TYPE_ERROR, String.format("Unknown PipeConsensusRequestType %s.", rawType));
            LOGGER.warn("PipeConsensus-PipeName-{}: Unknown PipeRequestType, response status = {}.", this.consensusPipeName, unknownTypeStatus);
            return new TPipeConsensusTransferResp(unknownTypeStatus);
        } catch (Exception e) {
            final String message = String.format("Serialization error during pipe receiving, %s", e);
            LOGGER.warn("PipeConsensus-PipeName-{}: {}", this.consensusPipeName, message, e);
            return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, message));
        }
    }

    /**
     * Applies an insert node carried by the request on this follower replica.
     *
     * <p>The node is marked as generated by the remote consensus leader and tagged with the
     * progress index deserialized from the request before it is written.
     *
     * @param pipeConsensusTabletInsertNodeReq request carrying the insert node and progress index
     * @return a response wrapping the status of the follower-side write
     * @throws ConsensusGroupNotExistException if no implementation exists for this receiver's group
     */
    private TPipeConsensusTransferResp handleTransferTabletInsertNode(PipeConsensusTabletInsertNodeReq pipeConsensusTabletInsertNodeReq) throws ConsensusGroupNotExistException {
        final PipeConsensusServerImpl impl = this.pipeConsensus.getImpl(this.consensusGroupId);
        if (impl == null) {
            throw new ConsensusGroupNotExistException(this.consensusGroupId);
        }
        final InsertNode insertNode = pipeConsensusTabletInsertNodeReq.getInsertNode();
        insertNode.markAsGeneratedByRemoteConsensusLeader();
        final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(pipeConsensusTabletInsertNodeReq.getProgressIndex()));
        insertNode.setProgressIndex(progressIndex);
        return new TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
    }

    /**
     * Converts the binary tablet payload to an insert node and applies it on this follower replica.
     *
     * <p>The node is marked as generated by the remote consensus leader and tagged with the
     * progress index deserialized from the request before it is written.
     *
     * @param pipeConsensusTabletBinaryReq request carrying the binary payload and progress index
     * @return a response wrapping the status of the follower-side write
     * @throws ConsensusGroupNotExistException if no implementation exists for this receiver's group
     */
    private TPipeConsensusTransferResp handleTransferTabletBinary(PipeConsensusTabletBinaryReq pipeConsensusTabletBinaryReq) throws ConsensusGroupNotExistException {
        final PipeConsensusServerImpl impl = this.pipeConsensus.getImpl(this.consensusGroupId);
        if (impl == null) {
            throw new ConsensusGroupNotExistException(this.consensusGroupId);
        }
        final InsertNode insertNode = pipeConsensusTabletBinaryReq.convertToInsertNode();
        insertNode.markAsGeneratedByRemoteConsensusLeader();
        final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(pipeConsensusTabletBinaryReq.getProgressIndex()));
        insertNode.setProgressIndex(progressIndex);
        return new TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
    }

    /**
     * Applies a delete-data node carried by the request on this follower replica.
     *
     * <p>The node is marked as generated by the remote consensus leader and tagged with the
     * progress index deserialized from the request before it is written.
     *
     * @param pipeConsensusDeleteNodeReq request carrying the delete node and progress index
     * @return a response wrapping the status of the follower-side write
     * @throws ConsensusGroupNotExistException if no implementation exists for this receiver's group
     */
    private TPipeConsensusTransferResp handleTransferDeletion(PipeConsensusDeleteNodeReq pipeConsensusDeleteNodeReq) throws ConsensusGroupNotExistException {
        final PipeConsensusServerImpl impl = this.pipeConsensus.getImpl(this.consensusGroupId);
        if (impl == null) {
            throw new ConsensusGroupNotExistException(this.consensusGroupId);
        }
        final AbstractDeleteDataNode deleteNode = pipeConsensusDeleteNodeReq.getDeleteDataNode();
        deleteNode.markAsGeneratedByRemoteConsensusLeader();
        final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(pipeConsensusDeleteNodeReq.getProgressIndex()));
        deleteNode.setProgressIndex(progressIndex);
        return new TPipeConsensusTransferResp(impl.writeOnFollowerReplica(deleteNode));
    }

    /**
     * Appends one piece of a transferred file to the writer that belongs to the request's commit
     * id. The whole operation runs under the read side of the tsFile-piece read/write lock.
     *
     * <p>If the sender's start offset does not match the local file, nothing is written and a
     * {@code PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET} response carrying the local length is
     * returned (a non-{@code .tsfile} file is truncated to length 0 first). If anything fails, the
     * current writing file is closed and deleted, the writer is returned to its pool and a
     * {@code PIPE_CONSENSUS_TRANSFER_FILE_ERROR} response is returned.
     *
     * @param pipeConsensusTransferFilePieceReq request carrying the file name, offset and bytes
     * @param z forwarded unchanged to {@code updateWritingFileIfNeeded}
     * @return a file-piece response carrying the resulting status and, where available, the
     *     current length of the local file
     */
    private TPipeConsensusTransferResp handleTransferFilePiece(PipeConsensusTransferFilePieceReq pipeConsensusTransferFilePieceReq, boolean z) {
        this.tsFilePieceReadWriteLock.readLock().lock();
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: starting to receive tsFile pieces", this.consensusPipeName);
            }
            final long borrowStartNanos = System.nanoTime();
            final PipeConsensusTsFileWriter tsFileWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(pipeConsensusTransferFilePieceReq.getCommitId());
            final long preCheckStartNanos = System.nanoTime();
            this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(preCheckStartNanos - borrowStartNanos);
            try {
                updateWritingFileIfNeeded(tsFileWriter, pipeConsensusTransferFilePieceReq.getFileName(), z);
                final File writingFile = tsFileWriter.getWritingFile();
                final RandomAccessFile fileWriter = tsFileWriter.getWritingFileWriter();

                if (isWritingFileOffsetNonCorrect(tsFileWriter, pipeConsensusTransferFilePieceReq.getStartWritingOffset())) {
                    // Files other than ".tsfile" are restarted from scratch on an offset mismatch.
                    if (!writingFile.getName().endsWith(".tsfile")) {
                        fileWriter.setLength(0L);
                    }
                    final TSStatus resetStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET, String.format("Request sender to reset file reader's offset from %s to %s.", pipeConsensusTransferFilePieceReq.getStartWritingOffset(), fileWriter.length()));
                    LOGGER.warn("PipeConsensus-PipeName-{}: File offset reset requested by receiver, response status = {}.", this.consensusPipeName, resetStatus);
                    return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(resetStatus, fileWriter.length());
                }

                final long writeStartNanos = System.nanoTime();
                this.pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(writeStartNanos - preCheckStartNanos);
                fileWriter.write(pipeConsensusTransferFilePieceReq.getFilePiece());
                this.pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() - writeStartNanos);
                return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(RpcUtils.SUCCESS_STATUS, fileWriter.length());
            } catch (Exception e) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to write file piece from req {}.", this.consensusPipeName, pipeConsensusTransferFilePieceReq, e);
                final TSStatus errorStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to write file piece, because %s", e.getMessage()));
                try {
                    return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(errorStatus, -1L);
                } catch (IOException e3) {
                    // Building the response with an end offset failed; fall back to status only.
                    return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(errorStatus);
                } finally {
                    // Drop the partially written file, then hand the writer back to the pool.
                    closeCurrentWritingFileWriter(tsFileWriter, false);
                    deleteCurrentWritingFile(tsFileWriter);
                    try {
                        tsFileWriter.returnSelf(this.consensusPipeName);
                    } catch (IOException | DiskSpaceInsufficientException e2) {
                        LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", this.consensusPipeName, tsFileWriter, e2);
                    }
                }
            }
        } finally {
            this.tsFilePieceReadWriteLock.readLock().unlock();
        }
    }

    /**
     * Seals a fully transferred tsFile and loads it into the data region.
     *
     * <p>Steps: borrow the writer that belongs to the request's commit id, verify that its writing
     * file is available and matches the expected name and length, sync and close the file, then
     * load it together with the progress index deserialized from the request.
     *
     * <p>Cleanup runs exactly once on every exit path, normal or exceptional: the current writing
     * file writer is closed and the current writing file is deleted. The writer is returned to its
     * pool only when the load reported {@code SUCCESS_STATUS}.
     *
     * @param pipeConsensusTsFileSealReq request carrying the file name, length, point count and
     *     progress index
     * @return a response wrapping the load status; a {@code PIPE_CONSENSUS_TRANSFER_FILE_ERROR}
     *     response when the file is unavailable or an {@link IOException} occurs; a
     *     {@code LOAD_FILE_ERROR} response when loading throws; or the non-null response produced
     *     by the final seal check
     */
    private TPipeConsensusTransferResp handleTransferFileSeal(PipeConsensusTsFileSealReq pipeConsensusTsFileSealReq) {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal", this.consensusPipeName);
        final long borrowStartNanos = System.nanoTime();
        final PipeConsensusTsFileWriter tsFileWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(pipeConsensusTsFileSealReq.getCommitId());
        final long preCheckStartNanos = System.nanoTime();
        this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(preCheckStartNanos - borrowStartNanos);
        final File writingFile = tsFileWriter.getWritingFile();
        final RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        // Set to true only after a successful load; decides whether the writer is returned.
        boolean sealedSuccessfully = false;
        try {
            if (isWritingFileNonAvailable(tsFileWriter)) {
                final TSStatus unavailableStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file, because writing file %s is not available.", writingFile));
                LOGGER.warn(unavailableStatus.getMessage());
                return new TPipeConsensusTransferResp(unavailableStatus);
            }

            final TPipeConsensusTransferResp sealCheckResp = checkFinalFileSeal(tsFileWriter, pipeConsensusTsFileSealReq.getFileName(), pipeConsensusTsFileSealReq.getFileLength());
            if (Objects.nonNull(sealCheckResp)) {
                return sealCheckResp;
            }

            final String absolutePath = writingFile.getAbsolutePath();
            // Flush to disk and close before loading; detach the file from the writer so the
            // cleanup in the finally block no longer sees it as the current writing file.
            writingFileWriter.getFD().sync();
            writingFileWriter.close();
            tsFileWriter.setWritingFileWriter(null);
            tsFileWriter.setWritingFile(null);

            final long loadStartNanos = System.nanoTime();
            this.pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(loadStartNanos - preCheckStartNanos);
            updateWritePointCountMetrics(pipeConsensusTsFileSealReq.getPointCount(), absolutePath);
            final TSStatus loadStatus = loadFileToDataRegion(absolutePath, ProgressIndexType.deserializeFrom(ByteBuffer.wrap(pipeConsensusTsFileSealReq.getProgressIndex())));
            this.pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - loadStartNanos);

            if (loadStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                sealedSuccessfully = true;
                LOGGER.info("PipeConsensus-PipeName-{}: Seal file {} successfully.", this.consensusPipeName, absolutePath);
            } else {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because {}.", this.consensusPipeName, absolutePath, loadStatus.getMessage());
            }
            return new TPipeConsensusTransferResp(loadStatus);
        } catch (IOException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.", this.consensusPipeName, writingFile, pipeConsensusTsFileSealReq, e);
            return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
        } catch (LoadFileException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to load file {} from req {}.", this.consensusPipeName, writingFile, pipeConsensusTsFileSealReq, e);
            return new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
        } finally {
            // Single cleanup point for all exit paths.
            closeCurrentWritingFileWriter(tsFileWriter, false);
            deleteCurrentWritingFile(tsFileWriter);
            if (sealedSuccessfully) {
                try {
                    tsFileWriter.returnSelf(this.consensusPipeName);
                } catch (IOException | DiskSpaceInsufficientException e) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", this.consensusPipeName, tsFileWriter, e);
                }
            }
        }
    }

    private TPipeConsensusTransferResp handleTransferFileSealWithMods(PipeConsensusTsFileSealWithModReq pipeConsensusTsFileSealWithModReq) {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal with mods", this.consensusPipeName);
        long nanoTime = System.nanoTime();
        PipeConsensusTsFileWriter borrowCorrespondingWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(pipeConsensusTsFileSealWithModReq.getCommitId());
        long nanoTime2 = System.nanoTime();
        this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(nanoTime2 - nanoTime);
        File writingFile = borrowCorrespondingWriter.getWritingFile();
        RandomAccessFile writingFileWriter = borrowCorrespondingWriter.getWritingFileWriter();
        File localWritingDir = borrowCorrespondingWriter.getLocalWritingDir();
        List list = (List) pipeConsensusTsFileSealWithModReq.getFileNames().stream().map(str -> {
            return new File(localWritingDir, str);
        }).collect(Collectors.toList());
        boolean z = false;
        try {
            try {
                if (isWritingFileNonAvailable(borrowCorrespondingWriter)) {
                    TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, because writing file %s is not available.", pipeConsensusTsFileSealWithModReq.getFileNames(), writingFile));
                    LOGGER.warn(status.getMessage());
                    TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
                    closeCurrentWritingFileWriter(borrowCorrespondingWriter, false);
                    IoTDBReceiverAgent.cleanPipeReceiverDir(localWritingDir);
                    if (0 != 0) {
                        try {
                            borrowCorrespondingWriter.returnSelf(this.consensusPipeName);
                        } catch (IOException | DiskSpaceInsufficientException e) {
                            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", new Object[]{this.consensusPipeName, borrowCorrespondingWriter, e});
                        }
                    }
                    return tPipeConsensusTransferResp;
                }
                int i = 0;
                while (i < pipeConsensusTsFileSealWithModReq.getFileNames().size()) {
                    TPipeConsensusTransferResp checkFinalFileSeal = i == pipeConsensusTsFileSealWithModReq.getFileNames().size() - 1 ? checkFinalFileSeal(borrowCorrespondingWriter, (String) pipeConsensusTsFileSealWithModReq.getFileNames().get(i), ((Long) pipeConsensusTsFileSealWithModReq.getFileLengths().get(i)).longValue()) : checkNonFinalFileSeal(borrowCorrespondingWriter, (File) list.get(i), (String) pipeConsensusTsFileSealWithModReq.getFileNames().get(i), ((Long) pipeConsensusTsFileSealWithModReq.getFileLengths().get(i)).longValue());
                    if (Objects.nonNull(checkFinalFileSeal)) {
                        closeCurrentWritingFileWriter(borrowCorrespondingWriter, false);
                        IoTDBReceiverAgent.cleanPipeReceiverDir(localWritingDir);
                        if (0 != 0) {
                            try {
                                borrowCorrespondingWriter.returnSelf(this.consensusPipeName);
                            } catch (IOException | DiskSpaceInsufficientException e2) {
                                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", new Object[]{this.consensusPipeName, borrowCorrespondingWriter, e2});
                            }
                        }
                        return checkFinalFileSeal;
                    }
                    i++;
                }
                writingFileWriter.getFD().sync();
                writingFileWriter.close();
                borrowCorrespondingWriter.setWritingFileWriter(null);
                borrowCorrespondingWriter.setWritingFile(null);
                List list2 = (List) list.stream().map((v0) -> {
                    return v0.getAbsolutePath();
                }).collect(Collectors.toList());
                long nanoTime3 = System.nanoTime();
                this.pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(nanoTime3 - nanoTime2);
                String str2 = (String) list2.get(1);
                updateWritePointCountMetrics(((Long) pipeConsensusTsFileSealWithModReq.getPointCounts().get(1)).longValue(), str2);
                TSStatus loadFileToDataRegion = loadFileToDataRegion(str2, ProgressIndexType.deserializeFrom(ByteBuffer.wrap(pipeConsensusTsFileSealWithModReq.getProgressIndex())));
                this.pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - nanoTime3);
                if (loadFileToDataRegion.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    z = true;
                    LOGGER.info("PipeConsensus-PipeName-{}: Seal file with mods {} successfully.", this.consensusPipeName, list2);
                } else {
                    LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, status is {}.", new Object[]{this.consensusPipeName, list2, loadFileToDataRegion});
                }
                TPipeConsensusTransferResp tPipeConsensusTransferResp2 = new TPipeConsensusTransferResp(loadFileToDataRegion);
                closeCurrentWritingFileWriter(borrowCorrespondingWriter, false);
                IoTDBReceiverAgent.cleanPipeReceiverDir(localWritingDir);
                if (z) {
                    try {
                        borrowCorrespondingWriter.returnSelf(this.consensusPipeName);
                    } catch (IOException | DiskSpaceInsufficientException e3) {
                        LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", new Object[]{this.consensusPipeName, borrowCorrespondingWriter, e3});
                    }
                }
                return tPipeConsensusTransferResp2;
            } catch (Throwable th) {
                closeCurrentWritingFileWriter(borrowCorrespondingWriter, false);
                IoTDBReceiverAgent.cleanPipeReceiverDir(localWritingDir);
                if (0 != 0) {
                    try {
                        borrowCorrespondingWriter.returnSelf(this.consensusPipeName);
                    } catch (IOException | DiskSpaceInsufficientException e4) {
                        LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", new Object[]{this.consensusPipeName, borrowCorrespondingWriter, e4});
                    }
                }
                throw th;
            }
        } catch (Exception e5) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.", new Object[]{this.consensusPipeName, list, pipeConsensusTsFileSealWithModReq, e5});
            TPipeConsensusTransferResp tPipeConsensusTransferResp3 = new TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s because %s", writingFile, e5.getMessage())));
            closeCurrentWritingFileWriter(borrowCorrespondingWriter, false);
            IoTDBReceiverAgent.cleanPipeReceiverDir(localWritingDir);
            if (0 != 0) {
                try {
                    borrowCorrespondingWriter.returnSelf(this.consensusPipeName);
                } catch (IOException | DiskSpaceInsufficientException e6) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", new Object[]{this.consensusPipeName, borrowCorrespondingWriter, e6});
                }
            }
            return tPipeConsensusTransferResp3;
        }
    }

    /**
     * Checks that a received file exists on disk and has the length announced by the sender
     * (used for the "non final" seal check, per the log message below).
     *
     * <p>The reported "receiver file" length is the length of {@code file} itself, i.e. the same
     * value that is compared against {@code j}. The writer's current {@link RandomAccessFile} is
     * deliberately not consulted: it may point to a different file or be {@code null}.
     *
     * @param pipeConsensusTsFileWriter the writer the file belongs to; kept for signature
     *     compatibility, not read by this check
     * @param file the received file to validate
     * @param str the file name used in error and log messages
     * @param j the file length announced by the sender
     * @return {@code null} when the file exists and its length equals {@code j}; otherwise a
     *     response carrying a {@code PIPE_CONSENSUS_TRANSFER_FILE_ERROR} status
     * @throws IOException declared for compatibility with existing callers
     */
    private TPipeConsensusTransferResp checkNonFinalFileSeal(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, File file, String str, long j) throws IOException {
        if (!file.exists()) {
            TSStatus missingStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, the file does not exist.", str));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because the file does not exist.", this.consensusPipeName, str);
            return new TPipeConsensusTransferResp(missingStatus);
        }
        // Read the length once so the comparison and the diagnostics agree.
        final long receivedLength = file.length();
        if (j == receivedLength) {
            return null;
        }
        TSStatus lengthStatus = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, because the length of file is not correct. The original file has length %s, but receiver file has length %s.", str, Long.valueOf(j), Long.valueOf(receivedLength)));
        LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} when check non final seal, because the length of file is not correct. The original file has length {}, but receiver file has length {}.", new Object[]{this.consensusPipeName, str, Long.valueOf(j), Long.valueOf(receivedLength)});
        return new TPipeConsensusTransferResp(lengthStatus);
    }

    /**
     * Loads the TsFile at {@code str} into the data region identified by this receiver's
     * consensus group id.
     *
     * <p>When the storage engine returns no region for that id, the load is skipped and only an
     * info message is logged.
     *
     * @param str path of the TsFile to load
     * @param progressIndex progress index attached to the generated TsFile resource
     * @return {@code RpcUtils.SUCCESS_STATUS} in both the loaded and the skipped case
     * @throws IOException if generating the TsFile resource or loading fails with an I/O error
     * @throws LoadFileException if the data region rejects the file
     */
    private TSStatus loadFileToDataRegion(String str, ProgressIndex progressIndex) throws IOException, LoadFileException {
        final DataRegion targetRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) this.consensusGroupId);
        if (targetRegion == null) {
            LOGGER.info("PipeConsensus-PipeName-{}: skip load tsfile-{} when sealing, because this region has been removed or migrated.", this.consensusPipeName, str);
            return RpcUtils.SUCCESS_STATUS;
        }
        final TsFileResource resource = generateTsFileResource(str, progressIndex);
        targetRegion.loadNewTsFile(resource, true, false);
        return RpcUtils.SUCCESS_STATUS;
    }

    /**
     * Records the write point count for a received TsFile.
     *
     * <p>A non-negative {@code j} is used as-is. A negative {@code j} is treated as "not given by
     * sender" (see the debug message), and the count is read from the TsFile instead.
     *
     * @param j point count supplied by the sender, or a negative value when absent
     * @param str path of the TsFile to count points from when {@code j} is negative
     */
    private void updateWritePointCountMetrics(long j, String str) {
        if (j >= 0) {
            updateWritePointCountMetrics(j);
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: The point count of TsFile {} is not given by sender, will read actual point count from TsFile.", this.consensusPipeName, str);
        }
        // try-with-resources guarantees the counter is closed even when count() throws.
        try (TsFileInsertionPointCounter pointCounter = new TsFileInsertionPointCounter(new File(str), null)) {
            updateWritePointCountMetrics(pointCounter.count());
        } catch (IOException e) {
            // Best effort: a failed count only affects metrics, so log and continue.
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to read TsFile when counting points: {}.", new Object[]{this.consensusPipeName, str, e});
        }
    }

    /**
     * Forwards {@code j} to {@code LoadTsFileManager.updateWritePointCountMetrics} for this
     * receiver's data region.
     *
     * <p>Nothing is recorded when the storage engine has no region for the consensus group id, or
     * when the region reports no non-system database name.
     *
     * @param j the point count to record
     */
    private void updateWritePointCountMetrics(long j) {
        final DataRegion region = StorageEngine.getInstance().getDataRegion((DataRegionId) this.consensusGroupId);
        if (region == null) {
            return;
        }
        region.getNonSystemDatabaseName().ifPresent(databaseName -> LoadTsFileManager.updateWritePointCountMetrics(region, databaseName, j, true));
    }

    /**
     * Builds and serializes a {@code TsFileResource} for the TsFile at {@code str}.
     *
     * <p>The sequence reader is only needed to populate the resource, so it is scoped to that
     * step and closed exactly once, before the resource is marked and serialized.
     *
     * @param str path of the TsFile
     * @param progressIndex progress index to store on the resource
     * @return the populated resource, with status {@code NORMAL} and the pipe-consensus flag set
     * @throws IOException if reading the TsFile or serializing the resource fails
     */
    private TsFileResource generateTsFileResource(String str, ProgressIndex progressIndex) throws IOException {
        final File tsFile = new File(str);
        final TsFileResource resource = new TsFileResource(tsFile);
        try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
            TsFileResourceUtils.updateTsFileResource(reader, resource);
        }
        resource.setStatus(TsFileResourceStatus.NORMAL);
        resource.setProgressIndex(progressIndex);
        resource.setGeneratedByPipeConsensus(true);
        resource.serialize();
        return resource;
    }

    /**
     * Tells whether the writer currently lacks a usable writing file.
     *
     * <p>The writing file counts as available only when the file reference is non-null, the file
     * exists on disk, and the file writer is non-null. An info message listing the three
     * conditions is logged when it is not available.
     *
     * @param pipeConsensusTsFileWriter the writer to inspect
     * @return {@code true} when the writing file is NOT available, {@code false} otherwise
     */
    private boolean isWritingFileNonAvailable(PipeConsensusTsFileWriter pipeConsensusTsFileWriter) {
        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        final RandomAccessFile currentWriter = pipeConsensusTsFileWriter.getWritingFileWriter();
        final boolean available = currentFile != null && currentFile.exists() && currentWriter != null;
        if (available) {
            return false;
        }
        LOGGER.info(
                "PipeConsensus-PipeName-{}: Writing file {} is not available. Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.",
                new Object[]{
                    this.consensusPipeName,
                    currentFile,
                    Boolean.valueOf(currentFile == null),
                    Boolean.valueOf(currentFile != null && currentFile.exists()),
                    Boolean.valueOf(currentWriter == null)
                });
        return true;
    }

    /**
     * Validates the writer's current writing file against a seal request: the file name must
     * match {@code str} and the writer's length must equal {@code j}.
     *
     * @param pipeConsensusTsFileWriter the writer holding the file being sealed
     * @param str the file name expected by the seal request
     * @param j the file length announced by the sender
     * @return {@code null} when both checks pass; otherwise a response carrying a
     *     {@code PIPE_CONSENSUS_TRANSFER_FILE_ERROR} status
     * @throws IOException if the length of the writing file cannot be read
     */
    private TPipeConsensusTransferResp checkFinalFileSeal(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, String str, long j) throws IOException {
        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        final RandomAccessFile currentWriter = pipeConsensusTsFileWriter.getWritingFileWriter();

        // Name check first: a mismatch means the writer holds some other file (or none).
        if (!isFileExistedAndNameCorrect(pipeConsensusTsFileWriter, str)) {
            final TSStatus nameMismatch = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, because writing file is %s.", str, currentFile));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because writing file is {}.", new Object[]{this.consensusPipeName, str, currentFile});
            return new TPipeConsensusTransferResp(nameMismatch);
        }

        if (isWritingFileOffsetNonCorrect(pipeConsensusTsFileWriter, j)) {
            final TSStatus lengthMismatch = RpcUtils.getStatus(TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, because the length of file is not correct. The original file has length %s, but receiver file has length %s.", str, Long.valueOf(j), Long.valueOf(currentWriter.length())));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} when check final seal file, because the length of file is not correct. The original file has length {}, but receiver file has length {}.", new Object[]{this.consensusPipeName, str, Long.valueOf(j), Long.valueOf(currentWriter.length())});
            return new TPipeConsensusTransferResp(lengthMismatch);
        }
        return null;
    }

    /**
     * Tells whether the writer has a writing file whose name equals {@code str}.
     *
     * <p>Only the file reference and its name are examined; the file is not checked on disk.
     *
     * @param pipeConsensusTsFileWriter the writer to inspect
     * @param str the expected file name
     * @return {@code true} when a writing file is set and its name equals {@code str}
     */
    private boolean isFileExistedAndNameCorrect(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, String str) {
        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        if (currentFile == null) {
            return false;
        }
        return currentFile.getName().equals(str);
    }

    /**
     * Tells whether the length of the writer's current file differs from {@code j}, logging a
     * warning when it does.
     *
     * @param pipeConsensusTsFileWriter the writer whose file length is compared
     * @param j the offset/length reported by the sender
     * @return {@code true} when the lengths differ, {@code false} when they match
     * @throws IOException if the length of the writing file cannot be read
     */
    private boolean isWritingFileOffsetNonCorrect(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, long j) throws IOException {
        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        final RandomAccessFile currentWriter = pipeConsensusTsFileWriter.getWritingFileWriter();
        if (currentWriter.length() == j) {
            return false;
        }
        LOGGER.warn("PipeConsensus-PipeName-{}: Writing file {}'s offset is {}, but request sender's offset is {}.", new Object[]{this.consensusPipeName, currentFile.getPath(), Long.valueOf(currentWriter.length()), Long.valueOf(j)});
        return true;
    }

    /**
     * Closes the writer's current {@link RandomAccessFile}, optionally forcing its contents to
     * disk first, and clears the writer reference.
     *
     * <p>This is best effort: I/O failures from the sync or the close are logged, never
     * propagated. The writer reference is always cleared once a close has been attempted, so a
     * failed close cannot leave a stale handle behind.
     *
     * @param pipeConsensusTsFileWriter the writer whose file handle should be closed
     * @param z when {@code true}, {@code getFD().sync()} is called before closing
     */
    private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, boolean z) {
        final RandomAccessFile currentWriter = pipeConsensusTsFileWriter.getWritingFileWriter();
        if (currentWriter == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: Current writing file writer is null. No need to close.", this.consensusPipeName.toString());
            }
            return;
        }

        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        final String currentFilePath = currentFile == null ? SubStringFunctionHelper.NULL_STRING : currentFile.getPath();

        if (z) {
            try {
                currentWriter.getFD().sync();
            } catch (IOException e) {
                // A failed sync must not prevent the handle from being closed below.
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to close current writing file writer {}, because {}.", new Object[]{this.consensusPipeName, currentFilePath, e.getMessage(), e});
            }
        }

        try {
            currentWriter.close();
            LOGGER.info("PipeConsensus-PipeName-{}: Current writing file writer {} was closed.", this.consensusPipeName, currentFilePath);
        } catch (IOException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to close current writing file writer {}, because {}.", new Object[]{this.consensusPipeName, currentFilePath, e.getMessage(), e});
        } finally {
            pipeConsensusTsFileWriter.setWritingFileWriter(null);
        }
    }

    /**
     * Deletes {@code file} (recursively when it is a directory) if it exists, retrying through
     * {@code RetryUtils.retryOnException}.
     *
     * <p>Failures are logged as warnings and not propagated. The decompiler widened this
     * method's access from {@code private}.
     *
     * @param file the file or directory to delete
     * @param str a short description of the caller's intent, used as a log prefix
     */
    public void deleteFileOrDirectoryIfExists(File file, String str) {
        final boolean present = file.exists();
        if (!present) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: {} {} is not existed. No need to delete.", new Object[]{this.consensusPipeName, str, file.getPath()});
            }
            return;
        }

        try {
            if (!file.isDirectory()) {
                RetryUtils.retryOnException(() -> FileUtils.delete(file));
            } else {
                RetryUtils.retryOnException(() -> {
                    FileUtils.deleteDirectory(file);
                    return null;
                });
            }
            LOGGER.info("PipeConsensus-PipeName-{}: {} {} was deleted.", new Object[]{this.consensusPipeName, str, file.getPath()});
        } catch (IOException deletionFailure) {
            LOGGER.warn("PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.", new Object[]{this.consensusPipeName, str, file.getPath(), deletionFailure.getMessage(), deletionFailure});
        }
    }

    /**
     * Deletes the writer's current writing file, if one is set.
     *
     * @param pipeConsensusTsFileWriter the writer whose current file should be removed
     */
    private void deleteCurrentWritingFile(PipeConsensusTsFileWriter pipeConsensusTsFileWriter) {
        final File currentFile = pipeConsensusTsFileWriter.getWritingFile();
        if (currentFile == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: Current writing file is null. No need to delete.", this.consensusPipeName.toString());
            }
            return;
        }
        final String reason = String.format("TsFileWriter-%s delete current writing file", Integer.valueOf(pipeConsensusTsFileWriter.index));
        deleteFileOrDirectoryIfExists(currentFile, reason);
    }

    /**
     * Points the writer at a file named {@code str} inside its local writing directory, unless
     * it is already writing a file with that name.
     *
     * <p>When a switch is needed, the current file handle is closed (synced first when {@code z}
     * is {@code false}), the previous file is deleted only when {@code z} is {@code true}, the
     * local writing directory is created if missing, and a new {@code "rw"} handle is opened.
     *
     * @param pipeConsensusTsFileWriter the writer to update
     * @param str name of the file that should be written next
     * @param z controls sync-before-close ({@code !z}) and deletion of the previous file
     *     ({@code z}); presumably a "single file" flag — confirm against the callers
     * @throws IOException if the new file cannot be opened for writing
     */
    private void updateWritingFileIfNeeded(PipeConsensusTsFileWriter pipeConsensusTsFileWriter, String str, boolean z) throws IOException {
        if (isFileExistedAndNameCorrect(pipeConsensusTsFileWriter, str)) {
            return;
        }

        final String previousPath = pipeConsensusTsFileWriter.getWritingFile() == null
                ? SubStringFunctionHelper.NULL_STRING
                : pipeConsensusTsFileWriter.getWritingFile().getPath();
        LOGGER.info("PipeConsensus-PipeName-{}: Writing file {} is not existed or name is not correct, try to create it. Current writing file is {}.", new Object[]{this.consensusPipeName, str, previousPath});

        final boolean syncBeforeClose = !z;
        closeCurrentWritingFileWriter(pipeConsensusTsFileWriter, syncBeforeClose);
        if (z && pipeConsensusTsFileWriter.getWritingFile() != null) {
            deleteCurrentWritingFile(pipeConsensusTsFileWriter);
        }

        // A failed mkdirs is only logged; opening the file below will then raise the IOException.
        if (!pipeConsensusTsFileWriter.getLocalWritingDir().exists()) {
            final boolean created = pipeConsensusTsFileWriter.getLocalWritingDir().mkdirs();
            if (!created) {
                LOGGER.error("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.", this.consensusPipeName, pipeConsensusTsFileWriter.getLocalWritingDir().getPath());
            } else {
                LOGGER.info("PipeConsensus-PipeName-{}: Receiver file dir {} was created.", this.consensusPipeName, pipeConsensusTsFileWriter.getLocalWritingDir().getPath());
            }
        }

        final File nextFile = new File(pipeConsensusTsFileWriter.getLocalWritingDir(), str);
        pipeConsensusTsFileWriter.setWritingFile(nextFile);
        final RandomAccessFile nextWriter = new RandomAccessFile(pipeConsensusTsFileWriter.getWritingFile(), "rw");
        pipeConsensusTsFileWriter.setWritingFileWriter(nextWriter);
        LOGGER.info("PipeConsensus-PipeName-{}: Writing file {} was created. Ready to write file pieces.", this.consensusPipeName, pipeConsensusTsFileWriter.getWritingFile().getPath());
    }

    /**
     * Returns the next folder offered by the folder manager, or {@code null} when no folder
     * manager is set. The decompiler widened this method's access from {@code private}.
     *
     * @return the next receiver base folder, or {@code null} without a folder manager
     * @throws DiskSpaceInsufficientException propagated from the folder manager
     */
    public String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
        return Objects.isNull(this.folderManager) ? null : this.folderManager.getNextFolder();
    }

    /**
     * Creates a fresh receive directory, named after the consensus pipe, under every base folder
     * in {@code list} and registers each one in {@code receiveDirs}.
     *
     * <p>Any existing directory of the same name is deleted first. Before each folder is
     * processed, the configured system directory must still exist.
     *
     * @param list base folders under which the receive directories are created
     * @throws IOException if the system directory is missing or a receive directory cannot be
     *     created
     */
    private void initiateTsFileBufferFolder(List<String> list) throws IOException {
        for (String baseFolder : list) {
            final File receiveDir = new File(baseFolder, this.consensusPipeName.toString());

            final File systemDir = new File(IoTDBDescriptor.getInstance().getConfig().getSystemDir());
            if (!systemDir.exists()) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. Because parent system dir have been deleted due to system concurrently exit.", this.consensusPipeName, receiveDir.getPath());
                throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create receiver file dir %s. Because parent system dir have been deleted due to system concurrently exit.", this.consensusPipeName, receiveDir.getPath()));
            }

            // Start from an empty directory: drop leftovers before creating it.
            deleteFileOrDirectoryIfExists(receiveDir, "Initial Receiver: delete origin receive dir");
            final boolean created = receiveDir.mkdirs();
            if (!created) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. May because authority or dir already exists etc.", this.consensusPipeName, receiveDir.getPath());
                throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create receiver file dir %s. May because authority or dir already exists etc.", this.consensusPipeName, receiveDir.getPath()));
            }
            this.receiveDirs.add(receiveDir.getPath());
        }
    }

    /**
     * Deletes every directory recorded in {@code receiveDirs}. The decompiler widened this
     * method's access from {@code private}.
     */
    public void clearAllReceiverBaseDir() {
        for (Iterator<String> dirs = this.receiveDirs.iterator(); dirs.hasNext(); ) {
            final File receiveDir = new File(dirs.next());
            deleteFileOrDirectoryIfExists(receiveDir, "Clear receive dir manually");
        }
    }

    /**
     * Returns the request version reported by this receiver.
     *
     * @return always {@code PipeConsensusRequestVersion.VERSION_1}
     */
    public PipeConsensusRequestVersion getVersion() {
        return PipeConsensusRequestVersion.VERSION_1;
    }

    /**
     * Shuts this receiver down: closes the request executor, unregisters the receiver metrics,
     * cancels the TsFile writer checker task if one is scheduled, and clears the request
     * executor's state.
     *
     * <p>The method is {@code synchronized} on this receiver instance.
     */
    public synchronized void handleExit() {
        // Close the request executor before anything else is torn down.
        this.requestExecutor.tryClose();
        // Unregister this receiver's metric set.
        MetricService.getInstance().removeMetricSet(this.pipeConsensusReceiverMetrics);
        // Cancel the checker task without interrupting a run in progress (cancel(false)),
        // and drop the reference.
        if (this.tsFileWriterCheckerFuture != null) {
            this.tsFileWriterCheckerFuture.cancel(false);
            this.tsFileWriterCheckerFuture = null;
        }
        // NOTE(review): the meaning of the (false, true) flags is defined by
        // requestExecutor.clear, which is not visible here — confirm before changing them.
        this.requestExecutor.clear(false, true);
        LOGGER.info("Receiver-{} exit successfully.", this.consensusPipeName.toString());
    }

    /**
     * Returns the current size of the request executor's {@code reqExecutionOrderBuffer}.
     *
     * @return the number of entries in the buffer
     */
    public int getReceiveBufferSize() {
        return requestExecutor.reqExecutionOrderBuffer.size();
    }

    /**
     * Returns the current value of the request executor's {@code WALEventCount} counter.
     *
     * @return the WAL event count
     */
    public int getWALEventCount() {
        return requestExecutor.WALEventCount.get();
    }

    /**
     * Returns the current value of the request executor's {@code tsFileEventCount} counter.
     *
     * @return the TsFile event count
     */
    public int getTsFileEventCount() {
        return requestExecutor.tsFileEventCount.get();
    }

    /**
     * Returns the string form of this receiver's consensus group id.
     *
     * @return {@code consensusGroupId.toString()}
     */
    public String getConsensusGroupIdStr() {
        return consensusGroupId.toString();
    }
}
