package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.class */
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
    private PipeTransferBatchReqBuilder tabletBatchBuilder;

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new PipeTransferBatchReqBuilder(pipeParameters);
        }
    }

    protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceReq.toTPipeTransferReq(str, j, bArr);
    }

    protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(str, j, bArr);
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }
        try {
            if (this.isTabletBatchModeEnabled) {
                Pair<TEndPoint, PipeTabletEventBatch> onEvent = this.tabletBatchBuilder.onEvent(tabletInsertionEvent);
                if (Objects.nonNull(onEvent)) {
                    doTransferWrapper(onEvent);
                }
            } else if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            } else {
                doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
            }
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tablet insertion event %s, because %s.", ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()), e);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }
        try {
            if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
                doTransferWrapper();
            }
            doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tsfile insertion event %s, because %s.", ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(), e.getMessage()), e);
        }
    }

    public void transfer(Event event) throws Exception {
        if (event instanceof PipeSchemaRegionWritePlanEvent) {
            doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
            return;
        }
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            doTransferWrapper();
        }
        if ((event instanceof PipeHeartbeatEvent) || (event instanceof PipeTerminateEvent)) {
            return;
        }
        LOGGER.warn("IoTDBThriftSyncConnector does not support transferring generic event: {}.", event);
    }

    private void doTransferWrapper() throws IOException, WriteProcessException {
        Iterator<Pair<TEndPoint, PipeTabletEventBatch>> it = this.tabletBatchBuilder.getAllNonEmptyBatches().iterator();
        while (it.hasNext()) {
            doTransferWrapper(it.next());
        }
    }

    private void doTransferWrapper(Pair<TEndPoint, PipeTabletEventBatch> pair) throws IOException, WriteProcessException {
        PipeTabletEventBatch pipeTabletEventBatch = (PipeTabletEventBatch) pair.getRight();
        if (pipeTabletEventBatch instanceof PipeTabletEventPlainBatch) {
            doTransfer((TEndPoint) pair.getLeft(), (PipeTabletEventPlainBatch) pipeTabletEventBatch);
        } else if (pipeTabletEventBatch instanceof PipeTabletEventTsFileBatch) {
            doTransfer((PipeTabletEventTsFileBatch) pipeTabletEventBatch);
        } else {
            LOGGER.warn("Unsupported batch type {}.", pipeTabletEventBatch.getClass());
        }
        pipeTabletEventBatch.decreaseEventsReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), true);
        pipeTabletEventBatch.onSuccess();
    }

    private void doTransfer(TEndPoint tEndPoint, PipeTabletEventPlainBatch pipeTabletEventPlainBatch) {
        Pair<IoTDBSyncClient, Boolean> client = this.clientManager.getClient(tEndPoint);
        try {
            PipeTransferTabletBatchReq tPipeTransferReq = pipeTabletEventPlainBatch.toTPipeTransferReq();
            long length = tPipeTransferReq.getBody().length;
            TPipeTransferReq compressIfNeeded = compressIfNeeded(tPipeTransferReq);
            double length2 = compressIfNeeded.getBody().length / length;
            for (Map.Entry<Pair<String, Long>, Long> entry : pipeTabletEventPlainBatch.getPipe2BytesAccumulated().entrySet()) {
                rateLimitIfNeeded((String) entry.getKey().getLeft(), ((Long) entry.getKey().getRight()).longValue(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), (long) (entry.getValue().longValue() * length2));
            }
            TPipeTransferResp pipeTransfer = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded);
            TSStatus status = pipeTransfer.getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.receiverStatusHandler.handle(pipeTransfer.getStatus(), String.format("Transfer PipeTransferTabletBatchReq error, result status %s", pipeTransfer.status), pipeTabletEventPlainBatch.deepCopyEvents().toString());
            }
            for (Pair<String, TEndPoint> pair : LeaderCacheUtils.parseRecommendedRedirections(status)) {
                this.clientManager.updateLeaderCache((String) pair.getLeft(), (TEndPoint) pair.getRight());
            }
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer tablet batch, because %s.", e.getMessage()), e);
        }
    }

    private void doTransfer(PipeTabletEventTsFileBatch pipeTabletEventTsFileBatch) throws IOException, WriteProcessException {
        List<File> sealTsFiles = pipeTabletEventTsFileBatch.sealTsFiles();
        Map<Pair<String, Long>, Double> deepCopyPipe2WeightMap = pipeTabletEventTsFileBatch.deepCopyPipe2WeightMap();
        for (File file : sealTsFiles) {
            doTransfer(deepCopyPipe2WeightMap, file, null);
            try {
                FileUtils.delete(file);
            } catch (NoSuchFileException e) {
                LOGGER.info("The file {} is not found, may already be deleted.", file);
            } catch (Exception e2) {
                LOGGER.warn("Failed to delete batch file {}, this file should be deleted manually later", file);
            }
        }
    }

    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        if (pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
            try {
                doTransfer(pipeInsertNodeTabletInsertionEvent);
            } finally {
                pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        Pair<IoTDBSyncClient, Boolean> pair = null;
        try {
            pair = this.clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
            InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            TPipeTransferReq compressIfNeeded = compressIfNeeded(insertNodeViaCacheIfPossible != null ? PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNodeViaCacheIfPossible) : PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
            rateLimitIfNeeded(pipeInsertNodeTabletInsertionEvent.getPipeName(), pipeInsertNodeTabletInsertionEvent.getCreationTime(), ((IoTDBSyncClient) pair.getLeft()).getEndPoint(), compressIfNeeded.getBody().length);
            TSStatus status = ((IoTDBSyncClient) pair.getLeft()).pipeTransfer(compressIfNeeded).getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.receiverStatusHandler.handle(status, String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status), pipeInsertNodeTabletInsertionEvent.toString());
            }
            if (status.isSetRedirectNode()) {
                this.clientManager.updateLeaderCache(pipeInsertNodeTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
            }
        } catch (Exception e) {
            if (pair != null) {
                pair.setRight(false);
            }
            throw new PipeConnectionException(String.format("Network error when transfer insert node tablet insertion event, because %s.", e.getMessage()), e);
        }
    }

    private void doTransferWrapper(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        if (pipeRawTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
            try {
                doTransfer(pipeRawTabletInsertionEvent);
            } finally {
                pipeRawTabletInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        Pair<IoTDBSyncClient, Boolean> client = this.clientManager.getClient(pipeRawTabletInsertionEvent.getDeviceId());
        try {
            TPipeTransferReq compressIfNeeded = compressIfNeeded(PipeTransferTabletRawReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()));
            rateLimitIfNeeded(pipeRawTabletInsertionEvent.getPipeName(), pipeRawTabletInsertionEvent.getCreationTime(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), compressIfNeeded.getBody().length);
            TSStatus status = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded).getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.receiverStatusHandler.handle(status, String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent.coreReportMessage(), status), pipeRawTabletInsertionEvent.toString());
            }
            if (status.isSetRedirectNode()) {
                this.clientManager.updateLeaderCache(pipeRawTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
            }
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer raw tablet insertion event, because %s.", e.getMessage()), e);
        }
    }

    private void doTransferWrapper(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        if (pipeTsFileInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
            try {
                doTransfer(Collections.singletonMap(new Pair(pipeTsFileInsertionEvent.getPipeName(), Long.valueOf(pipeTsFileInsertionEvent.getCreationTime())), Double.valueOf(1.0d)), pipeTsFileInsertionEvent.getTsFile(), pipeTsFileInsertionEvent.isWithMod() ? pipeTsFileInsertionEvent.getModFile() : null);
            } finally {
                pipeTsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(Map<Pair<String, Long>, Double> map, File file, File file2) throws PipeException, IOException {
        TPipeTransferResp pipeTransfer;
        Pair client = this.clientManager.getClient();
        if (Objects.nonNull(file2) && this.clientManager.supportModsIfIsDataNodeReceiver()) {
            transferFilePieces(map, file2, client, true);
            transferFilePieces(map, file, client, true);
            try {
                TPipeTransferReq compressIfNeeded = compressIfNeeded(PipeTransferTsFileSealWithModReq.toTPipeTransferReq(file2.getName(), file2.length(), file.getName(), file.length()));
                map.forEach((pair, d) -> {
                    rateLimitIfNeeded((String) pair.getLeft(), ((Long) pair.getRight()).longValue(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), (long) (compressIfNeeded.getBody().length * d.doubleValue()));
                });
                pipeTransfer = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded);
            } catch (Exception e) {
                client.setRight(false);
                this.clientManager.adjustTimeoutIfNecessary(e);
                throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", file, e.getMessage()), e);
            }
        } else {
            transferFilePieces(map, file, client, false);
            try {
                TPipeTransferReq compressIfNeeded2 = compressIfNeeded(PipeTransferTsFileSealReq.toTPipeTransferReq(file.getName(), file.length()));
                map.forEach((pair2, d2) -> {
                    rateLimitIfNeeded((String) pair2.getLeft(), ((Long) pair2.getRight()).longValue(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), (long) (compressIfNeeded2.getBody().length * d2.doubleValue()));
                });
                pipeTransfer = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded2);
            } catch (Exception e2) {
                client.setRight(false);
                this.clientManager.adjustTimeoutIfNecessary(e2);
                throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", file, e2.getMessage()), e2);
            }
        }
        TSStatus status = pipeTransfer.getStatus();
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
            this.receiverStatusHandler.handle(pipeTransfer.getStatus(), String.format("Seal file %s error, result status %s.", file, pipeTransfer.getStatus()), file.getName());
        }
        LOGGER.info("Successfully transferred file {}.", file);
    }

    public synchronized void discardEventsOfPipe(String str, int i) {
        this.tabletBatchBuilder.discardEventsOfPipe(str, i);
    }

    public void close() {
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
        super.close();
    }
}
