| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.iotdb.db.protocol.mqtt; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; |
| import org.apache.iotdb.db.auth.AuthorityChecker; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.protocol.session.MqttClientSession; |
| import org.apache.iotdb.db.protocol.session.SessionManager; |
| import org.apache.iotdb.db.queryengine.plan.Coordinator; |
| import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; |
| import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; |
| import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; |
| import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; |
| import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; |
| import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; |
| import org.apache.iotdb.db.utils.CommonUtils; |
| import org.apache.iotdb.db.utils.TimestampPrecisionUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| |
| import io.moquette.interception.AbstractInterceptHandler; |
| import io.moquette.interception.messages.InterceptConnectMessage; |
| import io.moquette.interception.messages.InterceptDisconnectMessage; |
| import io.moquette.interception.messages.InterceptPublishMessage; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.handler.codec.mqtt.MqttQoS; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.time.ZoneId; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** PublishHandler handle the messages from MQTT clients. */ |
| public class MPPPublishHandler extends AbstractInterceptHandler { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class); |
| |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| private final SessionManager sessionManager = SessionManager.getInstance(); |
| |
| private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = |
| new ConcurrentHashMap<>(); |
| private final PayloadFormatter payloadFormat; |
| private final IPartitionFetcher partitionFetcher; |
| private final ISchemaFetcher schemaFetcher; |
| |
| public MPPPublishHandler(IoTDBConfig config) { |
| this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter()); |
| partitionFetcher = ClusterPartitionFetcher.getInstance(); |
| schemaFetcher = ClusterSchemaFetcher.getInstance(); |
| } |
| |
| @Override |
| public String getID() { |
| return "iotdb-mqtt-broker-listener"; |
| } |
| |
| @Override |
| public void onConnect(InterceptConnectMessage msg) { |
| if (!clientIdToSessionMap.containsKey(msg.getClientID())) { |
| MqttClientSession session = new MqttClientSession(msg.getClientID()); |
| sessionManager.login( |
| session, |
| msg.getUsername(), |
| new String(msg.getPassword()), |
| ZoneId.systemDefault().toString(), |
| TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, |
| ClientVersion.V_1_0); |
| clientIdToSessionMap.put(msg.getClientID(), session); |
| } |
| } |
| |
| @Override |
| public void onDisconnect(InterceptDisconnectMessage msg) { |
| MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID()); |
| if (null != session) { |
| sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); |
| } |
| } |
| |
| @Override |
| public void onPublish(InterceptPublishMessage msg) { |
| try { |
| String clientId = msg.getClientID(); |
| if (!clientIdToSessionMap.containsKey(clientId)) { |
| return; |
| } |
| MqttClientSession session = clientIdToSessionMap.get(msg.getClientID()); |
| ByteBuf payload = msg.getPayload(); |
| String topic = msg.getTopicName(); |
| String username = msg.getUsername(); |
| MqttQoS qos = msg.getQos(); |
| |
| LOG.debug( |
| "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", |
| clientId, |
| username, |
| qos, |
| topic, |
| payload); |
| |
| List<Message> events = payloadFormat.format(payload); |
| if (events == null) { |
| return; |
| } |
| |
| for (Message event : events) { |
| if (event == null) { |
| continue; |
| } |
| |
| TSStatus tsStatus = null; |
| try { |
| InsertRowStatement statement = new InsertRowStatement(); |
| statement.setDevicePath( |
| DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice())); |
| TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp()); |
| statement.setTime(event.getTimestamp()); |
| statement.setMeasurements(event.getMeasurements().toArray(new String[0])); |
| if (event.getDataTypes() == null) { |
| statement.setDataTypes(new TSDataType[event.getMeasurements().size()]); |
| statement.setValues(event.getValues().toArray(new Object[0])); |
| statement.setNeedInferType(true); |
| } else { |
| List<TSDataType> dataTypes = event.getDataTypes(); |
| List<String> values = event.getValues(); |
| Object[] inferredValues = new Object[values.size()]; |
| for (int i = 0; i < values.size(); ++i) { |
| inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i)); |
| } |
| statement.setDataTypes(dataTypes.toArray(new TSDataType[0])); |
| statement.setValues(inferredValues); |
| } |
| statement.setAligned(false); |
| |
| tsStatus = AuthorityChecker.checkAuthority(statement, session); |
| if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| LOG.warn(tsStatus.message); |
| } else { |
| long queryId = sessionManager.requestQueryId(); |
| ExecutionResult result = |
| Coordinator.getInstance() |
| .executeForTreeModel( |
| statement, |
| queryId, |
| sessionManager.getSessionInfo(session), |
| "", |
| partitionFetcher, |
| schemaFetcher, |
| config.getQueryTimeoutThreshold()); |
| tsStatus = result.status; |
| } |
| } catch (Exception e) { |
| LOG.warn( |
| "meet error when inserting device {}, measurements {}, at time {}, because ", |
| event.getDevice(), |
| event.getMeasurements(), |
| event.getTimestamp(), |
| e); |
| } |
| LOG.debug("event process result: {}", tsStatus); |
| } |
| } finally { |
| // release the payload of the message |
| super.onPublish(msg); |
| } |
| } |
| |
| @Override |
| public void onSessionLoopError(Throwable throwable) { |
| // TODO: Implement something sensible here ... |
| } |
| } |