package org.apache.iotdb.db.protocol.mqtt;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.MqttClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.class */
public class MPPPublishHandler extends AbstractInterceptHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    private final PayloadFormatter payloadFormat;
    private final boolean useTableInsert;
    private final SessionManager sessionManager = SessionManager.getInstance();
    private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = new ConcurrentHashMap<>();
    private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
    private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();

    public MPPPublishHandler(IoTDBConfig ioTDBConfig) {
        this.payloadFormat = PayloadFormatManager.getPayloadFormat(ioTDBConfig.getMqttPayloadFormatter());
        this.useTableInsert = "table".equals(this.payloadFormat.getType());
    }

    public String getID() {
        return "iotdb-mqtt-broker-listener";
    }

    public void onConnect(InterceptConnectMessage interceptConnectMessage) {
        if (this.clientIdToSessionMap.containsKey(interceptConnectMessage.getClientID())) {
            return;
        }
        MqttClientSession mqttClientSession = new MqttClientSession(interceptConnectMessage.getClientID());
        this.sessionManager.login(mqttClientSession, interceptConnectMessage.getUsername(), new String(interceptConnectMessage.getPassword()), ZoneId.systemDefault().toString(), TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, IoTDBConstant.ClientVersion.V_1_0, this.useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE);
        this.sessionManager.registerSession(mqttClientSession);
        this.clientIdToSessionMap.put(interceptConnectMessage.getClientID(), mqttClientSession);
    }

    public void onDisconnect(InterceptDisconnectMessage interceptDisconnectMessage) {
        MqttClientSession remove = this.clientIdToSessionMap.remove(interceptDisconnectMessage.getClientID());
        if (null != remove) {
            this.sessionManager.removeCurrSession();
            SessionManager sessionManager = this.sessionManager;
            Coordinator coordinator = Coordinator.getInstance();
            Objects.requireNonNull(coordinator);
            sessionManager.closeSession(remove, (v1) -> {
                r2.cleanupQueryExecution(v1);
            });
        }
    }

    public void onPublish(InterceptPublishMessage interceptPublishMessage) {
        try {
            try {
                String clientID = interceptPublishMessage.getClientID();
                if (!this.clientIdToSessionMap.containsKey(clientID)) {
                    super.onPublish(interceptPublishMessage);
                    return;
                }
                MqttClientSession mqttClientSession = this.clientIdToSessionMap.get(interceptPublishMessage.getClientID());
                ByteBuf payload = interceptPublishMessage.getPayload();
                LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", new Object[]{clientID, interceptPublishMessage.getUsername(), interceptPublishMessage.getQos(), interceptPublishMessage.getTopicName(), payload});
                List<Message> format = this.payloadFormat.format(payload);
                if (format == null) {
                    super.onPublish(interceptPublishMessage);
                    return;
                }
                for (Message message : format) {
                    if (message != null) {
                        if (this.useTableInsert) {
                            TableMessage tableMessage = (TableMessage) message;
                            tableMessage.setDatabase(!interceptPublishMessage.getTopicName().contains("/") ? interceptPublishMessage.getTopicName() : interceptPublishMessage.getTopicName().substring(0, interceptPublishMessage.getTopicName().indexOf("/")));
                            insertTable(tableMessage, mqttClientSession);
                        } else {
                            insertTree((TreeMessage) message, mqttClientSession);
                        }
                    }
                }
                super.onPublish(interceptPublishMessage);
            } catch (Throwable th) {
                LOG.warn("onPublish execution exception, msg is [{}], error is ", interceptPublishMessage, th);
                super.onPublish(interceptPublishMessage);
            }
        } catch (Throwable th2) {
            super.onPublish(interceptPublishMessage);
            throw th2;
        }
    }

    private void insertTable(TableMessage tableMessage, MqttClientSession mqttClientSession) {
        try {
            TimestampPrecisionUtils.checkTimestampPrecision(tableMessage.getTimestamp().longValue());
            InsertTabletStatement constructInsertTabletStatement = constructInsertTabletStatement(tableMessage);
            mqttClientSession.setDatabaseName(tableMessage.getDatabase().toLowerCase());
            mqttClientSession.setSqlDialect(IClientSession.SqlDialect.TABLE);
            TSStatus tSStatus = Coordinator.getInstance().executeForTableModel(constructInsertTabletStatement, new SqlParser(), mqttClientSession, this.sessionManager.requestQueryId(), this.sessionManager.getSessionInfo(mqttClientSession), "", LocalExecutionPlanner.getInstance().metadata, config.getQueryTimeoutThreshold()).status;
            LOG.debug("process result: {}", tSStatus);
            if (tSStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                LOG.warn("mqtt line insert error , message = {}", tSStatus.message);
            }
        } catch (Exception e) {
            LOG.warn("meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ", new Object[]{tableMessage.getDatabase(), tableMessage.getTable(), tableMessage.getTagKeys(), tableMessage.getAttributeKeys(), tableMessage.getFields(), tableMessage.getTimestamp(), e});
        }
    }

    private InsertTabletStatement constructInsertTabletStatement(TableMessage tableMessage) throws IllegalPathException {
        InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
        insertTabletStatement.setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(tableMessage.getTable()));
        List list = (List) Stream.of((Object[]) new List[]{tableMessage.getFields(), tableMessage.getTagKeys(), tableMessage.getAttributeKeys()}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        insertTabletStatement.setMeasurements((String[]) list.toArray(new String[0]));
        insertTabletStatement.setTimes(new long[]{tableMessage.getTimestamp().longValue()});
        BitMap[] bitMapArr = new BitMap[list.size()];
        insertTabletStatement.setColumns(Stream.of((Object[]) new List[]{tableMessage.getValues(), tableMessage.getTagValues(), tableMessage.getAttributeValues()}).flatMap((v0) -> {
            return v0.stream();
        }).toArray(i -> {
            return new Object[i];
        }));
        insertTabletStatement.setBitMaps(bitMapArr);
        insertTabletStatement.setRowCount(1);
        insertTabletStatement.setAligned(false);
        insertTabletStatement.setWriteToTable(true);
        TSDataType[] tSDataTypeArr = new TSDataType[list.size()];
        TsTableColumnCategory[] tsTableColumnCategoryArr = new TsTableColumnCategory[list.size()];
        for (int i2 = 0; i2 < tableMessage.getFields().size(); i2++) {
            tSDataTypeArr[i2] = tableMessage.getDataTypes().get(i2);
            tsTableColumnCategoryArr[i2] = TsTableColumnCategory.FIELD;
        }
        for (int size = tableMessage.getFields().size(); size < tableMessage.getFields().size() + tableMessage.getTagKeys().size(); size++) {
            tSDataTypeArr[size] = TSDataType.STRING;
            tsTableColumnCategoryArr[size] = TsTableColumnCategory.TAG;
        }
        for (int size2 = tableMessage.getFields().size() + tableMessage.getTagKeys().size(); size2 < tableMessage.getFields().size() + tableMessage.getTagKeys().size() + tableMessage.getAttributeKeys().size(); size2++) {
            tSDataTypeArr[size2] = TSDataType.STRING;
            tsTableColumnCategoryArr[size2] = TsTableColumnCategory.ATTRIBUTE;
        }
        insertTabletStatement.setDataTypes(tSDataTypeArr);
        insertTabletStatement.setColumnCategories(tsTableColumnCategoryArr);
        return insertTabletStatement;
    }

    private void insertTree(TreeMessage treeMessage, MqttClientSession mqttClientSession) {
        try {
            InsertRowStatement insertRowStatement = new InsertRowStatement();
            insertRowStatement.setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(treeMessage.getDevice()));
            TimestampPrecisionUtils.checkTimestampPrecision(treeMessage.getTimestamp().longValue());
            insertRowStatement.setTime(treeMessage.getTimestamp().longValue());
            insertRowStatement.setMeasurements((String[]) treeMessage.getMeasurements().toArray(new String[0]));
            if (treeMessage.getDataTypes() == null) {
                insertRowStatement.setDataTypes(new TSDataType[treeMessage.getMeasurements().size()]);
                insertRowStatement.setValues(treeMessage.getValues().toArray(new Object[0]));
                insertRowStatement.setNeedInferType(true);
            } else {
                List<TSDataType> dataTypes = treeMessage.getDataTypes();
                List<String> values = treeMessage.getValues();
                Object[] objArr = new Object[values.size()];
                for (int i = 0; i < values.size(); i++) {
                    objArr[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
                }
                insertRowStatement.setDataTypes((TSDataType[]) dataTypes.toArray(new TSDataType[0]));
                insertRowStatement.setValues(objArr);
            }
            insertRowStatement.setAligned(false);
            TSStatus checkAuthority = AuthorityChecker.checkAuthority(insertRowStatement, mqttClientSession);
            if (checkAuthority.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                LOG.warn(checkAuthority.message);
            } else {
                TSStatus tSStatus = Coordinator.getInstance().executeForTreeModel(insertRowStatement, this.sessionManager.requestQueryId(), this.sessionManager.getSessionInfo(mqttClientSession), "", this.partitionFetcher, this.schemaFetcher, config.getQueryTimeoutThreshold(), false).status;
                LOG.debug("process result: {}", tSStatus);
                if (tSStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOG.warn("mqtt json insert error , message = {}", tSStatus.message);
                }
            }
        } catch (Exception e) {
            LOG.warn("meet error when inserting device {}, measurements {}, at time {}, because ", new Object[]{treeMessage.getDevice(), treeMessage.getMeasurements(), treeMessage.getTimestamp(), e});
        }
    }

    public void onSessionLoopError(Throwable th) {
    }
}
