package org.apache.iotdb.confignode.manager.pipe.connector.protocol;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.manager.pipe.connector.client.IoTDBConfigNodeSyncClientManager;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.class */
public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConfigRegionConnector.class);

    protected IoTDBSyncClientManager constructClient(List<TEndPoint> list, String str, String str2, boolean z, String str3, String str4, boolean z2, String str5, boolean z3, String str6, boolean z4, boolean z5) {
        return new IoTDBConfigNodeSyncClientManager(list, str, str2, z, Objects.nonNull(str3) ? ConfigNodeConfig.addHomeDir(str3) : null, str4, str5, z3, str6, z4, z5);
    }

    protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(String str, long j, byte[] bArr) {
        throw new UnsupportedOperationException("The config region connector does not support transferring single file piece req.");
    }

    protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(str, j, bArr);
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        throw new UnsupportedOperationException("IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        throw new UnsupportedOperationException("IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
    }

    public void transfer(Event event) throws Exception {
        if (event instanceof PipeConfigRegionWritePlanEvent) {
            doTransferWrapper((PipeConfigRegionWritePlanEvent) event);
        } else if (event instanceof PipeConfigRegionSnapshotEvent) {
            doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
        } else {
            if (event instanceof PipeHeartbeatEvent) {
                return;
            }
            LOGGER.warn("IoTDBConfigRegionConnector does not support transferring generic event: {}.", event);
        }
    }

    private void doTransferWrapper(PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException {
        if (pipeConfigRegionWritePlanEvent.increaseReferenceCount(IoTDBConfigRegionConnector.class.getName())) {
            try {
                doTransfer(pipeConfigRegionWritePlanEvent);
            } finally {
                pipeConfigRegionWritePlanEvent.decreaseReferenceCount(IoTDBConfigRegionConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException {
        Pair client = this.clientManager.getClient();
        try {
            TPipeTransferReq compressIfNeeded = compressIfNeeded(PipeTransferConfigPlanReq.toTPipeTransferReq(pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
            rateLimitIfNeeded(pipeConfigRegionWritePlanEvent.getPipeName(), pipeConfigRegionWritePlanEvent.getCreationTime(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), compressIfNeeded.getBody().length);
            TSStatus status = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded).getStatus();
            if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
                this.clientManager.sendHandshakeReq(client);
            }
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.receiverStatusHandler.handle(status, String.format("Transfer config region write plan %s error, result status %s.", pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status), pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString());
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Successfully transferred config event {}.", pipeConfigRegionWritePlanEvent);
            }
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer config region write plan %s, because %s.", pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), e.getMessage()), e);
        }
    }

    private void doTransferWrapper(PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException {
        if (pipeConfigRegionSnapshotEvent.increaseReferenceCount(IoTDBConfigRegionConnector.class.getName())) {
            try {
                doTransfer(pipeConfigRegionSnapshotEvent);
            } finally {
                pipeConfigRegionSnapshotEvent.decreaseReferenceCount(IoTDBConfigRegionConnector.class.getName(), false);
            }
        }
    }

    private void doTransfer(PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException {
        String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
        long creationTime = pipeConfigRegionSnapshotEvent.getCreationTime();
        File snapshotFile = pipeConfigRegionSnapshotEvent.getSnapshotFile();
        File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();
        Pair client = this.clientManager.getClient();
        transferFilePieces(Collections.singletonMap(new Pair(pipeName, Long.valueOf(creationTime)), Double.valueOf(1.0d)), snapshotFile, client, true);
        if (Objects.nonNull(templateFile)) {
            transferFilePieces(Collections.singletonMap(new Pair(pipeName, Long.valueOf(creationTime)), Double.valueOf(1.0d)), templateFile, client, true);
        }
        try {
            TPipeTransferReq compressIfNeeded = compressIfNeeded(PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(pipeConfigRegionSnapshotEvent.getPatternString(), snapshotFile.getName(), snapshotFile.length(), Objects.nonNull(templateFile) ? templateFile.getName() : null, Objects.nonNull(templateFile) ? templateFile.length() : 0L, pipeConfigRegionSnapshotEvent.getFileType(), pipeConfigRegionSnapshotEvent.toSealTypeString()));
            rateLimitIfNeeded(pipeConfigRegionSnapshotEvent.getPipeName(), pipeConfigRegionSnapshotEvent.getCreationTime(), ((IoTDBSyncClient) client.getLeft()).getEndPoint(), compressIfNeeded.getBody().length);
            TPipeTransferResp pipeTransfer = ((IoTDBSyncClient) client.getLeft()).pipeTransfer(compressIfNeeded);
            TSStatus status = pipeTransfer.getStatus();
            if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
                this.clientManager.sendHandshakeReq(client);
            }
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.receiverStatusHandler.handle(status, String.format("Seal config region snapshot file %s error, result status %s.", snapshotFile, pipeTransfer.getStatus()), snapshotFile.toString());
            }
            LOGGER.info("Successfully transferred config region snapshot {}.", snapshotFile);
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when seal config region snapshot %s, because %s.", snapshotFile, e.getMessage()), e);
        }
    }
}
