| import{_ as a,c as n,b as l,o as e}from"./app-C1IcKGP3.js";const o="/img/mqtt-table-1.png",p="/img/mqtt-table-2.png",t={};function r(c,s){return e(),n("div",null,[...s[0]||(s[0]=[l('<h1 id="mqtt-协议" tabindex="-1"><a class="header-anchor" href="#mqtt-协议"><span>MQTT 协议</span></a></h1><h2 id="_1-概述" tabindex="-1"><a class="header-anchor" href="#_1-概述"><span>1. 概述</span></a></h2><p>MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。</p><p>IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。</p><figure><img src="'+o+'" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><h2 id="_2-配置方式" tabindex="-1"><a class="header-anchor" href="#_2-配置方式"><span>2. 配置方式</span></a></h2><p>默认情况下,IoTDB MQTT 服务通过<code>${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties</code>加载配置。</p><p>具体配置项如下:</p><table><thead><tr><th><strong>名称</strong></th><th><strong>描述</strong></th><th><strong>默认</strong></th></tr></thead><tbody><tr><td><code>enable_mqtt_service</code></td><td>是否启用 mqtt 服务</td><td>FALSE</td></tr><tr><td><code>mqtt_host</code></td><td>mqtt 服务绑定主机</td><td>127.0.0.1</td></tr><tr><td><code>mqtt_port</code></td><td>mqtt 服务绑定端口</td><td>1883</td></tr><tr><td><code>mqtt_handler_pool_size</code></td><td>处理 mqtt 消息的处理程序池大小</td><td>1</td></tr><tr><td><strong><code>mqtt_payload_formatter</code></strong></td><td><strong>mqtt</strong><strong> 消息有效负载格式化程序。<strong></strong>可选项:<strong></strong><code>json</code><strong></strong>:仅适用于树模型。<strong></strong><code>line</code><strong></strong>:仅适用于表模型。</strong></td><td><strong>json</strong></td></tr><tr><td><code>mqtt_max_message_size</code></td><td>mqtt 消息最大长度(字节)</td><td>1048576</td></tr></tbody></table><h2 id="_3-写入协议" tabindex="-1"><a class="header-anchor" href="#_3-写入协议"><span>3. 
写入协议</span></a></h2><ul><li>行协议语法格式</li></ul><div class="language-javascript line-numbers-mode" data-highlighter="shiki" data-ext="javascript" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-javascript"><span class="line"><span style="color:#ABB2BF;"><</span><span style="color:#E06C75;">measurement</span><span style="color:#ABB2BF;">>[,<</span><span style="color:#E5C07B;">tag_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">tag_value</span><span style="color:#ABB2BF;">>[,<</span><span style="color:#E5C07B;">tag_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">tag_value</span><span style="color:#ABB2BF;">>]][ <</span><span style="color:#E5C07B;">attribute_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">attribute_value</span><span style="color:#ABB2BF;">>[,<</span><span style="color:#E5C07B;">attribute_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">attribute_value</span><span style="color:#ABB2BF;">>]] <</span><span style="color:#E5C07B;">field_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">field_value</span><span style="color:#ABB2BF;">>[,<</span><span style="color:#E5C07B;">field_key</span><span style="color:#ABB2BF;">>=<</span><span style="color:#E5C07B;">field_value</span><span style="color:#ABB2BF;">>] [<</span><span style="color:#E06C75;">timestamp</span><span style="color:#ABB2BF;">>]</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><ul><li>示例</li></ul><div class="language-javascript line-numbers-mode" data-highlighter="shiki" data-ext="javascript" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-javascript"><span class="line"><span style="color:#E06C75;">myMeasurement</span><span 
style="color:#ABB2BF;">,</span><span style="color:#E06C75;">tag1</span><span style="color:#56B6C2;">=</span><span style="color:#E06C75;">value1</span><span style="color:#ABB2BF;">,</span><span style="color:#E06C75;">tag2</span><span style="color:#56B6C2;">=</span><span style="color:#E06C75;">value2</span><span style="color:#E06C75;"> attr1</span><span style="color:#56B6C2;">=</span><span style="color:#E06C75;">value1</span><span style="color:#ABB2BF;">,</span><span style="color:#E06C75;">attr2</span><span style="color:#56B6C2;">=</span><span style="color:#E06C75;">value2</span><span style="color:#E06C75;"> fieldKey</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">"fieldValue"</span><span style="color:#D19A66;"> 1556813561098000000</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><figure><img src="'+p+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><h2 id="_4-命名约定" tabindex="-1"><a class="header-anchor" href="#_4-命名约定"><span>4. 命名约定</span></a></h2><ul><li>数据库名称</li></ul><p>MQTT topic 名称用 <code>/</code> 分割后, 第一串内容作为数据库名称。</p><div class="language-properties line-numbers-mode" data-highlighter="shiki" data-ext="properties" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-properties"><span class="line"><span style="color:#98C379;">topic: stock/Legacy</span></span> |
| <span class="line"><span style="color:#98C379;">databaseName: stock</span></span> |
| <span class="line"></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#98C379;">topic: stock/Legacy/</span><span style="color:#7F848E;font-style:italic;">#</span></span> |
| <span class="line"><span style="color:#98C379;">databaseName:stock</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>表名称</li></ul><p>表名称使用行协议中的 <code><measurement></code>。</p><ul><li>类型标识</li></ul><table><thead><tr><th>Field 内容</th><th>IoTDB 数据类型</th></tr></thead><tbody><tr><td>1<br>1.12</td><td>DOUBLE</td></tr><tr><td>1<code>f</code><br>1.12<code>f</code></td><td>FLOAT</td></tr><tr><td>1<code>i</code><br>123<code>i</code></td><td>INT64</td></tr><tr><td>1<code>u</code><br>123<code>u</code></td><td>INT64</td></tr><tr><td>1<code>i32</code><br>123<code>i32</code></td><td>INT32</td></tr><tr><td><code>"xxx"</code></td><td>TEXT</td></tr><tr><td><code>t</code>,<code>T</code>,<code>true</code>,<code>True</code>,<code>TRUE</code> <br><code>f</code>,<code>F</code>,<code>false</code>,<code>False</code>,<code>FALSE</code></td><td>BOOLEAN</td></tr></tbody></table><h2 id="_5-代码示例" tabindex="-1"><a class="header-anchor" href="#_5-代码示例"><span>5. 代码示例</span></a></h2><p>以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。</p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#E5C07B;">MQTT</span><span style="color:#E06C75;"> mqtt </span><span style="color:#56B6C2;">=</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> MQTT</span><span style="color:#E06C75;">()</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">mqtt</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setHost</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"127.0.0.1"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">1883</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">mqtt</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setUserName</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">mqtt</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setPassword</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;">BlockingConnection</span><span style="color:#E06C75;"> connection </span><span style="color:#56B6C2;">=</span><span style="color:#E5C07B;"> mqtt</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">blockingConnection</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;">String</span><span style="color:#E06C75;"> DATABASE </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "myMqttTest"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">connect</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;">String</span><span style="color:#E06C75;"> payload </span><span style="color:#56B6C2;">=</span></span> |
| <span class="line"><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=</span><span style="color:#56B6C2;">\\"</span><span style="color:#98C379;">fieldValue1</span><span style="color:#56B6C2;">\\"</span><span style="color:#98C379;">,field2=1i,field3=1u 1"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">publish</span><span style="color:#ABB2BF;">(DATABASE </span><span style="color:#56B6C2;">+</span><span style="color:#98C379;"> "/myTopic"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getBytes</span><span style="color:#ABB2BF;">(), </span><span style="color:#E5C07B;">QoS</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">AT_LEAST_ONCE</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E06C75;">payload </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">publish</span><span style="color:#ABB2BF;">(DATABASE, </span><span style="color:#E5C07B;">payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getBytes</span><span style="color:#ABB2BF;">(), </span><span style="color:#E5C07B;">QoS</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">AT_LEAST_ONCE</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E06C75;">payload </span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> "# It's a remark</span><span style="color:#56B6C2;">\\n</span><span style="color:#98C379;"> "</span><span style="color:#56B6C2;"> +</span><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;"> connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">publish</span><span style="color:#ABB2BF;">(DATABASE </span><span style="color:#56B6C2;">+</span><span style="color:#98C379;"> "/myTopic"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getBytes</span><span style="color:#ABB2BF;">(), </span><span style="color:#E5C07B;">QoS</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">AT_LEAST_ONCE</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">//批量写入示例 </span></span> |
| <span class="line"><span style="color:#E06C75;">payload </span><span style="color:#56B6C2;">=</span></span> |
| <span class="line"><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 </span><span style="color:#56B6C2;">\\n</span><span style="color:#98C379;"> "</span></span> |
| <span class="line"><span style="color:#56B6C2;"> +</span><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">publish</span><span style="color:#ABB2BF;">(DATABASE </span><span style="color:#56B6C2;">+</span><span style="color:#98C379;"> "/myTopic"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getBytes</span><span style="color:#ABB2BF;">(), </span><span style="color:#E5C07B;">QoS</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">AT_LEAST_ONCE</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">//批量写入示例</span></span> |
| <span class="line"><span style="color:#E06C75;">payload </span><span style="color:#56B6C2;">=</span></span> |
| <span class="line"><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=</span><span style="color:#56B6C2;">\\"</span><span style="color:#98C379;">fieldValue1</span><span style="color:#56B6C2;">\\"</span><span style="color:#98C379;">,field2=1i,field3=1u 4 </span><span style="color:#56B6C2;">\\n</span><span style="color:#98C379;"> "</span></span> |
| <span class="line"><span style="color:#56B6C2;"> +</span><span style="color:#98C379;"> "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">publish</span><span style="color:#ABB2BF;">(DATABASE </span><span style="color:#56B6C2;">+</span><span style="color:#98C379;"> "/myTopic"</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getBytes</span><span style="color:#ABB2BF;">(), </span><span style="color:#E5C07B;">QoS</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">AT_LEAST_ONCE</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">false</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;">Thread</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">sleep</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;">connection</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">disconnect</span><span style="color:#ABB2BF;">();</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="_6-自定义-mqtt-消息格式" tabindex="-1"><a class="header-anchor" href="#_6-自定义-mqtt-消息格式"><span>6. 
自定义 MQTT 消息格式</span></a></h2><p>事实上可以通过简单编程来实现 MQTT 消息的格式自定义。<br> 可以在源码的 <a href="https://github.com/apache/iotdb/tree/master/example/mqtt-customize" target="_blank" rel="noopener noreferrer">example/mqtt-customize</a> 项目中找到一个简单示例。</p><p>步骤:</p><ol><li>创建一个 Java 项目,增加如下依赖</li></ol><div class="language-xml line-numbers-mode" data-highlighter="shiki" data-ext="xml" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-xml"><span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">>org.apache.iotdb</</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">>iotdb-server</</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">>2.0.4-SNAPSHOT</</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> </</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ol start="2"><li>创建一个实现类,实现接口 <code>org.apache.iotdb.db.protocol.mqtt.PayloadFormatter</code></li></ol><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">package</span><span style="color:#C678DD;"> org.apache.iotdb.mqtt.server</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> io.netty.buffer.ByteBuf</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.db.protocol.mqtt.Message</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.db.protocol.mqtt.PayloadFormatter</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.nio.charset.StandardCharsets</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.ArrayList</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Arrays</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> CustomizedLinePayloadFormatter</span><span style="color:#C678DD;"> implements</span><span style="color:#E5C07B;"> PayloadFormatter</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#ABB2BF;"> @</span><span style="color:#E5C07B;">Override</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Message</span><span style="color:#ABB2BF;">></span><span style="color:#61AFEF;"> format</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#E06C75;font-style:italic;"> topic</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">ByteBuf</span><span style="color:#E06C75;font-style:italic;"> payload</span><span style="color:#ABB2BF;">)</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Suppose the payload is a line format</span></span> |
| <span class="line"><span style="color:#C678DD;"> if</span><span style="color:#ABB2BF;"> (payload </span><span style="color:#56B6C2;">==</span><span style="color:#D19A66;"> null</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> return</span><span style="color:#D19A66;"> null</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> String</span><span style="color:#E06C75;"> line</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> payload</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">toString</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">StandardCharsets</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">UTF_8</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // parse data from the line and generate Messages and put them into List<Message> ret</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Message</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">ret</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // this is just an example, so we just generate some Messages directly</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">int</span><span style="color:#E06C75;"> i</span><span style="color:#56B6C2;"> =</span><span style="color:#D19A66;"> 0</span><span style="color:#ABB2BF;">; i </span><span style="color:#56B6C2;"><</span><span style="color:#D19A66;"> 3</span><span style="color:#ABB2BF;">; i++) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> long</span><span style="color:#E06C75;"> ts</span><span style="color:#56B6C2;"> =</span><span style="color:#ABB2BF;"> i;</span></span> |
| <span class="line"><span style="color:#E5C07B;"> TableMessage</span><span style="color:#E06C75;"> message</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> TableMessage</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Parsing Database Name</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setDatabase</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"db"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> //Parsing Table Names</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setTable</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"t"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Parsing Tags</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">tagKeys</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tagKeys</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"tag1"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tagKeys</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"tag2"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Object</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">tagValues</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tagValues</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"t_value1"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> tagValues</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"t_value2"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setTagKeys</span><span style="color:#ABB2BF;">(tagKeys);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setTagValues</span><span style="color:#ABB2BF;">(tagValues);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Parsing Attributes</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">attributeKeys</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Object</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">attributeValues</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#E5C07B;"> ArrayList</span><span style="color:#ABB2BF;"><>();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> attributeKeys</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"attr1"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> attributeKeys</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"attr2"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> attributeValues</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"a_value1"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> attributeValues</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"a_value2"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setAttributeKeys</span><span style="color:#ABB2BF;">(attributeKeys);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setAttributeValues</span><span style="color:#ABB2BF;">(attributeValues);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Parsing Fields</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">fields</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> Arrays</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">asList</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"field1"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i, </span><span style="color:#98C379;">"field2"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">dataTypes</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> Arrays</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">asList</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">FLOAT</span><span style="color:#ABB2BF;">, </span><span style="color:#E5C07B;">TSDataType</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">FLOAT</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Object</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">values</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> Arrays</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">asList</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"4.0"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i, </span><span style="color:#98C379;">"5.0"</span><span style="color:#56B6C2;"> +</span><span style="color:#ABB2BF;"> i);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setFields</span><span style="color:#ABB2BF;">(fields);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setDataTypes</span><span style="color:#ABB2BF;">(dataTypes);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setValues</span><span style="color:#ABB2BF;">(values);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // Parsing timestamp</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">setTimestamp</span><span style="color:#ABB2BF;">(ts);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> ret</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">add</span><span style="color:#ABB2BF;">(message);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#C678DD;"> return</span><span style="color:#ABB2BF;"> ret;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#ABB2BF;"> @</span><span style="color:#E5C07B;">Override</span></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#E5C07B;"> String</span><span style="color:#61AFEF;"> getName</span><span style="color:#ABB2BF;">()</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:</span></span> |
| <span class="line"><span style="color:#C678DD;"> return</span><span style="color:#98C379;"> "CustomizedLine"</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div 
class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ol start="3"><li>修改项目中的 <code>src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter</code> 文件:<br> 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。<br> 在本例中,文件内容为: <code>org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter</code></li><li>编译项目生成一个 jar 包: <code>mvn package -DskipTests</code></li></ol><p>在 IoTDB 服务端:</p><ol><li>创建 \${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。</li><li>打开 MQTT 服务参数(在 <code>conf/iotdb-system.properties</code> 中设置 <code>enable_mqtt_service=true</code>)。</li><li>将 <code>conf/iotdb-system.properties</code> 中 <code>mqtt_payload_formatter</code> 的值设置为刚才的实现类中 getName() 方法的返回值,<br> 在本例中,为 <code>CustomizedLine</code>。</li><li>启动 IoTDB</li><li>搞定</li></ol><p>补充:MQTT 协议的消息不限于 line,你还可以用任意二进制。通过如下函数获得:<br><code>payload.forEachByte()</code> 或 <code>payload.array</code>。</p><h2 id="_7-注意事项" tabindex="-1"><a class="header-anchor" href="#_7-注意事项"><span>7. 注意事项</span></a></h2><p>为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。<br> 不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下:</p><ol><li>显式传入空字符串<br> • MQTTX:client_id=""时,IoTDB会直接丢弃消息;<br> • mosquitto_pub:client_id=""时,IoTDB能正常接收消息。</li><li>完全不传client_id<br> • MQTTX:消息可被IoTDB正常接收;<br> • mosquitto_pub:IoTDB拒绝连接。<br> 由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。</li></ol>`,40)])])}const B=a(t,[["render",r]]),y=JSON.parse('{"path":"/zh/UserGuide/latest-Table/API/Programming-MQTT.html","title":"MQTT 协议","lang":"zh-CN","frontmatter":{"description":"MQTT 协议 1. 
概述 MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。 IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OAS...","head":[["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"MQTT 协议\\",\\"image\\":[\\"https://iotdb.apache.org/img/mqtt-table-1.png\\",\\"https://iotdb.apache.org/img/mqtt-table-2.png\\"],\\"dateModified\\":\\"2026-01-12T10:25:39.000Z\\",\\"author\\":[]}"],["meta",{"property":"og:url","content":"https://iotdb.apache.org/zh/UserGuide/latest-Table/API/Programming-MQTT.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"MQTT 协议"}],["meta",{"property":"og:description","content":"MQTT 协议 1. 概述 MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。 IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OAS..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://iotdb.apache.org/img/mqtt-table-1.png"}],["meta",{"property":"og:locale","content":"zh-CN"}],["meta",{"property":"og:locale:alternate","content":"en-US"}],["meta",{"property":"og:updated_time","content":"2026-01-12T10:25:39.000Z"}],["meta",{"property":"article:modified_time","content":"2026-01-12T10:25:39.000Z"}],["link",{"rel":"alternate","hreflang":"en-us","href":"https://iotdb.apache.org/UserGuide/latest-Table/API/Programming-MQTT.html"}]]},"git":{"createdTime":1768213539000,"updatedTime":1768213539000,"contributors":[{"name":"CritasWang","username":"CritasWang","email":"critas@outlook.com","commits":1,"url":"https://github.com/CritasWang"}]},"readingTime":{"minutes":4.8,"words":1441},"filePathRelative":"zh/UserGuide/latest-Table/API/Programming-MQTT.md","autoDesc":true}');export{B as comp,y as data}; |