package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Synchronous connector that pushes pipe events (insert-node tablets, TsFiles and deletions) to a
 * PipeConsensus peer over a {@link SyncPipeConsensusServiceClient}.
 *
 * <p>Every request is sent to the endpoint returned by {@link #getFollowerUrl()}, i.e. the first
 * element of the peer list handed to the constructor. Clients are borrowed from a dedicated client
 * manager and are always closed via try-with-resources, including when a transfer fails.
 *
 * <p>Failures raised while talking to the peer are rethrown as {@link PipeConnectionException}
 * with the original exception preserved as the cause.
 */
@TreeModel
@TableModel
public class PipeConsensusSyncConnector extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusSyncConnector.class);
    private static final String PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT = "PipeConsensus: syncClient connection to %s:%s failed when %s, because: %s";
    private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer insertionNode tablet";
    private static final String TSFILE_SCENARIO = "transfer tsfile";
    private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch";
    private static final String DELETION_SCENARIO = "transfer deletion";

    /** Source of the sync clients used for every transfer; closed in {@link #close()}. */
    private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> syncRetryClientManager = PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
    /** Candidate receivers; only the first entry is contacted (see {@link #getFollowerUrl()}). */
    private final List<TEndPoint> peers;
    private final int thisDataNodeId;
    private final int consensusGroupId;
    private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
    /** Only created when batch mode was enabled by the configuration; may be {@code null}. */
    private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;

    /**
     * Creates a connector bound to one consensus group.
     *
     * @param peers endpoints of the receivers; the first element is the one actually contacted
     * @param consensusGroupId id of the data region consensus group the events belong to
     * @param thisDataNodeId id of the sending data node, attached to every request
     * @param pipeConsensusConnectorMetrics sink for the retry-transfer timers recorded by this class
     */
    public PipeConsensusSyncConnector(List<TEndPoint> peers, int consensusGroupId, int thisDataNodeId, PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
        this.peers = peers;
        this.consensusGroupId = consensusGroupId;
        this.thisDataNodeId = thisDataNodeId;
        this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
    }

    /**
     * Applies the parent customization, creates the batch builder if batch mode was requested, and
     * then switches batch mode off for this connector.
     *
     * @param pipeParameters connector parameters, also forwarded to the batch builder
     * @param pipeConnectorRuntimeConfiguration runtime configuration forwarded to the parent
     * @throws Exception if the parent customization or the builder construction fails
     */
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new PipeConsensusSyncBatchReqBuilder(pipeParameters, new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId), this.thisDataNodeId);
        }
        // Batch mode is unconditionally disabled after customization, so the batch branches
        // below are only taken if something else re-enables the flag later.
        this.isTabletBatchModeEnabled = false;
    }

    /** No-op: this connector performs no handshake. */
    public void handshake() throws Exception {
    }

    /** No-op: this connector sends no heartbeat. */
    public void heartbeat() throws Exception {
    }

    /**
     * Transfers a tablet insertion event, either directly or through the batch builder when batch
     * mode is enabled. The duration of a direct transfer is recorded in the metrics.
     *
     * @param tabletInsertionEvent event to send; cast to {@link PipeInsertNodeTabletInsertionEvent}
     *     on the direct path without a type check
     * @throws PipeConnectionException if anything fails while transferring the event
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        try {
            if (this.isTabletBatchModeEnabled) {
                if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                    doTransfer();
                }
                return;
            }
            long startNanos = System.nanoTime();
            doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            this.pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(System.nanoTime() - startNanos);
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tablet insertion event %s, because %s.", tabletInsertionEvent, e.getMessage()), e);
        }
    }

    /**
     * Transfers a TsFile insertion event after flushing any pending tablet batch, and records the
     * elapsed time (including the flush) in the metrics.
     *
     * @param tsFileInsertionEvent event to send; cast to {@link PipeTsFileInsertionEvent} without a
     *     type check
     * @throws PipeConnectionException if anything fails while transferring the event
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        try {
            long startNanos = System.nanoTime();
            flushPendingBatch();
            doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
            this.pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(System.nanoTime() - startNanos);
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tsfile insertion event %s, because %s.", tsFileInsertionEvent, e.getMessage()), e);
        }
    }

    /**
     * Transfers a generic event after flushing any pending tablet batch.
     *
     * @param event event to send
     * @throws Exception if the flush or the transfer fails
     */
    public void transfer(Event event) throws Exception {
        flushPendingBatch();
        // NOTE(review): the event is cast unconditionally, so callers are presumably expected to
        // pass only deletion events here - confirm against the caller.
        doTransferWrapper((PipeDeleteDataNodeEvent) event);
    }

    /** Sends the pending tablet batch first when batch mode is on and the builder holds events. */
    private void flushPendingBatch() {
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            doTransfer();
        }
    }

    /**
     * Batch path: borrows a client for the follower and marks the current batch as successful.
     *
     * <p>NOTE(review): no batch request is sent through the borrowed client in this method; the
     * client is only borrowed and released. Confirm whether that is intended.
     *
     * @throws PipeConnectionException if borrowing the client or completing the batch fails
     */
    private void doTransfer() {
        try (SyncPipeConsensusServiceClient client = this.syncRetryClientManager.borrowClient(getFollowerUrl())) {
            this.tabletBatchBuilder.onSuccess();
        } catch (Exception e) {
            throw connectionFailure(TABLET_BATCH_SCENARIO, e);
        }
    }

    /**
     * Transfers a deletion event while holding a reference on it. Nothing is sent when the
     * reference cannot be acquired.
     *
     * @param pipeDeleteDataNodeEvent event to send
     * @throws PipeException if the transfer fails
     */
    private void doTransferWrapper(PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) throws PipeException {
        String holder = PipeConsensusSyncConnector.class.getName();
        if (!pipeDeleteDataNodeEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeDeleteDataNodeEvent);
        } finally {
            pipeDeleteDataNodeEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Sends one deletion to the follower and hands any non-success, non-redirection status to the
     * receiver status handler.
     *
     * @param pipeDeleteDataNodeEvent event to send
     * @throws PipeConnectionException if the client cannot be borrowed, the call fails, or the
     *     status handler throws
     */
    private void doTransfer(PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) throws PipeException {
        TCommitId commitId = new TCommitId(pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(), pipeDeleteDataNodeEvent.getRebootTimes());
        TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId);
        // try-with-resources guarantees the client is released even when the call throws.
        try (SyncPipeConsensusServiceClient client = this.syncRetryClientManager.borrowClient(getFollowerUrl())) {
            TPipeConsensusTransferResp resp = client.pipeConsensusTransfer(PipeConsensusDeleteNodeReq.toTPipeConsensusTransferReq(pipeDeleteDataNodeEvent.getDeleteDataNode(), commitId, groupId, pipeDeleteDataNodeEvent.getProgressIndex(), this.thisDataNodeId));
            TSStatus status = resp.getStatus();
            if (!isSuccessOrRedirection(status)) {
                this.receiverStatusHandler.handle(status, String.format("PipeConsensus transfer DeletionEvent %s error, result status %s.", pipeDeleteDataNodeEvent.getDeletionResource(), status), pipeDeleteDataNodeEvent.getDeleteDataNode().toString());
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Successfully transferred deletion event {}.", pipeDeleteDataNodeEvent.getDeletionResource());
            }
        } catch (Exception e) {
            throw connectionFailure(DELETION_SCENARIO, e);
        }
    }

    /**
     * Transfers an insert-node tablet event while holding a reference on it. Nothing is sent when
     * the reference cannot be acquired.
     *
     * @param pipeInsertNodeTabletInsertionEvent event to send
     * @throws PipeException if the transfer fails
     */
    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        String holder = PipeConsensusSyncConnector.class.getName();
        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeInsertNodeTabletInsertionEvent);
        } finally {
            pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Sends one insert-node tablet to the follower, using the insert node when the event can
     * provide it and the event's byte buffer otherwise. Any non-success, non-redirection status is
     * handed to the receiver status handler.
     *
     * @param pipeInsertNodeTabletInsertionEvent event to send
     * @throws PipeConnectionException if the client cannot be borrowed, the call fails, or the
     *     status handler throws
     */
    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        TCommitId commitId = new TCommitId(pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(), pipeInsertNodeTabletInsertionEvent.getRebootTimes());
        TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId);
        // try-with-resources guarantees the client is released even when the call throws.
        try (SyncPipeConsensusServiceClient client = this.syncRetryClientManager.borrowClient(getFollowerUrl())) {
            InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
            TPipeConsensusTransferResp resp;
            if (insertNode != null) {
                resp = client.pipeConsensusTransfer(PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(insertNode, commitId, groupId, progressIndex, this.thisDataNodeId));
            } else {
                resp = client.pipeConsensusTransfer(PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer(), commitId, groupId, progressIndex, this.thisDataNodeId));
            }
            TSStatus status = resp.getStatus();
            if (!isSuccessOrRedirection(status)) {
                this.receiverStatusHandler.handle(status, String.format("PipeConsensus transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, status), pipeInsertNodeTabletInsertionEvent.toString());
            }
        } catch (Exception e) {
            throw connectionFailure(TABLET_INSERTION_NODE_SCENARIO, e);
        }
    }

    /**
     * Streams a TsFile (preceded by its mod file when the event carries one) to the follower and
     * then sends the matching seal request. Any non-success, non-redirection seal status is handed
     * to the receiver status handler.
     *
     * @param pipeTsFileInsertionEvent event describing the files to send
     * @throws PipeConnectionException if the client cannot be borrowed, any piece or the seal
     *     request fails, or the status handler throws
     */
    private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException {
        File tsFile = pipeTsFileInsertionEvent.getTsFile();
        File modFile = pipeTsFileInsertionEvent.getModFile();
        // try-with-resources guarantees the client is released even when a piece transfer throws.
        try (SyncPipeConsensusServiceClient client = this.syncRetryClientManager.borrowClient(getFollowerUrl())) {
            TCommitId commitId = new TCommitId(pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(), pipeTsFileInsertionEvent.getRebootTimes());
            TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, this.consensusGroupId);
            TPipeConsensusTransferResp resp;
            if (pipeTsFileInsertionEvent.isWithMod()) {
                transferFilePieces(modFile, client, true, commitId, groupId);
                transferFilePieces(tsFile, client, true, commitId, groupId);
                resp = client.pipeConsensusTransfer(PipeConsensusTsFileSealWithModReq.toTPipeConsensusTransferReq(modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length(), pipeTsFileInsertionEvent.getFlushPointCount(), commitId, groupId, pipeTsFileInsertionEvent.getProgressIndex(), this.thisDataNodeId));
            } else {
                transferFilePieces(tsFile, client, false, commitId, groupId);
                resp = client.pipeConsensusTransfer(PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(tsFile.getName(), tsFile.length(), pipeTsFileInsertionEvent.getFlushPointCount(), commitId, groupId, pipeTsFileInsertionEvent.getProgressIndex(), this.thisDataNodeId));
            }
            TSStatus status = resp.getStatus();
            if (!isSuccessOrRedirection(status)) {
                this.receiverStatusHandler.handle(status, String.format("Seal file %s error, result status %s.", tsFile, status), tsFile.getName());
            }
            LOGGER.info("Successfully transferred file {}.", tsFile);
        } catch (Exception e) {
            throw connectionFailure(TSFILE_SCENARIO, e);
        }
    }

    /**
     * Sends a file to the receiver in chunks of the configured read-buffer size.
     *
     * <p>After each chunk the local offset advances by the number of bytes read. If the receiver
     * answers with an offset-reset status, reading resumes from the offset it reports. Any other
     * non-success, non-redirection status is handed to the receiver status handler.
     *
     * @param file file to read and send
     * @param syncPipeConsensusServiceClient client used for every piece; not closed by this method
     * @param isMultiFile {@code true} to build "piece with mod" requests, {@code false} for plain
     *     piece requests
     * @param tCommitId commit id attached to every piece request
     * @param tConsensusGroupId consensus group id attached to every piece request
     * @throws PipeConnectionException if building or sending a piece request fails
     * @throws PipeException if the receiver status handler rejects a response
     * @throws IOException if the file cannot be opened, read or repositioned
     */
    protected void transferFilePieces(File file, SyncPipeConsensusServiceClient syncPipeConsensusServiceClient, boolean isMultiFile, TCommitId tCommitId, TConsensusGroupId tConsensusGroupId) throws PipeException, IOException {
        int bufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        byte[] readBuffer = new byte[bufferSize];
        long position = 0;
        try (RandomAccessFile reader = new RandomAccessFile(file, "r")) {
            while (true) {
                int readLength = reader.read(readBuffer);
                if (readLength == -1) {
                    return;
                }
                // Only a short read needs a trimmed copy; a full buffer is sent as is.
                byte[] payload = readLength == bufferSize ? readBuffer : Arrays.copyOfRange(readBuffer, 0, readLength);
                PipeConsensusTransferFilePieceResp resp;
                try {
                    TPipeConsensusTransferReq req;
                    if (isMultiFile) {
                        req = PipeConsensusTsFilePieceWithModReq.toTPipeConsensusTransferReq(file.getName(), position, payload, tCommitId, tConsensusGroupId, this.thisDataNodeId);
                    } else {
                        req = PipeConsensusTsFilePieceReq.toTPipeConsensusTransferReq(file.getName(), position, payload, tCommitId, tConsensusGroupId, this.thisDataNodeId);
                    }
                    // The remote call is inside the guarded region so that transport failures
                    // surface as PipeConnectionException for both request flavours.
                    resp = PipeConsensusTransferFilePieceResp.fromTPipeConsensusTransferResp(syncPipeConsensusServiceClient.pipeConsensusTransfer(req));
                } catch (Exception e) {
                    throw new PipeConnectionException(String.format("Network error when transfer file %s, because %s.", file, e.getMessage()), e);
                }
                position += readLength;
                TSStatus status = resp.getStatus();
                if (status.getCode() == TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    // The receiver dictates where to continue; re-align both offset and reader.
                    position = resp.getEndWritingOffset();
                    reader.seek(position);
                    LOGGER.info("Redirect file position to {}.", position);
                } else if (!isSuccessOrRedirection(status)) {
                    this.receiverStatusHandler.handle(status, String.format("Transfer file %s error, result status %s.", file, status), file.getName());
                }
            }
        }
    }

    /** Returns whether the status code is SUCCESS_STATUS or REDIRECTION_RECOMMEND. */
    private static boolean isSuccessOrRedirection(TSStatus status) {
        int code = status.getCode();
        return code == TSStatusCode.SUCCESS_STATUS.getStatusCode() || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
    }

    /** Builds the connection-failure exception for the given scenario, keeping the cause. */
    private PipeConnectionException connectionFailure(String scenario, Exception cause) {
        TEndPoint follower = getFollowerUrl();
        return new PipeConnectionException(String.format(PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT, follower.getIp(), follower.getPort(), scenario, cause.getMessage()), cause);
    }

    /** Returns the endpoint every request is sent to: the first configured peer. */
    private TEndPoint getFollowerUrl() {
        return this.peers.get(0);
    }

    /** Closes the parent connector, then the client manager and the batch builder if present. */
    public synchronized void close() {
        super.close();
        if (this.syncRetryClientManager != null) {
            this.syncRetryClientManager.close();
        }
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
    }
}
