package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.class */
public class PipeConsensusTsFileInsertionEventHandler implements AsyncMethodCallback<TPipeConsensusTransferResp> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusTsFileInsertionEventHandler.class);
    private final PipeTsFileInsertionEvent event;
    private final PipeConsensusAsyncConnector connector;
    private final TCommitId commitId;
    private final TConsensusGroupId consensusGroupId;
    private final String consensusPipeName;
    private final int thisDataNodeId;
    private final File tsFile;
    private final File modFile;
    private File currentFile;
    private final boolean transferMod;
    private final int readFileBufferSize;
    private final byte[] readBuffer;
    private long position;
    private RandomAccessFile reader;
    private final AtomicBoolean isSealSignalSent;
    private AsyncPipeConsensusServiceClient client;
    private final PipeConsensusConnectorMetrics metric;
    private final long createTime;
    private long startTransferPieceTime;

    public PipeConsensusTsFileInsertionEventHandler(PipeTsFileInsertionEvent pipeTsFileInsertionEvent, PipeConsensusAsyncConnector pipeConsensusAsyncConnector, TCommitId tCommitId, TConsensusGroupId tConsensusGroupId, String str, int i, PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) throws FileNotFoundException {
        this.event = pipeTsFileInsertionEvent;
        this.connector = pipeConsensusAsyncConnector;
        this.commitId = tCommitId;
        this.consensusGroupId = tConsensusGroupId;
        this.consensusPipeName = str;
        this.thisDataNodeId = i;
        this.tsFile = pipeTsFileInsertionEvent.getTsFile();
        this.modFile = pipeTsFileInsertionEvent.getModFile();
        this.transferMod = pipeTsFileInsertionEvent.isWithMod();
        this.currentFile = this.transferMod ? this.modFile : this.tsFile;
        this.readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        this.readBuffer = new byte[this.readFileBufferSize];
        this.position = 0L;
        this.reader = Objects.nonNull(this.modFile) ? new RandomAccessFile(this.modFile, "r") : new RandomAccessFile(this.tsFile, "r");
        this.isSealSignalSent = new AtomicBoolean(false);
        this.metric = pipeConsensusConnectorMetrics;
        this.createTime = System.nanoTime();
    }

    public void transfer(AsyncPipeConsensusServiceClient asyncPipeConsensusServiceClient) throws TException, IOException {
        this.startTransferPieceTime = System.nanoTime();
        this.client = asyncPipeConsensusServiceClient;
        asyncPipeConsensusServiceClient.setShouldReturnSelf(false);
        int read = this.reader.read(this.readBuffer);
        if (read != -1) {
            byte[] copyOfRange = read == this.readFileBufferSize ? this.readBuffer : Arrays.copyOfRange(this.readBuffer, 0, read);
            asyncPipeConsensusServiceClient.pipeConsensusTransfer(this.transferMod ? PipeConsensusTsFilePieceWithModReq.toTPipeConsensusTransferReq(this.currentFile.getName(), this.position, copyOfRange, this.commitId, this.consensusGroupId, this.thisDataNodeId) : PipeConsensusTsFilePieceReq.toTPipeConsensusTransferReq(this.currentFile.getName(), this.position, copyOfRange, this.commitId, this.consensusGroupId, this.thisDataNodeId), this);
            this.position += read;
        } else {
            if (this.currentFile != this.modFile) {
                if (this.currentFile == this.tsFile) {
                    this.isSealSignalSent.set(true);
                    asyncPipeConsensusServiceClient.pipeConsensusTransfer(this.transferMod ? PipeConsensusTsFileSealWithModReq.toTPipeConsensusTransferReq(this.modFile.getName(), this.modFile.length(), this.tsFile.getName(), this.tsFile.length(), this.event.getFlushPointCount(), this.commitId, this.consensusGroupId, this.event.getProgressIndex(), this.thisDataNodeId) : PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(this.tsFile.getName(), this.tsFile.length(), this.event.getFlushPointCount(), this.commitId, this.consensusGroupId, this.event.getProgressIndex(), this.thisDataNodeId), this);
                    return;
                }
                return;
            }
            this.currentFile = this.tsFile;
            this.position = 0L;
            try {
                this.reader.close();
            } catch (IOException e) {
                LOGGER.warn("PipeConsensus-{}: Failed to close file reader when successfully transferred mod file.", this.consensusPipeName, e);
            }
            this.reader = new RandomAccessFile(this.tsFile, "r");
            transfer(asyncPipeConsensusServiceClient);
        }
    }

    public void onComplete(TPipeConsensusTransferResp tPipeConsensusTransferResp) {
        if (!this.isSealSignalSent.get()) {
            try {
                PipeConsensusTransferFilePieceResp fromTPipeConsensusTransferResp = PipeConsensusTransferFilePieceResp.fromTPipeConsensusTransferResp(tPipeConsensusTransferResp);
                if (fromTPipeConsensusTransferResp.getStatus().getCode() == TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    this.position = fromTPipeConsensusTransferResp.getEndWritingOffset();
                    this.reader.seek(this.position);
                    LOGGER.info("PipeConsensus-{}: Redirect file position to {}.", this.consensusPipeName, Long.valueOf(this.position));
                } else {
                    TSStatus status = tPipeConsensusTransferResp.getStatus();
                    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                        this.connector.statusHandler().handle(status, tPipeConsensusTransferResp.getStatus().getMessage(), this.tsFile.getName());
                    }
                }
                this.metric.recordConnectorTsFilePieceTransferTimer(System.nanoTime() - this.startTransferPieceTime);
                transfer(this.client);
                return;
            } catch (Exception e) {
                onError(e);
                return;
            }
        }
        try {
            TSStatus status2 = tPipeConsensusTransferResp.getStatus();
            if (status2.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status2.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.connector.statusHandler().handle(status2, String.format("Seal file %s error, result status %s.", this.tsFile, tPipeConsensusTransferResp.getStatus()), this.tsFile.getName());
            }
            if (status2.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                this.connector.removeEventFromBuffer(this.event);
            }
            try {
                try {
                    if (this.reader != null) {
                        this.reader.close();
                    }
                    this.event.decreaseReferenceCount(PipeConsensusTsFileInsertionEventHandler.class.getName(), true);
                    LOGGER.info("PipeConsensus-{}: Successfully transferred file {} (committer key={}, replicate index={}).", new Object[]{this.consensusPipeName, this.tsFile, this.event.getCommitterKey(), Long.valueOf(this.event.getReplicateIndexForIoTV2())});
                    if (this.client != null) {
                        this.client.setShouldReturnSelf(true);
                        this.client.returnSelf();
                    }
                    this.metric.recordConnectorTsFileTransferTimer(System.nanoTime() - this.createTime);
                } catch (IOException e2) {
                    LOGGER.warn("PipeConsensus-{}: Failed to close file reader when successfully transferred file.", this.consensusPipeName, e2);
                    this.event.decreaseReferenceCount(PipeConsensusTsFileInsertionEventHandler.class.getName(), true);
                    LOGGER.info("PipeConsensus-{}: Successfully transferred file {} (committer key={}, replicate index={}).", new Object[]{this.consensusPipeName, this.tsFile, this.event.getCommitterKey(), Long.valueOf(this.event.getReplicateIndexForIoTV2())});
                    if (this.client != null) {
                        this.client.setShouldReturnSelf(true);
                        this.client.returnSelf();
                    }
                    this.metric.recordConnectorTsFileTransferTimer(System.nanoTime() - this.createTime);
                }
            } catch (Throwable th) {
                this.event.decreaseReferenceCount(PipeConsensusTsFileInsertionEventHandler.class.getName(), true);
                LOGGER.info("PipeConsensus-{}: Successfully transferred file {} (committer key={}, replicate index={}).", new Object[]{this.consensusPipeName, this.tsFile, this.event.getCommitterKey(), Long.valueOf(this.event.getReplicateIndexForIoTV2())});
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
                this.metric.recordConnectorTsFileTransferTimer(System.nanoTime() - this.createTime);
                throw th;
            }
        } catch (Exception e3) {
            onError(e3);
        }
    }

    public void onError(Exception exc) {
        LOGGER.warn("PipeConsensus-{}: Failed to transfer TsFileInsertionEvent {} (committer key {}, replicate index {}).", new Object[]{this.consensusPipeName, this.tsFile, this.event.getCommitterKey(), Long.valueOf(this.event.getReplicateIndexForIoTV2()), exc});
        try {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                this.connector.addFailureEventToRetryQueue(this.event);
                this.metric.recordRetryCounter();
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            } catch (IOException e) {
                LOGGER.warn("PipeConsensus-{}: Failed to close file reader when failed to transfer file.", this.consensusPipeName, e);
                this.connector.addFailureEventToRetryQueue(this.event);
                this.metric.recordRetryCounter();
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            }
        } catch (Throwable th) {
            this.connector.addFailureEventToRetryQueue(this.event);
            this.metric.recordRetryCounter();
            if (this.client != null) {
                this.client.setShouldReturnSelf(true);
                this.client.returnSelf();
            }
            throw th;
        }
    }
}
