package org.apache.iotdb.db.pipe.connector.protocol.airgap;

import java.io.File;
import java.io.IOException;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Air-gap connector that serializes data-region pipe events and writes them to one of the
 * sockets held by the parent connector.
 *
 * <p>Handled event types: {@link PipeInsertNodeTabletInsertionEvent},
 * {@link PipeRawTabletInsertionEvent}, {@link PipeTsFileInsertionEvent} and
 * {@link PipeDeleteDataNodeEvent}. {@link PipeHeartbeatEvent} and {@link PipeTerminateEvent} are
 * accepted and ignored silently; any other event is logged and ignored.
 *
 * <p>Every transfer follows the same shape: pick a socket via {@code nextSocketIndex()}, pin the
 * event with a reference count for the duration of the send, and, when {@code send} reports
 * failure, hand a {@code PIPE_RECEIVER_USER_CONFLICT_EXCEPTION} status to
 * {@code receiverStatusHandler}. An {@link IOException} flags the chosen socket as not alive and is
 * rethrown as a {@link PipeConnectionException}.
 */
@TreeModel
@TableModel
public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapConnector.class);

    /**
     * Holder name passed to every reference-count increase/decrease issued by this connector, so
     * that all event kinds are attributed to the same holder.
     */
    private static final String REFERENCE_HOLDER = IoTDBDataRegionAirGapConnector.class.getName();

    /**
     * Transfers a tablet insertion event over the next socket.
     *
     * <p>Events that are neither {@link PipeInsertNodeTabletInsertionEvent} nor
     * {@link PipeRawTabletInsertionEvent} are logged and ignored without selecting a socket.
     *
     * @param tabletInsertionEvent the event to transfer
     * @throws PipeConnectionException if an {@link IOException} occurs while transferring; the
     *     socket used is flagged as not alive first
     * @throws Exception any other failure raised by the transfer
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        final boolean isInsertNodeEvent = tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent;
        if (!isInsertNodeEvent && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBDataRegionAirGapConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }

        final int socketIndex = nextSocketIndex();
        final IoTDBAirGapConnector.AirGapSocket socket = (IoTDBAirGapConnector.AirGapSocket) this.sockets.get(socketIndex);
        try {
            if (isInsertNodeEvent) {
                doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            } else {
                doTransferWrapper(socket, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
            }
        } catch (IOException e) {
            this.isSocketAlive.set(socketIndex, false);
            // Both accepted event types are cast to EnrichedEvent here to build the report message.
            throw new PipeConnectionException(String.format("Network error when transfer tablet insertion event %s, because %s.", ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()), e);
        }
    }

    /**
     * Transfers a TsFile insertion event over the next socket.
     *
     * <p>Events that are not {@link PipeTsFileInsertionEvent} are logged and ignored. The event is
     * also skipped (with a warning) when {@code waitForTsFileClose()} returns {@code false}.
     *
     * @param tsFileInsertionEvent the event to transfer
     * @throws PipeConnectionException if an {@link IOException} occurs while transferring; the
     *     socket used is flagged as not alive first
     * @throws Exception any other failure raised by the transfer
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBDataRegionAirGapConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }

        final PipeTsFileInsertionEvent pipeTsFileEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        if (!pipeTsFileEvent.waitForTsFileClose()) {
            LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", pipeTsFileEvent.getTsFile());
            return;
        }

        final int socketIndex = nextSocketIndex();
        try {
            doTransferWrapper((IoTDBAirGapConnector.AirGapSocket) this.sockets.get(socketIndex), pipeTsFileEvent);
        } catch (IOException e) {
            this.isSocketAlive.set(socketIndex, false);
            throw new PipeConnectionException(String.format("Network error when transfer tsfile insertion event %s, because %s.", pipeTsFileEvent.coreReportMessage(), e.getMessage()), e);
        }
    }

    /**
     * Transfers a generic event. Only {@link PipeDeleteDataNodeEvent} is actually sent;
     * {@link PipeHeartbeatEvent} and {@link PipeTerminateEvent} are ignored silently and every other
     * event type is logged and ignored.
     *
     * <p>A socket index is requested up front for every event, including the ignored ones.
     *
     * @param event the event to transfer
     * @throws PipeConnectionException if an {@link IOException} occurs while transferring a
     *     deletion event; the socket used is flagged as not alive first
     * @throws Exception any other failure raised by the transfer
     */
    public void transfer(Event event) throws Exception {
        final int socketIndex = nextSocketIndex();
        final IoTDBAirGapConnector.AirGapSocket socket = (IoTDBAirGapConnector.AirGapSocket) this.sockets.get(socketIndex);

        if (event instanceof PipeDeleteDataNodeEvent) {
            final PipeDeleteDataNodeEvent deletionEvent = (PipeDeleteDataNodeEvent) event;
            // The try/catch is scoped to the only branch that performs I/O, so the error report
            // can use the already-typed event instead of an unguarded cast of the raw parameter.
            try {
                doTransferWrapper(socket, deletionEvent);
            } catch (IOException e) {
                this.isSocketAlive.set(socketIndex, false);
                throw new PipeConnectionException(String.format("Network error when transfer deletion event %s, because %s.", deletionEvent.coreReportMessage(), e.getMessage()), e);
            }
            return;
        }

        if (!(event instanceof PipeHeartbeatEvent) && !(event instanceof PipeTerminateEvent)) {
            LOGGER.warn("IoTDBDataRegionAirGapConnector does not support transferring generic event: {}.", event);
        }
    }

    /**
     * Pins the deletion event with a reference count while it is being transferred. Nothing is
     * sent when the reference count cannot be increased.
     */
    private void doTransferWrapper(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) throws PipeException, IOException {
        if (!pipeDeleteDataNodeEvent.increaseReferenceCount(REFERENCE_HOLDER)) {
            return;
        }
        try {
            doTransfer(airGapSocket, pipeDeleteDataNodeEvent);
        } finally {
            pipeDeleteDataNodeEvent.decreaseReferenceCount(REFERENCE_HOLDER, false);
        }
    }

    /**
     * Sends the event's delete-data node as a plan-node request; when {@code send} returns
     * {@code false} the failure is reported to {@code receiverStatusHandler}.
     */
    private void doTransfer(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) throws PipeException, IOException {
        final byte[] payload = PipeTransferPlanNodeReq.toTPipeTransferBytes(pipeDeleteDataNodeEvent.getDeleteDataNode());
        if (send(pipeDeleteDataNodeEvent.getPipeName(), pipeDeleteDataNodeEvent.getCreationTime(), airGapSocket, payload)) {
            return;
        }
        final String errorMessage = String.format("Transfer deletion %s error. Socket: %s.", pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), airGapSocket);
        this.receiverStatusHandler.handle(new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()).setMessage(errorMessage), errorMessage, pipeDeleteDataNodeEvent.toString());
    }

    /**
     * Pins the insert-node tablet event with a reference count while it is being transferred.
     * Nothing is sent when the reference count cannot be increased.
     */
    private void doTransferWrapper(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException {
        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(REFERENCE_HOLDER)) {
            return;
        }
        try {
            doTransfer(airGapSocket, pipeInsertNodeTabletInsertionEvent);
        } finally {
            pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(REFERENCE_HOLDER, false);
        }
    }

    /**
     * Sends an insert-node tablet event. The request is built from the insert node when
     * {@code getInsertNodeViaCacheIfPossible()} returns one, and from the event's byte buffer
     * otherwise. The database name is attached only for table-model events.
     */
    private void doTransfer(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException {
        final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
        final byte[] payload;
        if (Objects.nonNull(insertNode)) {
            payload = PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(insertNode, pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : null);
        } else {
            payload = PipeTransferTabletBinaryReqV2.toTPipeTransferBytes(pipeInsertNodeTabletInsertionEvent.getByteBuffer(), pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : null);
        }

        if (send(pipeInsertNodeTabletInsertionEvent.getPipeName(), pipeInsertNodeTabletInsertionEvent.getCreationTime(), airGapSocket, payload)) {
            return;
        }
        final String errorMessage = String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s", pipeInsertNodeTabletInsertionEvent, airGapSocket);
        this.receiverStatusHandler.handle(new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()).setMessage(errorMessage), errorMessage, pipeInsertNodeTabletInsertionEvent.toString());
    }

    /**
     * Pins the raw tablet event with a reference count while it is being transferred. Nothing is
     * sent when the reference count cannot be increased.
     */
    private void doTransferWrapper(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IOException {
        if (!pipeRawTabletInsertionEvent.increaseReferenceCount(REFERENCE_HOLDER)) {
            return;
        }
        try {
            doTransfer(airGapSocket, pipeRawTabletInsertionEvent);
        } finally {
            pipeRawTabletInsertionEvent.decreaseReferenceCount(REFERENCE_HOLDER, false);
        }
    }

    /**
     * Sends a raw tablet event as a raw-tablet request; when {@code send} returns {@code false}
     * the failure is reported to {@code receiverStatusHandler}. The database name is attached only
     * for table-model events.
     */
    private void doTransfer(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IOException {
        final byte[] payload = PipeTransferTabletRawReqV2.toTPipeTransferBytes(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned(), pipeRawTabletInsertionEvent.isTableModelEvent() ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() : null);
        if (send(pipeRawTabletInsertionEvent.getPipeName(), pipeRawTabletInsertionEvent.getCreationTime(), airGapSocket, payload)) {
            return;
        }
        final String errorMessage = String.format("Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.", pipeRawTabletInsertionEvent, airGapSocket);
        this.receiverStatusHandler.handle(new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()).setMessage(errorMessage), errorMessage, pipeRawTabletInsertionEvent.toString());
    }

    /**
     * Pins the TsFile event with a reference count while it is being transferred. Nothing is sent
     * when the reference count cannot be increased.
     */
    private void doTransferWrapper(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        if (!pipeTsFileInsertionEvent.increaseReferenceCount(REFERENCE_HOLDER)) {
            return;
        }
        try {
            doTransfer(airGapSocket, pipeTsFileInsertionEvent);
        } finally {
            pipeTsFileInsertionEvent.decreaseReferenceCount(REFERENCE_HOLDER, false);
        }
    }

    /**
     * Sends a TsFile, optionally preceded by its mod file, followed by a seal request.
     *
     * <p>The mod file is transferred only when the event reports {@code isWithMod()} and
     * {@code supportModsIfIsDataNodeReceiver} is set; in that case the mod file pieces go first,
     * then the TsFile pieces, and the seal request names both files. Otherwise only the TsFile is
     * transferred and sealed. When the seal {@code send} returns {@code false} the failure is
     * reported to {@code receiverStatusHandler}.
     */
    private void doTransfer(IoTDBAirGapConnector.AirGapSocket airGapSocket, PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        final String pipeName = pipeTsFileInsertionEvent.getPipeName();
        final long creationTime = pipeTsFileInsertionEvent.getCreationTime();
        final File tsFile = pipeTsFileInsertionEvent.getTsFile();
        final String errorMessage = String.format("Seal file %s error. Socket %s.", tsFile, airGapSocket);

        final byte[] sealRequest;
        if (pipeTsFileInsertionEvent.isWithMod() && this.supportModsIfIsDataNodeReceiver) {
            final File modFile = pipeTsFileInsertionEvent.getModFile();
            transferFilePieces(pipeName, creationTime, modFile, airGapSocket, true);
            transferFilePieces(pipeName, creationTime, tsFile, airGapSocket, true);
            sealRequest = PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length(), pipeTsFileInsertionEvent.isTableModelEvent() ? pipeTsFileInsertionEvent.getTableModelDatabaseName() : null);
        } else {
            transferFilePieces(pipeName, creationTime, tsFile, airGapSocket, false);
            sealRequest = PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length(), pipeTsFileInsertionEvent.isTableModelEvent() ? pipeTsFileInsertionEvent.getTableModelDatabaseName() : null);
        }

        // Single seal/report path shared by both variants.
        if (send(pipeName, creationTime, airGapSocket, sealRequest)) {
            LOGGER.info("Successfully transferred file {}.", tsFile);
        } else {
            this.receiverStatusHandler.handle(new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()).setMessage(errorMessage), errorMessage, pipeTsFileInsertionEvent.toString());
        }
    }

    /**
     * Builds the request bytes for one piece of a file transferred on its own, by delegating to
     * {@link PipeTransferTsFilePieceReq}.
     *
     * @param str first argument forwarded to the request builder (presumably the file name; confirm against the caller)
     * @param j second argument forwarded to the request builder (presumably the piece offset; confirm against the caller)
     * @param bArr third argument forwarded to the request builder (presumably the piece content; confirm against the caller)
     * @return the serialized request
     * @throws IOException if serialization fails
     */
    protected byte[] getTransferSingleFilePieceBytes(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceReq.toTPipeTransferBytes(str, j, bArr);
    }

    /**
     * Builds the request bytes for one piece of a file transferred together with a mod file, by
     * delegating to {@link PipeTransferTsFilePieceWithModReq}.
     *
     * @param str first argument forwarded to the request builder (presumably the file name; confirm against the caller)
     * @param j second argument forwarded to the request builder (presumably the piece offset; confirm against the caller)
     * @param bArr third argument forwarded to the request builder (presumably the piece content; confirm against the caller)
     * @return the serialized request
     * @throws IOException if serialization fails
     */
    protected byte[] getTransferMultiFilePieceBytes(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceWithModReq.toTPipeTransferBytes(str, j, bArr);
    }
}
