| import{_ as t,c as p,d as n,b as r,e as a,a as e,w as o,r as c,o as i}from"./app-C1IcKGP3.js";const d={};function B(y,s){const l=c("RouteLink");return i(),p("div",null,[s[4]||(s[4]=n("h1",{id:"数据订阅api",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#数据订阅api"},[n("span",null,"数据订阅API")])],-1)),n("p",null,[s[1]||(s[1]=a("IoTDB 提供了强大的数据订阅功能,允许用户通过订阅 API 实时获取 IoTDB 新增的数据。详细的功能定义及介绍:",-1)),e(l,{to:"/zh/UserGuide/dev-1.3/User-Manual/Data-subscription.html"},{default:o(()=>[...s[0]||(s[0]=[a("数据订阅",-1)])]),_:1})]),s[5]||(s[5]=n("h2",{id:"_1-核心步骤",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#_1-核心步骤"},[n("span",null,"1 核心步骤")])],-1)),s[6]||(s[6]=n("ol",null,[n("li",null,"创建Topic:创建一个Topic,Topic中包含希望订阅的测点。"),n("li",null,"订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。同一个 consumer group 下的 consumers 会均分数据。"),n("li",null,"消费数据:只有显式订阅了某个 topic,才会收到对应 topic 的数据。"),n("li",null,"取消订阅: consumer close 时会退出对应的 consumer group,同时取消现存的所有订阅。")],-1)),s[7]||(s[7]=n("h2",{id:"_2-详细步骤",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#_2-详细步骤"},[n("span",null,"2 详细步骤")])],-1)),n("p",null,[s[3]||(s[3]=a("本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: ",-1)),e(l,{to:"/zh/UserGuide/dev-1.3/API/Programming-Java-Native-API.html#_3-%E5%85%A8%E9%87%8F%E6%8E%A5%E5%8F%A3%E8%AF%B4%E6%98%8E"},{default:o(()=>[...s[2]||(s[2]=[a("全量接口说明",-1)])]),_:1})]),s[8]||(s[8]=r(`<h3 id="_2-1-创建maven项目" tabindex="-1"><a class="header-anchor" href="#_2-1-创建maven项目"><span>2.1 创建maven项目</span></a></h3><p>创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6)</p><div class="language-xml line-numbers-mode" data-highlighter="shiki" data-ext="xml" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-xml"><span class="line"><span style="color:#ABB2BF;"><</span><span style="color:#E06C75;">dependencies</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">>org.apache.iotdb</</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">>iotdb-session</</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> <!-- 版本号与数据库版本号相同 --></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">>\${project.version}</</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> </</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"></</span><span style="color:#E06C75;">dependencies</span><span style="color:#ABB2BF;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_2-2-代码案例" tabindex="-1"><a class="header-anchor" href="#_2-2-代码案例"><span>2.2 代码案例</span></a></h3><h4 id="_2-2-1-topic操作" tabindex="-1"><a class="header-anchor" href="#_2-2-1-topic操作"><span>2.2.1 Topic操作</span></a></h4><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Optional</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Set</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.IoTDBConnectionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.StatementExecutionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.SubscriptionSession</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.model.Topic</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IoTDBConnectionException</span><span style="color:#ABB2BF;">,</span><span style="color:#E5C07B;"> StatementExecutionException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionSession</span><span style="color:#E06C75;"> session</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionSession</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"127.0.0.1"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">6667</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">67108864</span><span style="color:#ABB2BF;">)) {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 1. open session</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 2. create a topic of all data</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> sessionConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> sessionConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TopicConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PATH_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root.**"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">createTopic</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"allData"</span><span style="color:#ABB2BF;">, sessionConfig);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 3. show all topics</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Set</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Topic</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">topics</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTopics</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(topics);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 4. show a specific topic</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Optional</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Topic</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">allData</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTopic</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"allData"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">allData</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">get</span><span style="color:#ABB2BF;">());</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="_2-2-2-数据消费" tabindex="-1"><a class="header-anchor" href="#_2-2-2-数据消费"><span>2.2.2 数据消费</span></a></h4><h5 id="场景-1-订阅iotdb中新增的实时数据-大屏或组态展示的场景" tabindex="-1"><a class="header-anchor" href="#场景-1-订阅iotdb中新增的实时数据-大屏或组态展示的场景"><span>场景-1: 订阅IoTDB中新增的实时数据(大屏或组态展示的场景)</span></a></h5><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.io.IOException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.ConsumerConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessage</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessageType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.tsfile.read.common.RowRecord</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IOException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> consumerConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"c1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_GROUP_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"cg1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">USERNAME_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PASSWORD_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionPullConsumer</span><span style="color:#E06C75;"> pullConsumer</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionPullConsumer</span><span style="color:#ABB2BF;">(consumerConfig)) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">subscribe</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"topic_all"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#D19A66;">true</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">SubscriptionMessage</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">messages</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">poll</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10000</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionMessage</span><span style="color:#E06C75;"> message</span><span style="color:#C678DD;"> :</span><span style="color:#ABB2BF;"> messages) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> final</span><span style="color:#C678DD;"> short</span><span style="color:#E06C75;"> messageType</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getMessageType</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#C678DD;"> if</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionMessageType</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">isValidatedMessageType</span><span style="color:#ABB2BF;">(messageType)) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionSessionDataSet</span><span style="color:#E06C75;"> dataSet</span><span style="color:#C678DD;"> :</span><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getSessionDataSetsHandler</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">dataSet</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">hasNext</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> final</span><span style="color:#E5C07B;"> RowRecord</span><span style="color:#E06C75;"> record</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> dataSet</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">next</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(record);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h5 id="场景-2-订阅新增的-tsfile-定期数据备份的场景" tabindex="-1"><a class="header-anchor" href="#场景-2-订阅新增的-tsfile-定期数据备份的场景"><span>场景-2:订阅新增的 TsFile(定期数据备份的场景)</span></a></h5><p>前提:需要被消费的topic的格式为TsfileHandler类型,举例:<code>create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')</code></p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.io.IOException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.ConsumerConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessage</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IOException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 1. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> consumerConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"c1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_GROUP_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"cg1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">USERNAME_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PASSWORD_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">FILE_SAVE_DIR_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"/Users/iotdb/Downloads"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionPullConsumer</span><span style="color:#E06C75;"> pullConsumer</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionPullConsumer</span><span style="color:#ABB2BF;">(consumerConfig)) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">subscribe</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"topic_all_tsfile"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#D19A66;">true</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">SubscriptionMessage</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">messages</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">poll</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10000</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionMessage</span><span style="color:#E06C75;"> message</span><span style="color:#C678DD;"> :</span><span style="color:#ABB2BF;"> messages) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTsFileHandler</span><span style="color:#ABB2BF;">().</span><span style="color:#61AFEF;">copyFile</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"/Users/iotdb/Downloads/1.tsfile"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="_2-全量接口说明" tabindex="-1"><a class="header-anchor" href="#_2-全量接口说明"><span>2 全量接口说明</span></a></h2><h3 id="_2-1-参数列表" tabindex="-1"><a class="header-anchor" href="#_2-1-参数列表"><span>2.1 参数列表</span></a></h3><p>可通过Properties参数对象设置消费者相关参数,具体参数如下。</p><h4 id="_2-1-1-subscriptionconsumer" tabindex="-1"><a class="header-anchor" href="#_2-1-1-subscriptionconsumer"><span>2.1.1 SubscriptionConsumer</span></a></h4><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">是否必填(默认值)</th><th style="text-align:left;">参数含义</th></tr></thead><tbody><tr><td style="text-align:left;">host</td><td style="text-align:left;">optional: 127.0.0.1</td><td style="text-align:left;"><code>String</code>: IoTDB 中某 DataNode 的 RPC host</td></tr><tr><td style="text-align:left;">port</td><td style="text-align:left;">optional: 6667</td><td style="text-align:left;"><code>Integer</code>: IoTDB 中某 DataNode 的 RPC port</td></tr><tr><td style="text-align:left;">node-urls</td><td style="text-align:left;">optional: 127.0.0.1:6667</td><td style="text-align:left;"><code>List<String></code>: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的<strong>并集</strong>构成新的 node-urls 应用</td></tr><tr><td style="text-align:left;">username</td><td style="text-align:left;">optional: root</td><td style="text-align:left;"><code>String</code>: IoTDB 中 DataNode 的用户名</td></tr><tr><td style="text-align:left;">password</td><td style="text-align:left;">optional: root</td><td style="text-align:left;"><code>String</code>: IoTDB 中 DataNode 的密码</td></tr><tr><td style="text-align:left;">groupId</td><td style="text-align:left;">optional</td><td style="text-align:left;"><code>String</code>: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同</td></tr><tr><td style="text-align:left;">consumerId</td><td style="text-align:left;">optional</td><td style="text-align:left;"><code>String</code>: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同</td></tr><tr><td style="text-align:left;">heartbeatIntervalMs</td><td style="text-align:left;">optional: 30000 (min: 1000)</td><td style="text-align:left;"><code>Long</code>: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔</td></tr><tr><td style="text-align:left;">endpointsSyncIntervalMs</td><td style="text-align:left;">optional: 120000 (min: 5000)</td><td style="text-align:left;"><code>Long</code>: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔</td></tr><tr><td style="text-align:left;">fileSaveDir</td><td style="text-align:left;">optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString()</td><td style="text-align:left;"><code>String</code>: consumer 订阅出的 TsFile 文件临时存放的目录路径</td></tr><tr><td style="text-align:left;">fileSaveFsync</td><td style="text-align:left;">optional: false</td><td style="text-align:left;"><code>Boolean</code>: consumer 订阅 TsFile 的过程中是否主动调用 fsync</td></tr></tbody></table><p><code>SubscriptionPushConsumer</code> 中的特殊配置:</p><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">是否必填(默认值)</th><th style="text-align:left;">参数含义</th></tr></thead><tbody><tr><td style="text-align:left;">ackStrategy</td><td style="text-align:left;">optional: <code>ACKStrategy.AFTER_CONSUME</code></td><td style="text-align:left;">消费进度的确认机制包含以下选项:<code>ACKStrategy.BEFORE_CONSUME</code>(当 consumer 收到数据时立刻提交消费进度,<code>onReceive</code> 前)<code>ACKStrategy.AFTER_CONSUME</code>(当 consumer 消费完数据再去提交消费进度,<code>onReceive</code> 后)</td></tr><tr><td style="text-align:left;">consumeListener</td><td style="text-align:left;">optional</td><td style="text-align:left;">消费数据的回调函数,需实现 <code>ConsumeListener</code> 接口,定义消费 <code>SessionDataSetsHandler</code> 和 <code>TsFileHandler</code> 形式数据的处理逻辑</td></tr><tr><td style="text-align:left;">autoPollIntervalMs</td><td style="text-align:left;">optional: 5000 (min: 500)</td><td style="text-align:left;">Long: consumer 自动拉取数据的时间间隔,单位为<strong>毫秒</strong></td></tr><tr><td style="text-align:left;">autoPollTimeoutMs</td><td style="text-align:left;">optional: 10000 (min: 1000)</td><td style="text-align:left;">Long: consumer 每次拉取数据的超时时间,单位为<strong>毫秒</strong></td></tr></tbody></table><p><code>SubscriptionPullConsumer</code> 中的特殊配置:</p><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">是否必填(默认值)</th><th style="text-align:left;">参数含义</th></tr></thead><tbody><tr><td style="text-align:left;">autoCommit</td><td style="text-align:left;">optional: true</td><td style="text-align:left;">Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 <code>commit</code> 方法来手动提交消费进度</td></tr><tr><td style="text-align:left;">autoCommitInterval</td><td style="text-align:left;">optional: 5000 (min: 500)</td><td style="text-align:left;">Long: 自动提交消费进度的时间间隔,单位为<strong>毫秒</strong>仅当 autoCommit 参数为 true 的时候才会生效</td></tr></tbody></table><h3 id="函数列表" tabindex="-1"><a class="header-anchor" href="#函数列表"><span>函数列表</span></a></h3><h4 id="数据订阅" tabindex="-1"><a class="header-anchor" href="#数据订阅"><span>数据订阅</span></a></h4><h5 id="subscriptionpullconsumer" tabindex="-1"><a class="header-anchor" href="#subscriptionpullconsumer"><span>SubscriptionPullConsumer</span></a></h5><table><thead><tr><th><strong>函数名</strong></th><th><strong>说明</strong></th><th><strong>参数</strong></th></tr></thead><tbody><tr><td><code>open()</code></td><td>打开消费者连接,启动消息消费。如果 <code>autoCommit</code> 启用,会启动自动提交工作器。</td><td>无</td></tr><tr><td><code>close()</code></td><td>关闭消费者连接。如果 <code>autoCommit</code> 启用,会在关闭前提交所有未提交的消息。</td><td>无</td></tr><tr><td><code>poll(final Duration timeout)</code></td><td>拉取消息,指定超时时间。</td><td><code>timeout</code> : 拉取的超时时间。</td></tr><tr><td><code>poll(final long timeoutMs)</code></td><td>拉取消息,指定超时时间(毫秒)。</td><td><code>timeoutMs</code> : 超时时间,单位为毫秒。</td></tr><tr><td><code>poll(final Set<String> topicNames, final Duration timeout)</code></td><td>拉取指定主题的消息,指定超时时间。</td><td><code>topicNames</code> : 要拉取的主题集合。<code>timeout</code>: 超时时间。</td></tr><tr><td><code>poll(final Set<String> topicNames, final long timeoutMs)</code></td><td>拉取指定主题的消息,指定超时时间(毫秒)。</td><td><code>topicNames</code> : 要拉取的主题集合。<code>timeoutMs</code>: 超时时间,单位为毫秒。</td></tr><tr><td><code>commitSync(final SubscriptionMessage message)</code></td><td>同步提交单条消息。</td><td><code>message</code> : 需要提交的消息对象。</td></tr><tr><td><code>commitSync(final Iterable<SubscriptionMessage> messages)</code></td><td>同步提交多条消息。</td><td><code>messages</code> : 需要提交的消息集合。</td></tr><tr><td><code>commitAsync(final SubscriptionMessage message)</code></td><td>异步提交单条消息。</td><td><code>message</code> : 需要提交的消息对象。</td></tr><tr><td><code>commitAsync(final Iterable<SubscriptionMessage> messages)</code></td><td>异步提交多条消息。</td><td><code>messages</code> : 需要提交的消息集合。</td></tr><tr><td><code>commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)</code></td><td>异步提交单条消息并指定回调函数。</td><td><code>message</code> : 需要提交的消息对象。<code>callback</code> : 异步提交完成后的回调函数。</td></tr><tr><td><code>commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback)</code></td><td>异步提交多条消息并指定回调函数。</td><td><code>messages</code> : 需要提交的消息集合。<code>callback</code> : 异步提交完成后的回调函数。</td></tr></tbody></table><h5 id="subscriptionpushconsumer" tabindex="-1"><a class="header-anchor" href="#subscriptionpushconsumer"><span>SubscriptionPushConsumer</span></a></h5><table><thead><tr><th><strong>函数名</strong></th><th><strong>说明</strong></th><th><strong>参数</strong></th></tr></thead><tbody><tr><td><code>open()</code></td><td>打开消费者连接,启动消息消费,提交自动轮询工作器。</td><td>无</td></tr><tr><td><code>close()</code></td><td>关闭消费者连接,停止消息消费。</td><td>无</td></tr><tr><td><code>toString()</code></td><td>返回消费者对象的核心配置信息。</td><td>无</td></tr><tr><td><code>coreReportMessage()</code></td><td>获取消费者核心配置的键值对表示形式。</td><td>无</td></tr><tr><td><code>allReportMessage()</code></td><td>获取消费者所有配置的键值对表示形式。</td><td>无</td></tr><tr><td><code>buildPushConsumer()</code></td><td>通过 <code>Builder</code> 构建 <code>SubscriptionPushConsumer</code> 实例。</td><td>无</td></tr><tr><td><code>ackStrategy(final AckStrategy ackStrategy)</code></td><td>配置消费者的消息确认策略。</td><td><code>ackStrategy</code>: 指定的消息确认策略。</td></tr><tr><td><code>consumeListener(final ConsumeListener consumeListener)</code></td><td>配置消费者的消息消费逻辑。</td><td><code>consumeListener</code>: 消费者接收消息时的处理逻辑。</td></tr><tr><td><code>autoPollIntervalMs(final long autoPollIntervalMs)</code></td><td>配置自动轮询的时间间隔。</td><td><code>autoPollIntervalMs</code> : 自动轮询的间隔时间,单位为毫秒。</td></tr><tr><td><code>autoPollTimeoutMs(final long autoPollTimeoutMs)</code></td><td>配置自动轮询的超时时间。</td><td><code>autoPollTimeoutMs</code>: 自动轮询的超时时间,单位为毫秒。</td></tr></tbody></table>`,27))])}const m=t(d,[["render",B]]),C=JSON.parse('{"path":"/zh/UserGuide/dev-1.3/API/Programming-Data-Subscription.html","title":"数据订阅API","lang":"zh-CN","frontmatter":{"description":"数据订阅API IoTDB 提供了强大的数据订阅功能,允许用户通过订阅 API 实时获取 IoTDB 新增的数据。详细的功能定义及介绍: 1 核心步骤 创建Topic:创建一个Topic,Topic中包含希望订阅的测点。 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。同一个 consumer g...","head":[["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"数据订阅API\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2026-01-12T10:25:39.000Z\\",\\"author\\":[]}"],["meta",{"property":"og:url","content":"https://iotdb.apache.org/zh/UserGuide/dev-1.3/API/Programming-Data-Subscription.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"数据订阅API"}],["meta",{"property":"og:description","content":"数据订阅API IoTDB 提供了强大的数据订阅功能,允许用户通过订阅 API 实时获取 IoTDB 新增的数据。详细的功能定义及介绍: 1 核心步骤 创建Topic:创建一个Topic,Topic中包含希望订阅的测点。 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。同一个 consumer g..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"zh-CN"}],["meta",{"property":"og:locale:alternate","content":"en-US"}],["meta",{"property":"og:updated_time","content":"2026-01-12T10:25:39.000Z"}],["meta",{"property":"article:modified_time","content":"2026-01-12T10:25:39.000Z"}],["link",{"rel":"alternate","hreflang":"en-us","href":"https://iotdb.apache.org/UserGuide/dev-1.3/API/Programming-Data-Subscription.html"}]]},"git":{"createdTime":1768213539000,"updatedTime":1768213539000,"contributors":[{"name":"CritasWang","username":"CritasWang","email":"critas@outlook.com","commits":1,"url":"https://github.com/CritasWang"}]},"readingTime":{"minutes":6.47,"words":1941},"filePathRelative":"zh/UserGuide/dev-1.3/API/Programming-Data-Subscription.md","autoDesc":true}');export{m as comp,C as data}; |