MQTT 是机器对机器(M2M)/“物联网”连接协议。
它被设计为一种非常轻量级的发布/订阅消息传递。
对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。
IoTDB 支持 MQTT v3.1(OASIS 标准)协议。 IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。
内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。 MQTT 主题与 IoTDB 时间序列相对应。 消息有效载荷可以由 Java SPI 加载的PayloadFormatter格式化为事件,默认实现为JSONPayloadFormatter 默认的json格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:
{ "device":"root.sg.d1", "timestamp":1586076045524, "measurements":["s1","s2"], "values":[0.530635,0.530635] }
或者
{ "device":"root.sg.d1", "timestamps":[1586076045524,1586076065526], "measurements":["s1","s2"], "values":[[0.530635,0.530635], [0.530655,0.530695]] }
或者以上两者的JSON数组形式。
默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdb-datanode.properties加载配置。
配置如下:
| 名称 | 描述 | 默认 |
|---|---|---|
| enable_mqtt_service | 是否启用 mqtt 服务 | false |
| mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 |
| mqtt_port | mqtt 服务绑定端口 | 1883 |
| mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 |
| mqtt_payload_formatter | mqtt 消息有效负载格式化程序 | json |
| mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 |
### 示例代码 以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。
MQTT mqtt = new MQTT(); mqtt.setHost("127.0.0.1", 1883); mqtt.setUserName("root"); mqtt.setPassword("root"); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); Random random = new Random(); for (int i = 0; i < 10; i++) { String payload = String.format("{\n" + "\"device\":\"root.sg.d1\",\n" + "\"timestamp\":%d,\n" + "\"measurements\":[\"s1\"],\n" + "\"values\":[%f]\n" + "}", System.currentTimeMillis(), random.nextDouble()); connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); } connection.disconnect();
事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 可以在源码的 example/mqtt-customize 项目中找到一个简单示例。
步骤:
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> <version>${project.version}</version> </dependency>
org.apache.iotdb.db.mqtt.protocol.PayloadFormatterpackage org.apache.iotdb.mqtt.server; import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class CustomizedJsonPayloadFormatter implements PayloadFormatter { @Override public List<Message> format(ByteBuf payload) { // Suppose the payload is a json format if (payload == null) { return null; } String json = payload.toString(StandardCharsets.UTF_8); // parse data from the json and generate Messages and put them into List<Meesage> ret List<Message> ret = new ArrayList<>(); // this is just an example, so we just generate some Messages directly for (int i = 0; i < 2; i++) { long ts = i; Message message = new Message(); message.setDevice("d" + i); message.setTimestamp(ts); message.setMeasurements(Arrays.asList("s1", "s2")); message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); ret.add(message); } return ret; } @Override public String getName() { // set the value of mqtt_payload_formatter in iotdb-datanode.properties as the following string: return "CustomizedJson"; } }
src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter 文件: 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 在本例中,文件内容为: org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormattermvn package -DskipTests在 IoTDB 服务端:
enable_mqtt_service=true in conf/iotdb-datanode.properties)conf/iotdb-datanode.properties 中 mqtt_payload_formatter 的值, , 在本例中,为 CustomizedJsonMore: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得: payload.forEachByte() or payload.array。