| import{_ as l,c as p,d as n,b as r,e,a as t,w as o,r as i,o as c}from"./app-C1IcKGP3.js";const d={};function B(y,s){const a=i("RouteLink");return c(),p("div",null,[s[4]||(s[4]=n("h1",{id:"data-subscription-api",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#data-subscription-api"},[n("span",null,"Data subscription API")])],-1)),n("p",null,[s[1]||(s[1]=e("IoTDB provides powerful data subscription functionality, allowing users to access newly added data from IoTDB in real-time through subscription APIs. For detailed functional definitions and introductions:",-1)),t(a,{to:"/UserGuide/V1.3.x/User-Manual/Data-subscription.html"},{default:o(()=>[...s[0]||(s[0]=[e("Data subscription",-1)])]),_:1})]),s[5]||(s[5]=n("h2",{id:"_1-core-steps",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#_1-core-steps"},[n("span",null,"1 Core Steps")])],-1)),s[6]||(s[6]=n("ol",null,[n("li",null,"Create Topic: Create a Topic that includes the measurement points you wish to subscribe to."),n("li",null,"Subscribe to Topic: Before a consumer subscribes to a topic, the topic must have been created, otherwise the subscription will fail. Consumers under the same consumer group will evenly distribute the data."),n("li",null,"Consume Data: Only by explicitly subscribing to a specific topic will you receive data from that topic."),n("li",null,"Unsubscribe: When a consumer is closed, it will exit the corresponding consumer group and cancel all existing subscriptions.")],-1)),s[7]||(s[7]=n("h2",{id:"_2-detailed-steps",tabindex:"-1"},[n("a",{class:"header-anchor",href:"#_2-detailed-steps"},[n("span",null,"2 Detailed Steps")])],-1)),n("p",null,[s[3]||(s[3]=e("This section is used to illustrate the core development process and does not demonstrate all parameters and interfaces. 
For a comprehensive understanding of all features and parameters, please refer to: ",-1)),t(a,{to:"/UserGuide/V1.3.x/API/Programming-Java-Native-API.html#_3-native-interface-description"},{default:o(()=>[...s[2]||(s[2]=[e("Java Native API",-1)])]),_:1})]),s[8]||(s[8]=r(`<h3 id="_2-1-create-a-maven-project" tabindex="-1"><a class="header-anchor" href="#_2-1-create-a-maven-project"><span>2.1 Create a Maven project</span></a></h3><p>Create a Maven project and import the following dependencies(JDK >= 1.8, Maven >= 3.6)</p><div class="language-xml line-numbers-mode" data-highlighter="shiki" data-ext="xml" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-xml"><span class="line"><span style="color:#ABB2BF;"><</span><span style="color:#E06C75;">dependencies</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">>org.apache.iotdb</</span><span style="color:#E06C75;">groupId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">>iotdb-session</</span><span style="color:#E06C75;">artifactId</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> <!-- The version number is the same as the database version number --></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> <</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">>\${project.version}</</span><span style="color:#E06C75;">version</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"> </</span><span style="color:#E06C75;">dependency</span><span style="color:#ABB2BF;">></span></span> |
| <span class="line"><span style="color:#ABB2BF;"></</span><span style="color:#E06C75;">dependencies</span><span style="color:#ABB2BF;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_2-2-code-example" tabindex="-1"><a class="header-anchor" href="#_2-2-code-example"><span>2.2 Code Example</span></a></h3><h4 id="_2-2-1-topic-operations" tabindex="-1"><a class="header-anchor" href="#_2-2-1-topic-operations"><span>2.2.1 Topic operations</span></a></h4><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Optional</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Set</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.IoTDBConnectionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.StatementExecutionException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.SubscriptionSession</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.model.Topic</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IoTDBConnectionException</span><span style="color:#ABB2BF;">,</span><span style="color:#E5C07B;"> StatementExecutionException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionSession</span><span style="color:#E06C75;"> session</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionSession</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"127.0.0.1"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">6667</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">67108864</span><span style="color:#ABB2BF;">)) {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 1. open session</span></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 2. create a topic of all data</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> sessionConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> sessionConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">TopicConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PATH_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root.**"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">createTopic</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"allData"</span><span style="color:#ABB2BF;">, sessionConfig);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 3. show all topics</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Set</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Topic</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">topics</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTopics</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(topics);</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 4. show a specific topic</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Optional</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">Topic</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">allData</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> session</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTopic</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"allData"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">allData</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">get</span><span style="color:#ABB2BF;">());</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h4 id="_2-2-2-data-consume" tabindex="-1"><a class="header-anchor" href="#_2-2-2-data-consume"><span>2.2.2 Data Consume</span></a></h4><h5 id="scenario-1-subscribing-to-newly-added-real-time-data-in-iotdb-for-scenarios-such-as-dashboard-or-configuration-display" tabindex="-1"><a class="header-anchor" href="#scenario-1-subscribing-to-newly-added-real-time-data-in-iotdb-for-scenarios-such-as-dashboard-or-configuration-display"><span>Scenario-1: Subscribing to newly added real-time data in IoTDB (for scenarios such as dashboard or configuration display)</span></a></h5><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span 
style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.io.IOException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.ConsumerConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessage</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessageType</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.tsfile.read.common.RowRecord</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IOException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 5. create a pull consumer; the subscription is automatically cancelled when the try-with-resources block completes</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> consumerConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"c1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_GROUP_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"cg1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">USERNAME_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PASSWORD_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionPullConsumer</span><span style="color:#E06C75;"> pullConsumer</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionPullConsumer</span><span style="color:#ABB2BF;">(consumerConfig)) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">subscribe</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"topic_all"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#D19A66;">true</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">SubscriptionMessage</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">messages</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">poll</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10000</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionMessage</span><span style="color:#E06C75;"> message</span><span style="color:#C678DD;"> :</span><span style="color:#ABB2BF;"> messages) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> final</span><span style="color:#C678DD;"> short</span><span style="color:#E06C75;"> messageType</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getMessageType</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#C678DD;"> if</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionMessageType</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">isValidatedMessageType</span><span style="color:#ABB2BF;">(messageType)) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionSessionDataSet</span><span style="color:#E06C75;"> dataSet</span><span style="color:#C678DD;"> :</span><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getSessionDataSetsHandler</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">dataSet</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">hasNext</span><span style="color:#ABB2BF;">()) {</span></span> |
| <span class="line"><span style="color:#C678DD;"> final</span><span style="color:#E5C07B;"> RowRecord</span><span style="color:#E06C75;"> record</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> dataSet</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">next</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> System</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">out</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">println</span><span style="color:#ABB2BF;">(record);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h5 id="scenario-2-subscribing-to-newly-added-tsfiles-for-scenarios-such-as-regular-data-backup" tabindex="-1"><a class="header-anchor" href="#scenario-2-subscribing-to-newly-added-tsfiles-for-scenarios-such-as-regular-data-backup"><span>Scenario-2: Subscribing to newly added TsFiles (for scenarios such as regular data backup)</span></a></h5><p>Prerequisite: The format of the topic to be consumed must be of the TsFileHandler type. 
For example:<code>create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')</code></p><div class="language-java line-numbers-mode" data-highlighter="shiki" data-ext="java" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code class="language-java"><span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.io.IOException</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.List</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> java.util.Properties</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.ConsumerConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.rpc.subscription.config.TopicConstant</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"><span style="color:#C678DD;">import</span><span style="color:#E5C07B;"> org.apache.iotdb.session.subscription.payload.SubscriptionMessage</span><span style="color:#ABB2BF;">;</span></span> |
| <span class="line"></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;">public</span><span style="color:#C678DD;"> class</span><span style="color:#E5C07B;"> DataConsumerExample</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#C678DD;"> public</span><span style="color:#C678DD;"> static</span><span style="color:#C678DD;"> void</span><span style="color:#61AFEF;"> main</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">String</span><span style="color:#ABB2BF;">[] </span><span style="color:#E06C75;font-style:italic;">args</span><span style="color:#ABB2BF;">)</span><span style="color:#C678DD;"> throws</span><span style="color:#E5C07B;"> IOException</span><span style="color:#ABB2BF;"> {</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"> // 1. create a pull consumer; the subscription is automatically cancelled when the try-with-resources block completes</span></span> |
| <span class="line"><span style="color:#E5C07B;"> Properties</span><span style="color:#E06C75;"> consumerConfig</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> Properties</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"c1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">CONSUMER_GROUP_ID_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"cg1"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">USERNAME_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">PASSWORD_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"root"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#E5C07B;"> consumerConfig</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">put</span><span style="color:#ABB2BF;">(</span><span style="color:#E5C07B;">ConsumerConstant</span><span style="color:#ABB2BF;">.</span><span style="color:#E5C07B;">FILE_SAVE_DIR_KEY</span><span style="color:#ABB2BF;">, </span><span style="color:#98C379;">"/Users/iotdb/Downloads"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> try</span><span style="color:#ABB2BF;"> (</span><span style="color:#E5C07B;">SubscriptionPullConsumer</span><span style="color:#E06C75;"> pullConsumer</span><span style="color:#56B6C2;"> =</span><span style="color:#C678DD;"> new</span><span style="color:#61AFEF;"> SubscriptionPullConsumer</span><span style="color:#ABB2BF;">(consumerConfig)) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">open</span><span style="color:#ABB2BF;">();</span></span> |
| <span class="line"><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">subscribe</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"topic_all_tsfile"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> while</span><span style="color:#ABB2BF;"> (</span><span style="color:#D19A66;">true</span><span style="color:#ABB2BF;">) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> List</span><span style="color:#ABB2BF;"><</span><span style="color:#E5C07B;">SubscriptionMessage</span><span style="color:#ABB2BF;">> </span><span style="color:#E06C75;">messages</span><span style="color:#56B6C2;"> =</span><span style="color:#E5C07B;"> pullConsumer</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">poll</span><span style="color:#ABB2BF;">(</span><span style="color:#D19A66;">10000</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#C678DD;"> for</span><span style="color:#ABB2BF;"> (</span><span style="color:#C678DD;">final</span><span style="color:#E5C07B;"> SubscriptionMessage</span><span style="color:#E06C75;"> message</span><span style="color:#C678DD;"> :</span><span style="color:#ABB2BF;"> messages) {</span></span> |
| <span class="line"><span style="color:#E5C07B;"> message</span><span style="color:#ABB2BF;">.</span><span style="color:#61AFEF;">getTsFileHandler</span><span style="color:#ABB2BF;">().</span><span style="color:#61AFEF;">copyFile</span><span style="color:#ABB2BF;">(</span><span style="color:#98C379;">"/Users/iotdb/Downloads/1.tsfile"</span><span style="color:#ABB2BF;">);</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;"> }</span></span> |
| <span class="line"><span style="color:#ABB2BF;">}</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="_3-java-native-api-description" tabindex="-1"><a class="header-anchor" href="#_3-java-native-api-description"><span>3 Java Native API Description</span></a></h2><h3 id="_3-1-parameter-list" tabindex="-1"><a class="header-anchor" href="#_3-1-parameter-list"><span>3.1 Parameter List</span></a></h3><p>The consumer-related parameters can be set through the Properties parameter object. 
The specific parameters are as follows:</p><h4 id="subscriptionconsumer" tabindex="-1"><a class="header-anchor" href="#subscriptionconsumer"><span>SubscriptionConsumer</span></a></h4><table><thead><tr><th style="text-align:left;"><strong>Parameter</strong></th><th style="text-align:left;"><strong>required or optional with default</strong></th><th style="text-align:left;"><strong>Parameter Meaning</strong></th></tr></thead><tbody><tr><td style="text-align:left;">host</td><td style="text-align:left;">optional: 127.0.0.1</td><td style="text-align:left;"><code>String</code>: The RPC host of a DataNode in IoTDB</td></tr><tr><td style="text-align:left;">port</td><td style="text-align:left;">optional: 6667</td><td style="text-align:left;"><code>Integer</code>: The RPC port of a DataNode in IoTDB</td></tr><tr><td style="text-align:left;">node-urls</td><td style="text-align:left;">optional: 127.0.0.1:6667</td><td style="text-align:left;"><code>List<String></code>: The RPC addresses of all DataNodes in IoTDB, which can be multiple; either host:port or node-urls can be filled. 
If both host:port and node-urls are filled, the <strong>union</strong> of host:port and node-urls will be taken to form a new node-urls for application</td></tr><tr><td style="text-align:left;">username</td><td style="text-align:left;">optional: root</td><td style="text-align:left;"><code>String</code>: The username of the DataNode in IoTDB</td></tr><tr><td style="text-align:left;">password</td><td style="text-align:left;">optional: root</td><td style="text-align:left;"><code>String</code>: The password of the DataNode in IoTDB</td></tr><tr><td style="text-align:left;">groupId</td><td style="text-align:left;">optional</td><td style="text-align:left;"><code>String</code>: consumer group id, if not specified, it will be randomly assigned (a new consumer group), ensuring that the consumer group id of different consumer groups are all different</td></tr><tr><td style="text-align:left;">consumerId</td><td style="text-align:left;">optional</td><td style="text-align:left;"><code>String</code>: consumer client id, if not specified, it will be randomly assigned, ensuring that each consumer client id in the same consumer group is different</td></tr><tr><td style="text-align:left;">heartbeatIntervalMs</td><td style="text-align:left;">optional: 30000 (min: 1000)</td><td style="text-align:left;"><code>Long</code>: The interval at which the consumer sends periodic heartbeat requests to the IoTDB DataNode</td></tr><tr><td style="text-align:left;">endpointsSyncIntervalMs</td><td style="text-align:left;">optional: 120000 (min: 5000)</td><td style="text-align:left;"><code>Long</code>: The interval at which the consumer detects the expansion or contraction of IoTDB cluster nodes and adjusts the subscription connection</td></tr><tr><td style="text-align:left;">fileSaveDir</td><td style="text-align:left;">optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString()</td><td style="text-align:left;"><code>String</code>: The temporary directory path where the consumer 
stores the subscribed TsFile files</td></tr><tr><td style="text-align:left;">fileSaveFsync</td><td style="text-align:left;">optional: false</td><td style="text-align:left;"><code>Boolean</code>: Whether the consumer actively calls fsync during the subscription of TsFiles</td></tr></tbody></table><p>Special configurations in <code>SubscriptionPushConsumer</code> :</p><table><thead><tr><th style="text-align:left;"><strong>Parameter</strong></th><th style="text-align:left;"><strong>required or optional with default</strong></th><th style="text-align:left;"><strong>Parameter Meaning</strong></th></tr></thead><tbody><tr><td style="text-align:left;">ackStrategy</td><td style="text-align:left;">optional: <code>AckStrategy.AFTER_CONSUME</code></td><td style="text-align:left;">The acknowledgment mechanism for consumption progress includes the following options: <code>AckStrategy.BEFORE_CONSUME</code> (the consumer submits the consumption progress immediately upon receiving the data, before <code>onReceive</code>); <code>AckStrategy.AFTER_CONSUME</code> (the consumer submits the consumption progress after consuming the data, after <code>onReceive</code>)</td></tr><tr><td style="text-align:left;">consumeListener</td><td style="text-align:left;">optional</td><td style="text-align:left;">The callback function for consuming data, which needs to implement the <code>ConsumeListener</code> interface, defining the processing logic for consuming <code>SessionDataSetsHandler</code> and <code>TsFileHandler</code> formatted data</td></tr><tr><td style="text-align:left;">autoPollIntervalMs</td><td style="text-align:left;">optional: 5000 (min: 500)</td><td style="text-align:left;">Long: The time interval at which the consumer automatically pulls data, in <strong>ms</strong></td></tr><tr><td style="text-align:left;">autoPollTimeoutMs</td><td style="text-align:left;">optional: 10000 (min: 1000)</td><td style="text-align:left;">Long: The timeout duration for the consumer to pull data each 
time, in <strong>ms</strong></td></tr></tbody></table><p>Special configurations in <code>SubscriptionPullConsumer</code> :</p><table><thead><tr><th style="text-align:left;"><strong>Parameter</strong></th><th style="text-align:left;"><strong>required or optional with default</strong></th><th style="text-align:left;"><strong>Parameter Meaning</strong></th></tr></thead><tbody><tr><td style="text-align:left;">autoCommit</td><td style="text-align:left;">optional: true</td><td style="text-align:left;">Boolean: Whether to automatically commit the consumption progress. If this parameter is set to false, the <code>commit</code> method needs to be called manually to submit the consumption progress</td></tr><tr><td style="text-align:left;">autoCommitInterval</td><td style="text-align:left;">optional: 5000 (min: 500)</td><td style="text-align:left;">Long: The time interval for automatically committing the consumption progress, in <strong>ms</strong>. This parameter only takes effect when the <code>autoCommit</code> parameter is set to true</td></tr></tbody></table><h3 id="_3-2-function-list" tabindex="-1"><a class="header-anchor" href="#_3-2-function-list"><span>3.2 Function List</span></a></h3><h4 id="data-subscription" tabindex="-1"><a class="header-anchor" href="#data-subscription"><span>Data subscription</span></a></h4><h5 id="subscriptionpullconsumer" tabindex="-1"><a class="header-anchor" href="#subscriptionpullconsumer"><span>SubscriptionPullConsumer</span></a></h5><table><thead><tr><th><strong>Function name</strong></th><th><strong>Description</strong></th><th><strong>Parameter</strong></th></tr></thead><tbody><tr><td><code>open()</code></td><td>Opens the consumer connection and starts message consumption. If <code>autoCommit</code> is enabled, it will start the automatic commit worker.</td><td>None</td></tr><tr><td><code>close()</code></td><td>Closes the consumer connection. 
If <code>autoCommit</code> is enabled, it will commit all uncommitted messages before closing.</td><td>None</td></tr><tr><td><code>poll(final Duration timeout)</code></td><td>Pulls messages with a specified timeout.</td><td><code>timeout</code> : The timeout duration.</td></tr><tr><td><code>poll(final long timeoutMs)</code></td><td>Pulls messages with a specified timeout in milliseconds.</td><td><code>timeoutMs</code> : The timeout duration in milliseconds.</td></tr><tr><td><code>poll(final Set<String> topicNames, final Duration timeout)</code></td><td>Pulls messages from specified topics with a specified timeout.</td><td><code>topicNames</code> : The set of topics to pull messages from. <code>timeout</code>: The timeout duration.</td></tr><tr><td><code>poll(final Set<String> topicNames, final long timeoutMs)</code></td><td>Pulls messages from specified topics with a specified timeout in milliseconds.</td><td><code>topicNames</code> : The set of topics to pull messages from. <code>timeoutMs</code>: The timeout duration in milliseconds.</td></tr><tr><td><code>commitSync(final SubscriptionMessage message)</code></td><td>Synchronously commits a single message.</td><td><code>message</code> : The message object to be committed.</td></tr><tr><td><code>commitSync(final Iterable<SubscriptionMessage> messages)</code></td><td>Synchronously commits multiple messages.</td><td><code>messages</code> : The collection of message objects to be committed.</td></tr><tr><td><code>commitAsync(final SubscriptionMessage message)</code></td><td>Asynchronously commits a single message.</td><td><code>message</code> : The message object to be committed.</td></tr><tr><td><code>commitAsync(final Iterable<SubscriptionMessage> messages)</code></td><td>Asynchronously commits multiple messages.</td><td><code>messages</code> : The collection of message objects to be committed.</td></tr><tr><td><code>commitAsync(final SubscriptionMessage message, final AsyncCommitCallback 
callback)</code></td><td>Asynchronously commits a single message with a specified callback.</td><td><code>message</code> : The message object to be committed. <code>callback</code> : The callback function to be executed after asynchronous commit.</td></tr><tr><td><code>commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback)</code></td><td>Asynchronously commits multiple messages with a specified callback.</td><td><code>messages</code> : The collection of message objects to be committed.<code>callback</code> : The callback function to be executed after asynchronous commit.</td></tr></tbody></table><h5 id="subscriptionpushconsumer" tabindex="-1"><a class="header-anchor" href="#subscriptionpushconsumer"><span>SubscriptionPushConsumer</span></a></h5><table><thead><tr><th><strong>Function name</strong></th><th><strong>Description</strong></th><th><strong>Parameter</strong></th></tr></thead><tbody><tr><td><code>open()</code></td><td>Opens the consumer connection, starts message consumption, and submits the automatic polling worker.</td><td>None</td></tr><tr><td><code>close()</code></td><td>Closes the consumer connection and stops message consumption.</td><td>None</td></tr><tr><td><code>toString()</code></td><td>Returns the core configuration information of the consumer object.</td><td>None</td></tr><tr><td><code>coreReportMessage()</code></td><td>Obtains the key-value representation of the consumer's core configuration.</td><td>None</td></tr><tr><td><code>allReportMessage()</code></td><td>Obtains the key-value representation of all the consumer's configurations.</td><td>None</td></tr><tr><td><code>buildPushConsumer()</code></td><td>Builds a <code>SubscriptionPushConsumer</code> instance through the <code>Builder</code></td><td>None</td></tr><tr><td><code>ackStrategy(final AckStrategy ackStrategy)</code></td><td>Configures the message acknowledgment strategy for the consumer.</td><td><code>ackStrategy</code>: The specified message 
acknowledgment strategy.</td></tr><tr><td><code>consumeListener(final ConsumeListener consumeListener)</code></td><td>Configures the message consumption logic for the consumer.</td><td><code>consumeListener</code>: The processing logic when the consumer receives messages.</td></tr><tr><td><code>autoPollIntervalMs(final long autoPollIntervalMs)</code></td><td>Configures the interval for automatic polling.</td><td><code>autoPollIntervalMs</code> : The interval for automatic polling, in milliseconds.</td></tr><tr><td><code>autoPollTimeoutMs(final long autoPollTimeoutMs)</code></td><td>Configures the timeout for automatic polling.</td><td><code>autoPollTimeoutMs</code>: The timeout for automatic polling, in milliseconds.</td></tr></tbody></table>`,27))])}const m=l(d,[["render",B]]),g=JSON.parse('{"path":"/UserGuide/V1.3.x/API/Programming-Data-Subscription.html","title":"Data subscription API","lang":"en-US","frontmatter":{"description":"Data subscription API IoTDB provides powerful data subscription functionality, allowing users to access newly added data from IoTDB in real-time through subscription APIs. For d...","head":[["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"Data subscription API\\",\\"image\\":[\\"\\"],\\"dateModified\\":\\"2026-01-12T10:25:39.000Z\\",\\"author\\":[]}"],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/V1.3.x/API/Programming-Data-Subscription.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"Data subscription API"}],["meta",{"property":"og:description","content":"Data subscription API IoTDB provides powerful data subscription functionality, allowing users to access newly added data from IoTDB in real-time through subscription APIs. 
For d..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2026-01-12T10:25:39.000Z"}],["meta",{"property":"article:modified_time","content":"2026-01-12T10:25:39.000Z"}],["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/V1.3.x/API/Programming-Data-Subscription.html"}]]},"git":{"createdTime":1768213539000,"updatedTime":1768213539000,"contributors":[{"name":"CritasWang","username":"CritasWang","email":"critas@outlook.com","commits":1,"url":"https://github.com/CritasWang"}]},"readingTime":{"minutes":5.25,"words":1575},"filePathRelative":"UserGuide/V1.3.x/API/Programming-Data-Subscription.md","autoDesc":true}');export{m as comp,g as data}; |