MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。
IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。
默认情况下,IoTDB MQTT 服务通过${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties加载配置。
具体配置项如下:
| 名称 | 描述 | 默认 |
|---|---|---|
enable_mqtt_service | 是否启用 mqtt 服务 | FALSE |
mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 |
mqtt_port | mqtt 服务绑定端口 | 1883 |
mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 |
mqtt_payload_formatter | mqtt 消息有效负载格式化程序。可选项:json:仅适用于树模型。line:仅适用于表模型。 | json |
mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 |
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]][ <attribute_key>=<attribute_value>[,<attribute_key>=<attribute_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
MQTT topic 名称用 / 分割后, 第一串内容作为数据库名称。
topic: stock/Legacy databaseName: stock topic: stock/Legacy/# databaseName:stock
表名称使用行协议中的 <measurement>。
| Filed 内容 | IoTDB 数据类型 |
|---|---|
| 1 1.12 | DOUBLE |
1f1.12 f | FLOAT |
1i123 i | INT64 |
1u123 u | INT64 |
1i32123 i32 | INT32 |
"xxx" | TEXT |
t,T,true,True,TRUE f,F,false,False,FALSE | BOOLEAN |
以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。
MQTT mqtt = new MQTT(); mqtt.setHost("127.0.0.1", 1883); mqtt.setUserName("root"); mqtt.setPassword("root"); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); Random random = new Random(); for (int i = 0; i < 10; i++) { String payload = String.format("{\n" + "\"device\":\"root.sg.d1\",\n" + "\"timestamp\":%d,\n" + "\"measurements\":[\"s1\"],\n" + "\"values\":[%f]\n" + "}", System.currentTimeMillis(), random.nextDouble()); connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); } connection.disconnect();
事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 可以在源码的 example/mqtt-customize 项目中找到一个简单示例。
步骤:
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> <version>2.0.4-SNAPSHOT</version> </dependency>
org.apache.iotdb.db.mqtt.protocol.PayloadFormatterpackage org.apache.iotdb.mqtt.server; import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class CustomizedLinePayloadFormatter implements PayloadFormatter { @Override public List<Message> format(String topic, ByteBuf payload) { // Suppose the payload is a line format if (payload == null) { return null; } String line = payload.toString(StandardCharsets.UTF_8); // parse data from the line and generate Messages and put them into List<Meesage> ret List<Message> ret = new ArrayList<>(); // this is just an example, so we just generate some Messages directly for (int i = 0; i < 3; i++) { long ts = i; TableMessage message = new TableMessage(); // Parsing Database Name message.setDatabase("db" + i); //Parsing Table Names message.setTable("t" + i); // Parsing Tags List<String> tagKeys = new ArrayList<>(); tagKeys.add("tag1" + i); tagKeys.add("tag2" + i); List<Object> tagValues = new ArrayList<>(); tagValues.add("t_value1" + i); tagValues.add("t_value2" + i); message.setTagKeys(tagKeys); message.setTagValues(tagValues); // Parsing Attributes List<String> attributeKeys = new ArrayList<>(); List<Object> attributeValues = new ArrayList<>(); attributeKeys.add("attr1" + i); attributeKeys.add("attr2" + i); attributeValues.add("a_value1" + i); attributeValues.add("a_value2" + i); message.setAttributeKeys(attributeKeys); message.setAttributeValues(attributeValues); // Parsing Fields List<String> fields = Arrays.asList("field1" + i, "field2" + i); List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); List<Object> values = Arrays.asList("4.0" + i, "5.0" + i); message.setFields(fields); message.setDataTypes(dataTypes); message.setValues(values); //// Parsing timestamp message.setTimestamp(ts); ret.add(message); } return ret; } @Override public String getName() { // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: return "CustomizedLine"; } }
src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter 文件: 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 在本例中,文件内容为: org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormattermvn package -DskipTests在 IoTDB 服务端:
enable_mqtt_service=true in conf/iotdb-system.properties)conf/iotdb-system.properties 中 mqtt_payload_formatter 的值, , 在本例中,为 CustomizedLineMore: MQTT 协议的消息不限于 line,你还可以用任意二进制。通过如下函数获得: payload.forEachByte() or payload.array。
为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。 不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下: