MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
By default, the IoTDB MQTT service loads configurations from ${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties.
| Property | Description | Default |
|---|---|---|
| enable_mqtt_service | Enable or disable the MQTT service. | false |
| mqtt_host | Host address bound to the MQTT service. | 127.0.0.1 |
| mqtt_port | Port bound to the MQTT service. | 1883 |
| mqtt_handler_pool_size | Thread pool size for processing MQTT messages. | 1 |
| mqtt_payload_formatter | Formatting method for MQTT message payloads. Options: json (tree model), line (table model). | json |
| mqtt_max_message_size | Maximum allowed MQTT message size (bytes). | 1048576 |
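As a rough illustration of how these properties fit together, the sketch below loads a hypothetical fragment of `iotdb-system.properties` with `java.util.Properties` and checks that the MQTT service is enabled with the `line` formatter. The class name `MqttConfigCheck`, its helper methods, and the sample values are assumptions made for this example; they are not part of IoTDB.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class MqttConfigCheck {
    // Hypothetical configuration fragment; property names come from the table above.
    static final String SAMPLE = String.join("\n",
        "enable_mqtt_service=true",
        "mqtt_host=127.0.0.1",
        "mqtt_port=1883",
        "mqtt_handler_pool_size=1",
        "mqtt_payload_formatter=line",
        "mqtt_max_message_size=1048576");

    // Parses properties-file text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not read properties text", e);
        }
        return props;
    }

    // True when the service is enabled and the table-model (line) formatter is selected.
    // Missing keys fall back to the defaults listed in the table.
    static boolean isTableModelMqttEnabled(Properties props) {
        boolean enabled = Boolean.parseBoolean(props.getProperty("enable_mqtt_service", "false"));
        String formatter = props.getProperty("mqtt_payload_formatter", "json");
        return enabled && "line".equals(formatter);
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("table-model MQTT enabled: " + isTableModelMqttEnabled(props));
        System.out.println("port: " + Integer.parseInt(props.getProperty("mqtt_port", "1883")));
    }
}
```

An empty `Properties` object yields `false` here, because the documented defaults leave the service disabled and select the `json` formatter.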
Line protocol syntax:

```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]][ <attribute_key>=<attribute_value>[,<attribute_key>=<attribute_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```

Example:

```
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
```
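To make the layout of a line concrete, here is a minimal sketch that assembles one from its parts: measurement, optional tags, optional attributes, at least one field, and an optional timestamp. The class `LineProtocolBuilder` and its methods are hypothetical helpers written for this example, not an IoTDB API. Field values are expected to be already encoded (quotes and type suffixes included), and no escaping of special characters is attempted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class LineProtocolBuilder {
    // Joins key/value pairs as key=value,key=value in insertion order.
    static String joinPairs(Map<String, String> pairs) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, String> entry : pairs.entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }

    // Builds one line: measurement[,tags][ attributes] fields [timestamp].
    static String buildLine(String measurement,
                            Map<String, String> tags,
                            Map<String, String> attributes,
                            Map<String, String> fields,
                            Long timestamp) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        StringBuilder line = new StringBuilder(measurement);
        if (!tags.isEmpty()) {
            line.append(',').append(joinPairs(tags));
        }
        if (!attributes.isEmpty()) {
            line.append(' ').append(joinPairs(attributes));
        }
        line.append(' ').append(joinPairs(fields));
        if (timestamp != null) {
            line.append(' ').append(timestamp);
        }
        return line.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("tag1", "value1");
        tags.put("tag2", "value2");
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("attr1", "value1");
        attributes.put("attr2", "value2");
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("fieldKey", "\"fieldValue\""); // TEXT values carry their own quotes
        System.out.println(buildLine("myMeasurement", tags, attributes, fields, 1556813561098000000L));
    }
}
```

With these inputs, `main` rebuilds the example line shown above.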
The first segment of the MQTT topic (split by /) is used as the database name.
```
topic: stock/Legacy      databaseName: stock
topic: stock/Legacy/#    databaseName: stock
```
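The mapping from topic to database name can be sketched as taking everything before the first `/`. The class `TopicDatabase` and the method `databaseOf` are hypothetical names used only for illustration. How the server treats edge cases, such as a topic that starts with `/`, is not covered here.

```java
public class TopicDatabase {
    // Returns the first topic segment, i.e. the text before the first '/'.
    // A topic without '/' is returned unchanged.
    static String databaseOf(String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic must be non-empty");
        }
        int slash = topic.indexOf('/');
        return slash < 0 ? topic : topic.substring(0, slash);
    }

    public static void main(String[] args) {
        System.out.println(databaseOf("stock/Legacy"));   // first segment of a two-level topic
        System.out.println(databaseOf("stock/Legacy/#")); // same database for a deeper topic
    }
}
```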
The table name is derived from the `<measurement>` in the line protocol.
| Field Value | IoTDB Data Type |
|---|---|
| `1`, `1.12` | DOUBLE |
| `1f`, `1.12f` | FLOAT |
| `1i`, `123i` | INT64 |
| `1u`, `123u` | INT64 |
| `1i32`, `123i32` | INT32 |
| `"xxx"` | TEXT |
| `t`, `T`, `true`, `True`, `TRUE`, `f`, `F`, `false`, `False`, `FALSE` | BOOLEAN |
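The rules in the table can be expressed as a small classifier. This is only a sketch that mirrors the table; it is not IoTDB's actual parser, and the class `FieldTypeInference` and the method `inferType` are hypothetical. Two details matter: the boolean literals are checked before the suffixes (so that a bare `f` is BOOLEAN rather than FLOAT), and `i32` is checked before `i`. The sketch accepts only plain decimal numbers and rejects exponent notation, which is an assumption of this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FieldTypeInference {
    private static final Set<String> BOOLEANS = new HashSet<>(Arrays.asList(
        "t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"));
    private static final String INTEGER = "-?\\d+";
    private static final String DECIMAL = "-?\\d+(\\.\\d+)?";

    // Maps a field literal to the IoTDB data type name listed in the table.
    static String inferType(String literal) {
        if (BOOLEANS.contains(literal)) {
            return "BOOLEAN";
        }
        if (literal.length() >= 2 && literal.startsWith("\"") && literal.endsWith("\"")) {
            return "TEXT";
        }
        if (literal.endsWith("i32")) {
            return check(literal, 3, INTEGER, "INT32");
        }
        if (literal.endsWith("i") || literal.endsWith("u")) {
            return check(literal, 1, INTEGER, "INT64");
        }
        if (literal.endsWith("f")) {
            return check(literal, 1, DECIMAL, "FLOAT");
        }
        return check(literal, 0, DECIMAL, "DOUBLE");
    }

    // Strips the suffix and verifies that the remainder matches the numeric pattern.
    private static String check(String literal, int suffixLength, String pattern, String type) {
        String number = literal.substring(0, literal.length() - suffixLength);
        if (!number.matches(pattern)) {
            throw new IllegalArgumentException("not a valid " + type + " literal: " + literal);
        }
        return type;
    }

    public static void main(String[] args) {
        for (String literal : new String[] {"1.12", "1.12f", "123i", "123u", "123i32", "\"xxx\"", "F"}) {
            System.out.println(literal + " -> " + inferType(literal));
        }
    }
}
```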
The following example shows an MQTT client sending messages to the IoTDB server.
```java
// Client classes come from the FuseSource mqtt-client library.
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
String DATABASE = "myMqttTest";
connection.connect();

// Single row with tags, attributes, fields and a timestamp
String payload =
    "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// Single row without attributes; the topic is the database name only
payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// Payload that starts with a remark line
payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// Batch write example
payload =
    "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
        + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// Batch write example
payload =
    "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
        + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

connection.disconnect();
```
If the line format above does not suit your needs, you can customize the MQTT message format by writing a few lines of code. An example can be found in the `example/mqtt-customize` project.
Steps:
- Add the following dependency to your project:

```xml
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-server</artifactId>
    <version>2.0.4-SNAPSHOT</version>
</dependency>
```
- Write a class that implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`, e.g.:

```java
package org.apache.iotdb.mqtt.server;

import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
// The next two imports are missing from the original listing;
// their package locations are assumed for IoTDB 2.x.
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
import org.apache.tsfile.enums.TSDataType;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomizedLinePayloadFormatter implements PayloadFormatter {

    @Override
    public List<Message> format(String topic, ByteBuf payload) {
        // Suppose the payload is in line format
        if (payload == null) {
            return null;
        }

        String line = payload.toString(StandardCharsets.UTF_8);
        // Parse data from the line, generate Messages and put them into List<Message> ret
        List<Message> ret = new ArrayList<>();
        // This is just an example, so we generate some Messages directly
        for (int i = 0; i < 3; i++) {
            long ts = i;
            TableMessage message = new TableMessage();

            // Database name
            message.setDatabase("db" + i);

            // Table name
            message.setTable("t" + i);

            // Tags
            List<String> tagKeys = new ArrayList<>();
            tagKeys.add("tag1" + i);
            tagKeys.add("tag2" + i);
            List<Object> tagValues = new ArrayList<>();
            tagValues.add("t_value1" + i);
            tagValues.add("t_value2" + i);
            message.setTagKeys(tagKeys);
            message.setTagValues(tagValues);

            // Attributes
            List<String> attributeKeys = new ArrayList<>();
            List<Object> attributeValues = new ArrayList<>();
            attributeKeys.add("attr1" + i);
            attributeKeys.add("attr2" + i);
            attributeValues.add("a_value1" + i);
            attributeValues.add("a_value2" + i);
            message.setAttributeKeys(attributeKeys);
            message.setAttributeValues(attributeValues);

            // Fields
            List<String> fields = Arrays.asList("field1" + i, "field2" + i);
            List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
            List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
            message.setFields(fields);
            message.setDataTypes(dataTypes);
            message.setValues(values);

            // Timestamp
            message.setTimestamp(ts);
            ret.add(message);
        }
        return ret;
    }

    @Override
    public String getName() {
        // Set mqtt_payload_formatter in iotdb-system.properties to this string
        return "CustomizedLine";
    }
}
```
- Edit the file `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: clear it and put the name of your implementation class into it. In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter`
- Build your implementation: `mvn package -DskipTests`

Then, in your server:
- Enable the MQTT service (`enable_mqtt_service=true` in `conf/iotdb-system.properties`).
- Set `mqtt_payload_formatter` in `conf/iotdb-system.properties` to the value returned by `getName()` in your implementation. In this example, the value is `CustomizedLine`.

More: the message format can be anything you want. For example, if it is a binary format, use `payload.forEachByte()` or `payload.array()` to get the byte content.
To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. Behavior varies when the client_id is missing or empty. Common examples: