package org.apache.iotdb.db.pipe.connector.protocol.writeback;

import java.time.ZoneId;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.class */
/**
 * A {@link PipeConnector} that executes tablet insertion events as insert statements through the
 * local {@link Coordinator} (or, when no cached insert node is available, through the pipe thrift
 * receiver of {@link PipeDataNodeAgent}).
 *
 * <p>Only {@link PipeInsertNodeTabletInsertionEvent} and {@link PipeRawTabletInsertionEvent} are
 * transferred; every other tablet insertion event is logged and ignored. The lifecycle hooks
 * ({@code validate}, {@code customize}, {@code handshake}, {@code heartbeat}, {@code close}) are
 * intentionally empty.
 */
public class WriteBackConnector implements PipeConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackConnector.class);

    /** No-op: this connector does not validate any parameter. */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        // Nothing to validate.
    }

    /** No-op: this connector does not read any parameter or runtime configuration. */
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        // Nothing to customize.
    }

    /** No-op: no handshake is performed. */
    public void handshake() throws Exception {
        // Nothing to do.
    }

    /** No-op: no heartbeat is performed. */
    public void heartbeat() throws Exception {
        // Nothing to do.
    }

    /**
     * Dispatches a tablet insertion event to the matching transfer routine.
     *
     * @param tabletInsertionEvent the event to transfer; events that are neither a
     *     {@link PipeInsertNodeTabletInsertionEvent} nor a {@link PipeRawTabletInsertionEvent} are
     *     logged at WARN level and dropped
     * @throws Exception if the underlying transfer fails
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            return;
        }
        if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
            doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
            return;
        }
        LOGGER.warn("WriteBackConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
    }

    /**
     * Handles generic events. Heartbeat and terminate events are accepted silently; any other
     * event is only reported with a WARN log entry.
     *
     * @param event the generic event handed to this connector
     */
    public void transfer(Event event) throws Exception {
        final boolean silentlyAccepted = event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent;
        if (!silentlyAccepted) {
            LOGGER.warn("WriteBackConnector does not support transferring generic event: {}.", event);
        }
    }

    /**
     * Transfers an insert-node event while holding a reference on it. If the reference count
     * cannot be increased (the call returns {@code false}) the event is skipped without any
     * further action.
     */
    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException {
        final String holder = WriteBackConnector.class.getName();
        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeInsertNodeTabletInsertionEvent);
        } finally {
            // Always give the reference back, even when the transfer throws.
            pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Executes the write carried by an insert-node event.
     *
     * <p>When the insert node is available it is converted into a statement and executed locally
     * (an empty statement counts as success). Otherwise the event's byte buffer is wrapped in a
     * binary transfer request and handed to the pipe thrift receiver.
     *
     * @throws PipeException if the resulting status code is not the success code
     */
    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException {
        final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
        final TSStatus status;
        if (Objects.isNull(insertNode)) {
            status = PipeDataNodeAgent.receiver().thrift().receive(PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer())).getStatus();
        } else {
            final InsertBaseStatement statement = PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
            if (statement.isEmpty()) {
                status = RpcUtils.SUCCESS_STATUS;
            } else {
                status = executeStatement(statement);
            }
        }
        if (isFailure(status)) {
            throw new PipeException(String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, status));
        }
    }

    /**
     * Transfers a raw tablet event while holding a reference on it. If the reference count cannot
     * be increased (the call returns {@code false}) the event is skipped without any further
     * action.
     */
    private void doTransferWrapper(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        final String holder = WriteBackConnector.class.getName();
        if (!pipeRawTabletInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeRawTabletInsertionEvent);
        } finally {
            // Always give the reference back, even when the transfer throws.
            pipeRawTabletInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Converts a raw tablet event into an insert-tablet statement and executes it locally. An
     * empty statement counts as success.
     *
     * @throws PipeException if the resulting status code is not the success code
     */
    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        final InsertTabletStatement statement = PipeTransferTabletRawReq.toTPipeTransferRawReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()).constructStatement();
        final TSStatus status;
        if (statement.isEmpty()) {
            status = RpcUtils.SUCCESS_STATUS;
        } else {
            status = executeStatement(statement);
        }
        if (isFailure(status)) {
            throw new PipeException(String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, status));
        }
    }

    /** Returns {@code true} when the status code differs from the success status code. */
    private static boolean isFailure(TSStatus status) {
        return status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
    }

    /**
     * Wraps the statement in a {@link PipeEnrichedStatement} and runs it through the
     * {@link Coordinator} with a freshly requested query id, a {@link SessionInfo} built for
     * {@link AuthorityChecker#SUPER_USER} in the system default time zone, and the configured
     * query timeout threshold.
     *
     * @param insertBaseStatement the statement to execute
     * @return the status of the execution result
     */
    private TSStatus executeStatement(InsertBaseStatement insertBaseStatement) {
        final Coordinator coordinator = Coordinator.getInstance();
        final PipeEnrichedStatement enrichedStatement = new PipeEnrichedStatement(insertBaseStatement);
        // NOTE(review): EMPTY_STRING is presumably just "" borrowed from an unrelated class - confirm.
        return coordinator.executeForTreeModel(
                enrichedStatement,
                SessionManager.getInstance().requestQueryId(),
                new SessionInfo(0L, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
                SubStringFunctionColumnTransformer.EMPTY_STRING,
                ClusterPartitionFetcher.getInstance(),
                ClusterSchemaFetcher.getInstance(),
                IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
                false).status;
    }

    /** No-op: this connector holds no resources of its own. */
    public void close() throws Exception {
        // Nothing to release.
    }
}
