MQTT is a machine-to-machine (M2M)/“Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
IoTDB supports the MQTT v3.1 (an OASIS Standard) protocol. The IoTDB server includes a built-in MQTT service that allows remote devices to send messages to it directly.
The built-in MQTT service provides a direct connection to IoTDB over MQTT. It listens for publish messages from MQTT clients and writes the data into storage immediately. The MQTT topic is mapped directly to the IoTDB timeseries used for persisting the data. The message payload is converted into events by a PayloadFormatter, which is loaded through Java SPI. The default implementation is JSONPayloadFormatter, which accepts both single JSON objects and arrays of JSON objects.
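To make the SPI mechanism mentioned above more concrete, here is a minimal sketch of discovering implementations with java.util.ServiceLoader and picking one by name. The NamedFormatter interface and the FormatterLookupSketch class are hypothetical stand-ins invented for this illustration; this is not IoTDB's actual loading code.

```java
import java.util.ServiceLoader;

final class FormatterLookupSketch {

  /** Returns the first formatter whose name matches, or null if none does. */
  static NamedFormatter findByName(Iterable<NamedFormatter> candidates, String name) {
    for (NamedFormatter candidate : candidates) {
      if (candidate.getName().equals(name)) {
        return candidate;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // ServiceLoader reads provider class names from
    // META-INF/services/<fully qualified interface name> files on the classpath.
    // If no such file exists, the loader is simply empty.
    ServiceLoader<NamedFormatter> discovered = ServiceLoader.load(NamedFormatter.class);
    NamedFormatter chosen = findByName(discovered, "json");
    System.out.println(chosen == null ? "no formatter named json" : chosen.getName());
  }
}

/** Hypothetical stand-in for a formatter interface that exposes a name. */
interface NamedFormatter {
  String getName();
}
```

Selecting by name is what lets a single configuration value choose among several formatters that are present on the classpath at the same time.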
The following are two examples of valid MQTT message payloads:

```json
{
  "device": "root.sg.d1",
  "timestamp": 1586076045524,
  "measurements": ["s1", "s2"],
  "values": [0.530635, 0.530635]
}
```

or

```json
{
  "device": "root.sg.d1",
  "timestamps": [1586076045524, 1586076065526],
  "measurements": ["s1", "s2"],
  "values": [[0.530635, 0.530635], [0.530655, 0.530695]]
}
```

or a JSON array containing objects of either form.
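The payload shapes above can be produced with plain string building. The following is a minimal sketch using only the Java standard library; the class MqttPayloadSketch and its method names are hypothetical. It assumes that device and measurement names contain no characters needing JSON escaping, that values are finite doubles, and that each inner array of the batch form holds the values for one timestamp.

```java
import java.util.List;
import java.util.stream.Collectors;

final class MqttPayloadSketch {

  /** Renders measurement names as a JSON array of strings (no escaping is done). */
  static String names(List<String> measurements) {
    return measurements.stream()
        .map(m -> "\"" + m + "\"")
        .collect(Collectors.joining(",", "[", "]"));
  }

  /** Renders finite doubles as a JSON array of numbers. */
  static String numbers(List<Double> values) {
    return values.stream()
        .map(v -> Double.toString(v))
        .collect(Collectors.joining(",", "[", "]"));
  }

  /** Single-timestamp form: one "timestamp" and a flat "values" array. */
  static String single(
      String device, long timestamp, List<String> measurements, List<Double> values) {
    return "{\"device\":\"" + device + "\",\"timestamp\":" + timestamp
        + ",\"measurements\":" + names(measurements)
        + ",\"values\":" + numbers(values) + "}";
  }

  /** Batch form: a "timestamps" array and one inner "values" array per timestamp (assumed). */
  static String batch(
      String device, List<Long> timestamps, List<String> measurements, List<List<Double>> rows) {
    String ts = timestamps.stream()
        .map(t -> Long.toString(t))
        .collect(Collectors.joining(",", "[", "]"));
    String vals = rows.stream()
        .map(MqttPayloadSketch::numbers)
        .collect(Collectors.joining(",", "[", "]"));
    return "{\"device\":\"" + device + "\",\"timestamps\":" + ts
        + ",\"measurements\":" + names(measurements)
        + ",\"values\":" + vals + "}";
  }

  /** Wraps already-rendered payload objects in a JSON array. */
  static String asArray(List<String> payloads) {
    return payloads.stream().collect(Collectors.joining(",", "[", "]"));
  }

  public static void main(String[] args) {
    System.out.println(single("root.sg.d1", 1586076045524L, List.of("s1", "s2"), List.of(0.5, 1.25)));
  }
}
```

For anything beyond a quick test, a JSON library is the safer choice, since it handles escaping and number formatting.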
The IoTDB MQTT service loads its configuration from ${IOTDB_HOME}/${IOTDB_CONF}/iotdb-datanode.properties by default.
The configuration options of this file are as follows:
| NAME | DESCRIPTION | DEFAULT |
|---|---|---|
| enable_mqtt_service | whether to enable the mqtt service | false |
| mqtt_host | the mqtt service binding host | 127.0.0.1 |
| mqtt_port | the mqtt service binding port | 1883 |
| mqtt_handler_pool_size | the handler pool size for handling the mqtt messages | 1 |
| mqtt_payload_formatter | the mqtt message payload formatter | json |
| mqtt_max_message_size | the max mqtt message size in bytes | 1048576 |
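As a rough illustration of how the defaults in the table combine with values set in the properties file, the sketch below layers user-supplied settings over those defaults with java.util.Properties. The class MqttConfigSketch and its methods are hypothetical, and this is not how IoTDB itself reads its configuration; only the option names and default values are taken from the table.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class MqttConfigSketch {

  /** Default values as listed in the table above. */
  static Properties defaults() {
    Properties d = new Properties();
    d.setProperty("enable_mqtt_service", "false");
    d.setProperty("mqtt_host", "127.0.0.1");
    d.setProperty("mqtt_port", "1883");
    d.setProperty("mqtt_handler_pool_size", "1");
    d.setProperty("mqtt_payload_formatter", "json");
    d.setProperty("mqtt_max_message_size", "1048576");
    return d;
  }

  /** Parses properties text; keys that are not set fall back to the defaults. */
  static Properties effective(String propertiesText) {
    Properties p = new Properties(defaults());
    try {
      p.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return p;
  }

  public static void main(String[] args) {
    // Enable the service and listen on all interfaces; everything else keeps its default.
    Properties p = effective("enable_mqtt_service=true\nmqtt_host=0.0.0.0\n");
    System.out.println(p.getProperty("mqtt_host") + ":" + p.getProperty("mqtt_port"));
  }
}
```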
The following example shows an MQTT client sending messages to the IoTDB server.
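```java
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Random;

// The statements below belong inside a method that declares `throws Exception`.
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

Random random = new Random();
for (int i = 0; i < 10; i++) {
  // Locale.ROOT keeps "." as the decimal separator so the payload stays valid JSON.
  String payload = String.format(Locale.ROOT,
      "{\n"
          + "\"device\":\"root.sg.d1\",\n"
          + "\"timestamp\":%d,\n"
          + "\"measurements\":[\"s1\"],\n"
          + "\"values\":[%f]\n"
          + "}",
      System.currentTimeMillis(), random.nextDouble());

  connection.publish(
      "root.sg.d1.s1", payload.getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
}

connection.disconnect();
```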
If you need a special format, you can customize your MQTT message format by writing a few lines of code.
An example can be found in the example/mqtt-customize module.
Steps:

1. Create a Java project and add the following dependency:

   ```xml
   <dependency>
     <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-server</artifactId>
     <version>1.1.0-SNAPSHOT</version>
   </dependency>
   ```

2. Implement the org.apache.iotdb.db.protocol.mqtt.PayloadFormatter interface:

   ```java
   package org.apache.iotdb.mqtt.server;

   import io.netty.buffer.ByteBuf;
   import org.apache.iotdb.db.protocol.mqtt.Message;
   import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;

   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class CustomizedJsonPayloadFormatter implements PayloadFormatter {

     @Override
     public List<Message> format(ByteBuf payload) {
       // Suppose the payload is in JSON format
       if (payload == null) {
         return null;
       }

       String json = payload.toString(StandardCharsets.UTF_8);
       // parse data from the json and generate Messages and put them into List<Message> ret
       List<Message> ret = new ArrayList<>();
       // this is just an example, so we just generate some Messages directly
       for (int i = 0; i < 2; i++) {
         long ts = i;
         Message message = new Message();
         message.setDevice("d" + i);
         message.setTimestamp(ts);
         message.setMeasurements(Arrays.asList("s1", "s2"));
         message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
         ret.add(message);
       }
       return ret;
     }

     @Override
     public String getName() {
       // set the value of mqtt_payload_formatter in iotdb-datanode.properties as the following string:
       return "CustomizedJson";
     }
   }
   ```

3. Create the file src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter containing the fully qualified class name of your formatter. (In the previous example, this means adding a single line with this content: org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter)

4. Build your implementation as a jar file: mvn package -DskipTests

Then, in your server:

1. Create the ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this directory.
2. Enable the MQTT service in conf/iotdb-datanode.properties by setting enable_mqtt_service=true.
3. Set mqtt_payload_formatter in conf/iotdb-datanode.properties to the value you are returning in your implementation of getName(). (In this example, the value would be: CustomizedJson)

More: the message format can be anything you want. For example, if it is a binary format, just use payload.forEachByte() or payload.array() to get the byte content.
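To illustrate the binary case, here is a minimal sketch that decodes a made-up wire layout: an 8-byte big-endian timestamp followed by an 8-byte double. The layout, the BinaryPayloadSketch class, and its Sample type are all assumptions for this example. It uses java.nio.ByteBuffer so that it runs without Netty; inside a real formatter the bytes would come from the ByteBuf payload, where readLong() and readDouble() read the same big-endian layout. Note that a ByteBuf only exposes a backing array when hasArray() returns true.

```java
import java.nio.ByteBuffer;

final class BinaryPayloadSketch {

  /** One decoded reading (hypothetical type for this example). */
  static final class Sample {
    final long timestamp;
    final double value;

    Sample(long timestamp, double value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** Encodes a reading using the assumed 16-byte layout. */
  static byte[] encode(long timestamp, double value) {
    // ByteBuffer is big-endian by default.
    return ByteBuffer.allocate(16).putLong(timestamp).putDouble(value).array();
  }

  /** Decodes a reading; rejects input that does not match the assumed layout. */
  static Sample decode(byte[] bytes) {
    if (bytes.length != 16) {
      throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
    }
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long timestamp = buf.getLong();
    double value = buf.getDouble();
    return new Sample(timestamp, value);
  }

  public static void main(String[] args) {
    Sample s = decode(encode(System.currentTimeMillis(), 0.5));
    System.out.println(s.timestamp + " -> " + s.value);
  }
}
```

A custom formatter would then turn each decoded reading into a Message, in the same way the JSON example above fills in the device, timestamp, measurements, and values.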