MQTT 协议

1. 概述

MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。

IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。

2. 配置方式

默认情况下,IoTDB MQTT 服务通过${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties加载配置。

具体配置项如下:

名称描述默认
enable_mqtt_service是否启用 mqtt 服务FALSE
mqtt_hostmqtt 服务绑定主机127.0.0.1
mqtt_portmqtt 服务绑定端口1883
mqtt_handler_pool_size处理 mqtt 消息的处理程序池大小1
mqtt_payload_formattermqtt​ 消息有效负载格式化程序。可选项:​​json:仅适用于树模型。​​line:仅适用于表模型。json
mqtt_max_message_sizemqtt 消息最大长度(字节)1048576

3. 写入协议

  • 行协议语法格式
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]][ <attribute_key>=<attribute_value>[,<attribute_key>=<attribute_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
  • 示例
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000

4. 命名约定

  • 数据库名称

MQTT topic 名称用 / 分割后, 第一串内容作为数据库名称。

topic: stock/Legacy
databaseName: stock


topic: stock/Legacy/#
databaseName:stock
  • 表名称

表名称使用行协议中的 <measurement>

  • 类型标识
Filed 内容IoTDB 数据类型
1
1.12
DOUBLE
1f
1.12f
FLOAT
1i
123i
INT64
1u
123u
INT64
1i32
123i32
INT32
"xxx"TEXT
t,T,true,True,TRUE
f,F,false,False,FALSE
BOOLEAN

5. 代码示例

以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。

MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

Random random = new Random();
for (int i = 0; i < 10; i++) {
   String payload = String.format("{\n" +
           "\"device\":\"root.sg.d1\",\n" +
           "\"timestamp\":%d,\n" +
           "\"measurements\":[\"s1\"],\n" +
           "\"values\":[%f]\n" +
           "}", System.currentTimeMillis(), random.nextDouble());

   connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}

connection.disconnect();

6. 自定义 MQTT 消息格式

事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 可以在源码的 example/mqtt-customize 项目中找到一个简单示例。

步骤:

  1. 创建一个 Java 项目,增加如下依赖
        <dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-server</artifactId>
            <version>2.0.4-SNAPSHOT</version>
        </dependency>
  1. 创建一个实现类,实现接口 org.apache.iotdb.db.mqtt.protocol.PayloadFormatter
package org.apache.iotdb.mqtt.server;

import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomizedLinePayloadFormatter implements PayloadFormatter {

    @Override
    public List<Message> format(String topic, ByteBuf payload) {
        // Suppose the payload is a line format
        if (payload == null) {
            return null;
        }

        String line = payload.toString(StandardCharsets.UTF_8);
        // parse data from the line and generate Messages and put them into List<Meesage> ret
        List<Message> ret = new ArrayList<>();
        // this is just an example, so we just generate some Messages directly
        for (int i = 0; i < 3; i++) {
            long ts = i;
            TableMessage message = new TableMessage();
            
            // Parsing Database Name
            message.setDatabase("db" + i);
            
            //Parsing Table Names
            message.setTable("t" + i);

            // Parsing Tags
            List<String> tagKeys = new ArrayList<>();
            tagKeys.add("tag1" + i);
            tagKeys.add("tag2" + i);
            List<Object> tagValues = new ArrayList<>();
            tagValues.add("t_value1" + i);
            tagValues.add("t_value2" + i);
            message.setTagKeys(tagKeys);
            message.setTagValues(tagValues);

            // Parsing Attributes
            List<String> attributeKeys = new ArrayList<>();
            List<Object> attributeValues = new ArrayList<>();
            attributeKeys.add("attr1" + i);
            attributeKeys.add("attr2" + i);
            attributeValues.add("a_value1" + i);
            attributeValues.add("a_value2" + i);
            message.setAttributeKeys(attributeKeys);
            message.setAttributeValues(attributeValues);

           // Parsing Fields
           List<String> fields = Arrays.asList("field1" + i, "field2" + i);
           List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
           List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
           message.setFields(fields);
           message.setDataTypes(dataTypes);
           message.setValues(values);
           
           //// Parsing timestamp
           message.setTimestamp(ts);
           ret.add(message);
        }
        return ret;
    }

    @Override
    public String getName() {
        // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
        return "CustomizedLine";
    }
}
  1. 修改项目中的 src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter 文件: 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 在本例中,文件内容为: org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter
  2. 编译项目生成一个 jar 包: mvn package -DskipTests

在 IoTDB 服务端:

  1. 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。
  2. 打开 MQTT 服务参数. (enable_mqtt_service=true in conf/iotdb-system.properties)
  3. 用刚才的实现类中的 getName() 方法的返回值 设置为 conf/iotdb-system.propertiesmqtt_payload_formatter 的值, , 在本例中,为 CustomizedLine
  4. 启动 IoTDB
  5. 搞定

More: MQTT 协议的消息不限于 line,你还可以用任意二进制。通过如下函数获得: payload.forEachByte() or payload.array

7. 注意事项

为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。 不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下:

  1. 显式传入空字符串 • MQTTX:client_id=""时,IoTDB会直接丢弃消息; • mosquitto_pub:client_id=""时,IoTDB能正常接收消息。
  2. 完全不传client_id • MQTTX:消息可被IoTDB正常接收; • mosquitto_pub:IoTDB拒绝连接。 由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。