MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
Note: As of version V2.0.8.2, the TimechoDB installation package does not include the MQTT service JAR file by default. Please contact the Timecho team to obtain the JAR file before using this service, and place it in the timechodb_home/lib or timechodb_home/ext/external_service directory.
By default, the IoTDB MQTT service loads configurations from ${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties.
| Property | Description | Default |
|---|---|---|
| enable_mqtt_service | Enable/disable the MQTT service. | FALSE |
| mqtt_host | Host address bound to the MQTT service. | 127.0.0.1 |
| mqtt_port | Port bound to the MQTT service. | 1883 |
| mqtt_handler_pool_size | Thread pool size for processing MQTT messages. | 1 |
| mqtt_payload_formatter | Formatting method for MQTT message payloads. Options: json (tree mode), line (table mode). | json |
| mqtt_max_message_size | Maximum allowed MQTT message size (bytes). | 1048576 |
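
As a rough illustration of how these settings combine, the sketch below resolves the MQTT properties from configuration text and falls back to the defaults listed in the table when a key is absent. The class `MqttConfigCheck` and its `load` method are hypothetical helpers written for this example; they are not part of IoTDB. The sample text enables the service and selects the `line` formatter used for table mode.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper (not an IoTDB class): resolves MQTT settings with documented defaults. */
final class MqttConfigCheck {

    /** Parses properties text; keys that are absent fall back to the defaults from the table. */
    static Properties load(String text) {
        Properties defaults = new Properties();
        defaults.setProperty("enable_mqtt_service", "false");
        defaults.setProperty("mqtt_host", "127.0.0.1");
        defaults.setProperty("mqtt_port", "1883");
        defaults.setProperty("mqtt_handler_pool_size", "1");
        defaults.setProperty("mqtt_payload_formatter", "json");
        defaults.setProperty("mqtt_max_message_size", "1048576");

        Properties props = new Properties(defaults);
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Example fragment of iotdb-system.properties for table mode.
        String conf = "enable_mqtt_service=true\n" + "mqtt_payload_formatter=line\n";
        Properties p = load(conf);
        String[] keys = {
            "enable_mqtt_service", "mqtt_host", "mqtt_port",
            "mqtt_handler_pool_size", "mqtt_payload_formatter", "mqtt_max_message_size"
        };
        for (String key : keys) {
            System.out.println(key + "=" + p.getProperty(key));
        }
    }
}
```

Only the two overridden keys need to appear in the file for this scenario; the remaining values come from the defaults.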

In table mode (`mqtt_payload_formatter=line`), each payload line follows this syntax:

```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]][ <attribute_key>=<attribute_value>[,<attribute_key>=<attribute_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```

Example:

```
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
```
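
The syntax can be made concrete with a small builder. The following is a minimal sketch, assuming keys and values are already encoded (for example, TEXT values already wrapped in double quotes) and contain no characters that need escaping. `LineProtocolBuilder` and its methods are hypothetical names introduced for illustration, not an IoTDB API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not an IoTDB class): assembles one line of the line protocol. */
final class LineProtocolBuilder {

    /** Joins key=value pairs with commas, preserving the map's iteration order. */
    static String joinPairs(Map<String, String> pairs) {
        StringJoiner joiner = new StringJoiner(",");
        pairs.forEach((k, v) -> joiner.add(k + "=" + v));
        return joiner.toString();
    }

    /** Tags, attributes, and timestamp are optional; at least one field is required. */
    static String buildLine(
            String measurement,
            Map<String, String> tags,
            Map<String, String> attributes,
            Map<String, String> fields,
            Long timestamp) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        StringBuilder sb = new StringBuilder(measurement);
        if (!tags.isEmpty()) {
            sb.append(',').append(joinPairs(tags)); // tags follow the measurement after a comma
        }
        if (!attributes.isEmpty()) {
            sb.append(' ').append(joinPairs(attributes)); // attributes form their own segment
        }
        sb.append(' ').append(joinPairs(fields));
        if (timestamp != null) {
            sb.append(' ').append(timestamp);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("tag1", "value1");
        tags.put("tag2", "value2");
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("attr1", "value1");
        attributes.put("attr2", "value2");
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("fieldKey", "\"fieldValue\""); // TEXT values carry their own quotes
        System.out.println(
            buildLine("myMeasurement", tags, attributes, fields, 1556813561098000000L));
    }
}
```

Using `LinkedHashMap` keeps the key order stable, so the generated line matches the order in which tags, attributes, and fields were added.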
The first segment of the MQTT topic (split by /) is used as the database name.

```
topic: stock/Legacy      databaseName: stock
topic: stock/Legacy/#    databaseName: stock
```
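
The topic rule can be sketched in a few lines. This is only an illustration of the stated rule, not IoTDB's implementation; `TopicDatabase.databaseOf` is a hypothetical name, and the handling of a topic that begins with `/` is left unspecified here.

```java
/** Hypothetical helper (not an IoTDB class): derives the database name from an MQTT topic. */
final class TopicDatabase {

    /** Returns the first segment of the topic when split by '/'. */
    static String databaseOf(String topic) {
        int slash = topic.indexOf('/');
        // A topic without '/' consists of a single segment, which is the database name.
        return slash < 0 ? topic : topic.substring(0, slash);
    }

    public static void main(String[] args) {
        System.out.println(databaseOf("stock/Legacy"));
        System.out.println(databaseOf("stock/Legacy/#"));
    }
}
```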
The table name is derived from the <measurement> in the line protocol.
| Field Value | IoTDB Data Type |
|---|---|
| `1`, `1.12` | DOUBLE |
| `1f`, `1.12f` | FLOAT |
| `1i`, `123i` | INT64 |
| `1u`, `123u` | INT64 |
| `1i32`, `123i32` | INT32 |
| `"xxx"` | TEXT |
| `t`, `T`, `true`, `True`, `TRUE`, `f`, `F`, `false`, `False`, `FALSE` | BOOLEAN |
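
The mapping can be read as a set of ordered checks on the literal. The sketch below is one way to express it, assuming the literal is well formed; it does not validate the numeric part and is not IoTDB's parser. `FieldTypeGuess.typeOf` is a hypothetical name. Boolean literals are checked before suffixes so that a bare `f` is BOOLEAN while `1f` is FLOAT, and `i32` is checked before `i`.

```java
import java.util.Set;

/** Hypothetical helper (not an IoTDB class): maps a field literal to an IoTDB data type name. */
final class FieldTypeGuess {

    private static final Set<String> BOOLEANS =
        Set.of("t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE");

    static String typeOf(String literal) {
        if (literal.length() >= 2 && literal.startsWith("\"") && literal.endsWith("\"")) {
            return "TEXT"; // quoted string
        }
        if (BOOLEANS.contains(literal)) {
            return "BOOLEAN"; // must precede the 'f' suffix check
        }
        if (literal.endsWith("i32")) {
            return "INT32"; // must precede the 'i' suffix check
        }
        if (literal.endsWith("i") || literal.endsWith("u")) {
            return "INT64";
        }
        if (literal.endsWith("f")) {
            return "FLOAT";
        }
        return "DOUBLE"; // unsuffixed numbers; the numeric part is not validated here
    }

    public static void main(String[] args) {
        String[] samples = {"1.12", "1.12f", "123i", "123u", "123i32", "\"xxx\"", "f"};
        for (String s : samples) {
            System.out.println(s + " -> " + typeOf(s));
        }
    }
}
```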
The following example shows an MQTT client sending messages to the IoTDB server.

```java
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
String DATABASE = "myMqttTest";
connection.connect();

String payload =
    "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// batch write example
payload =
    "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
        + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

// batch write example
payload =
    "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
        + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

connection.disconnect();
```
If the line format above does not fit your needs, you can define your own MQTT message format with a few lines of code. An example is available in the `example/mqtt-customize` project.
Steps:
1. Create a Java project and add the following dependency:

   ```xml
   <dependency>
       <groupId>org.apache.iotdb</groupId>
       <artifactId>iotdb-server</artifactId>
       <version>2.0.4-SNAPSHOT</version>
   </dependency>
   ```
2. Implement `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`, e.g.:

   ```java
   package org.apache.iotdb.mqtt.server;

   import io.netty.buffer.ByteBuf;
   import org.apache.iotdb.db.protocol.mqtt.Message;
   import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
   // The two imports below were missing from the original listing; their packages are assumed.
   import org.apache.iotdb.db.protocol.mqtt.TableMessage;
   import org.apache.tsfile.enums.TSDataType;

   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class CustomizedLinePayloadFormatter implements PayloadFormatter {

     @Override
     public List<Message> format(String topic, ByteBuf payload) {
       // Suppose the payload is in line format
       if (payload == null) {
         return null;
       }

       String line = payload.toString(StandardCharsets.UTF_8);
       // Parse data from the line, generate Messages, and put them into List<Message> ret
       List<Message> ret = new ArrayList<>();
       // This is just an example, so we generate some Messages directly
       for (int i = 0; i < 3; i++) {
         long ts = i;
         TableMessage message = new TableMessage();

         // Database name
         message.setDatabase("db" + i);

         // Table name
         message.setTable("t" + i);

         // Tags
         List<String> tagKeys = new ArrayList<>();
         tagKeys.add("tag1" + i);
         tagKeys.add("tag2" + i);
         List<Object> tagValues = new ArrayList<>();
         tagValues.add("t_value1" + i);
         tagValues.add("t_value2" + i);
         message.setTagKeys(tagKeys);
         message.setTagValues(tagValues);

         // Attributes
         List<String> attributeKeys = new ArrayList<>();
         List<Object> attributeValues = new ArrayList<>();
         attributeKeys.add("attr1" + i);
         attributeKeys.add("attr2" + i);
         attributeValues.add("a_value1" + i);
         attributeValues.add("a_value2" + i);
         message.setAttributeKeys(attributeKeys);
         message.setAttributeValues(attributeValues);

         // Fields
         List<String> fields = Arrays.asList("field1" + i, "field2" + i);
         List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
         List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
         message.setFields(fields);
         message.setDataTypes(dataTypes);
         message.setValues(values);

         // Timestamp
         message.setTimestamp(ts);
         ret.add(message);
       }
       return ret;
     }

     @Override
     public String getName() {
       // Set mqtt_payload_formatter in iotdb-system.properties to the following string:
       return "CustomizedLine";
     }
   }
   ```
3. Edit `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: clear the file and put the name of your implementation class into it. In this example, the content is `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter`.
4. Compile your implementation into a JAR file: `mvn package -DskipTests`

Then, in your server:
1. Enable the MQTT service (`enable_mqtt_service=true` in `conf/iotdb-system.properties`).
2. Set `mqtt_payload_formatter` in `conf/iotdb-system.properties` to the value returned by `getName()` in your implementation. In this example, the value is `CustomizedLine`.

More: the message format can be anything you want. For example, if it is a binary format, use `payload.forEachByte()` or `payload.array()` to read the bytes.
To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. Behavior varies when the client_id is missing or empty. Common examples: