| import{_ as a,a as l,b as i}from"./pipe2-DwZV5LbE.js";import{_ as o,a as r,b as p}from"./sync_en_04-D3QXD4YV.js";import{_ as c,c as d,b as t,d as h,e as s,a as y,f as u,r as m,o as f}from"./app-C8175JBb.js";const g={};function B(b,e){const n=m("RouteLink");return f(),d("div",null,[e[3]||(e[3]=t('<h1 id="data-sync" tabindex="-1"><a class="header-anchor" href="#data-sync"><span>Data Sync</span></a></h1><p>Data synchronization is a typical requirement in industrial Internet of Things (IoT). Through data synchronization mechanisms, it is possible to achieve data sharing between IoTDB, and to establish a complete data link to meet the needs for internal and external network data interconnectivity, edge-cloud synchronization, data migration, and data backup.</p><h2 id="_1-function-overview" tabindex="-1"><a class="header-anchor" href="#_1-function-overview"><span>1. Function Overview</span></a></h2><h3 id="_1-1-data-synchronization" tabindex="-1"><a class="header-anchor" href="#_1-1-data-synchronization"><span>1.1 Data Synchronization</span></a></h3><p>A data synchronization task consists of three stages:</p><figure><img src="'+a+'" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><ul><li>Source Stage:This part is used to extract data from the source IoTDB, defined in the source section of the SQL statement.</li><li>Process Stage:This part is used to process the data extracted from the source IoTDB, defined in the processor section of the SQL statement.</li><li>Sink Stage:This part is used to send data to the target IoTDB, defined in the sink section of the SQL statement.</li></ul><p>By declaratively configuring the specific content of the three parts through SQL statements, flexible data synchronization capabilities can be achieved. Currently, data synchronization supports the synchronization of the following information, and you can select the synchronization scope when creating a synchronization task (the default is data.insert, which means synchronizing newly written data):</p><table style="text-align:left;"><tbody><tr><th>Synchronization Scope</th><th>Synchronization Content </th><th>Description</th></tr><tr><td colspan="2">all</td><td>All scopes</td></tr><tr><td rowspan="2">data(Data)</td><td>insert</td><td>Synchronize newly written data</td></tr><tr><td>delete</td><td>Synchronize deleted data</td></tr><tr><td rowspan="3">schema</td><td>database</td><td>Synchronize database creation, modification or deletion operations</td></tr><tr><td>timeseries</td><td>Synchronize the definition and attributes of time series</td></tr><tr><td>TTL</td><td>Synchronize the data retention time</td></tr><tr><td>auth</td><td>-</td><td>Synchronize user permissions and access control</td></tr></tbody></table><h3 id="_1-2-functional-limitations-and-instructions" tabindex="-1"><a class="header-anchor" href="#_1-2-functional-limitations-and-instructions"><span>1.2 Functional limitations and instructions</span></a></h3><p>The schema and auth synchronization functions have the following limitations:</p><ul><li><p>When using schema synchronization, it is required that the consensus protocol for <code>Schema region</code> and <code>ConfigNode</code> must be the default ratis protocol. This means that the <code>iotdb-system.properties</code> configuration file should contain the settings <code>config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus</code> and <code>schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus</code>. If these are not specified, the default ratis protocol is used.</p></li><li><p>To prevent potential conflicts, please disable the automatic creation of schema on the receiving end when enabling schema synchronization. This can be done by setting the <code>enable_auto_create_schema</code> configuration in the <code>iotdb-system.properties</code> file to false.</p></li><li><p>When schema synchronization is enabled, the use of custom plugins is not supported.</p></li><li><p>During data synchronization tasks, please avoid performing any deletion operations to prevent inconsistent states between the two ends.</p></li></ul><h2 id="_2-usage-instructions" tabindex="-1"><a class="header-anchor" href="#_2-usage-instructions"><span>2. Usage Instructions</span></a></h2><p>Data synchronization tasks have three states: RUNNING, STOPPED, and DROPPED. The task state transitions are shown in the following diagram:</p><figure><img src="'+o+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>After creation, the task will start directly, and when the task stops abnormally, the system will automatically attempt to restart the task.</p><p>Provide the following SQL statements for state management of synchronization tasks.</p><h3 id="_2-1-create-task" tabindex="-1"><a class="header-anchor" href="#_2-1-create-task"><span>2.1 Create Task</span></a></h3><p>Use the <code>CREATE PIPE</code> statement to create a data synchronization task. The <code>PipeId</code> and <code>sink</code> attributes are required, while <code>source</code> and <code>processor</code> are optional. When entering the SQL, note that the order of the <code>SOURCE</code> and <code>SINK</code> plugins cannot be swapped.</p><p>The SQL example is as follows:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">CREATE</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#E06C75;">[IF NOT EXISTS]</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span><span style="color:#7F848E;font-style:italic;"> -- PipeId is the name that uniquely identifies the task. </span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">-- Data extraction plugin, optional plugin</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SOURCE (</span></span> |
| <span class="line"><span style="color:#E06C75;"> [<parameter> = <value>,]</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">-- Data processing plugin, optional plugin</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> PROCESSOR (</span></span> |
| <span class="line"><span style="color:#E06C75;"> [<parameter> = <value>,]</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">-- Data connection plugin, required plugin</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#E06C75;"> [<parameter> = <value>,]</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>IF NOT EXISTS semantics</strong>: Used in creation operations to ensure that the create command is executed when the specified Pipe does not exist, preventing errors caused by attempting to create an existing Pipe.</p><h3 id="_2-2-start-task" tabindex="-1"><a class="header-anchor" href="#_2-2-start-task"><span>2.2 Start Task</span></a></h3><p>Start processing data:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">START</span><span style="color:#ABB2BF;"> PIPE</span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="_2-3-stop-task" tabindex="-1"><a class="header-anchor" href="#_2-3-stop-task"><span>2.3 Stop Task</span></a></h3><p>Stop processing data:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">STOP</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="_2-4-delete-task" tabindex="-1"><a class="header-anchor" href="#_2-4-delete-task"><span>2.4 Delete Task</span></a></h3><p>Deletes the specified task:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">DROP</span><span style="color:#ABB2BF;"> PIPE </span><span style="color:#E06C75;">[IF EXISTS]</span><span style="color:#56B6C2;"> <</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p><strong>IF EXISTS semantics</strong>: Used in deletion operations to ensure that when a specified Pipe exists, the delete command is executed to prevent errors caused by attempting to delete non-existent Pipes.</p><p>Deleting a task does not require stopping the synchronization task first.</p><h3 id="_2-5-view-task" tabindex="-1"><a class="header-anchor" href="#_2-5-view-task"><span>2.5 View Task</span></a></h3><p>View all tasks:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#ABB2BF;">SHOW PIPES</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>To view a specified task:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#ABB2BF;">SHOW PIPE </span><span style="color:#56B6C2;"><</span><span style="color:#ABB2BF;">PipeId</span><span style="color:#56B6C2;">></span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>Example of the show pipes result for a pipe:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| ID| CreationTime| </span><span style="color:#C678DD;">State</span><span style="color:#ABB2BF;">|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">|59abf95db892428b9d01c5fa318014ea|</span><span style="color:#D19A66;">2024</span><span style="color:#ABB2BF;">-</span><span style="color:#D19A66;">06</span><span style="color:#ABB2BF;">-17T14:</span><span style="color:#D19A66;">03</span><span style="color:#ABB2BF;">:</span><span style="color:#D19A66;">44</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">189</span><span style="color:#ABB2BF;">|RUNNING| {}| {}|{sink</span><span style="color:#56B6C2;">=</span><span style="color:#ABB2BF;">iotdb-thrift-sink, </span><span style="color:#D19A66;">sink</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">ip</span><span style="color:#56B6C2;">=</span><span style="color:#D19A66;">127</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">0</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">0</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">1</span><span style="color:#ABB2BF;">, </span><span style="color:#D19A66;">sink</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">port</span><span style="color:#56B6C2;">=</span><span style="color:#D19A66;">6668</span><span style="color:#ABB2BF;">}| | </span><span style="color:#D19A66;">128</span><span style="color:#ABB2BF;">| </span><span style="color:#D19A66;">1</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">03</span><span style="color:#ABB2BF;">|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>The meanings of each column are as follows:</p><ul><li><strong>ID</strong>:The unique identifier for the synchronization task</li><li><strong>CreationTime</strong>:The time when the synchronization task was created</li><li><strong>State</strong>:The state of the synchronization task</li><li><strong>PipeSource</strong>:The source of the synchronized data stream</li><li><strong>PipeProcessor</strong>:The processing logic of the synchronized data stream during transmission</li><li><strong>PipeSink</strong>:The destination of the synchronized data stream</li><li><strong>ExceptionMessage</strong>:Displays the exception information of the synchronization task</li><li><strong>RemainingEventCount (Statistics with Delay)</strong>: The number of remaining events, which is the total count of all events in the current data synchronization task, including data and schema synchronization events, as well as system and user-defined events.</li><li><strong>EstimatedRemainingSeconds (Statistics with Delay)</strong>: The estimated remaining time, based on the current number of events and the rate at the pipe, to complete the transfer.</li></ul><h3 id="_2-6-synchronization-plugins" tabindex="-1"><a class="header-anchor" href="#_2-6-synchronization-plugins"><span>2.6 Synchronization Plugins</span></a></h3><p>To make the overall architecture more flexible to match different synchronization scenario requirements, we support plugin assembly within the synchronization task framework. The system comes with some pre-installed common plugins that you can use directly. At the same time, you can also customize processor plugins and Sink plugins, and load them into the IoTDB system for use. You can view the plugins in the system (including custom and built-in plugins) with the following statement:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#ABB2BF;">SHOW PIPEPLUGINS</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>The return result is as follows (version 1.3.2):</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#ABB2BF;">IoTDB</span><span style="color:#56B6C2;">></span><span style="color:#ABB2BF;"> SHOW PIPEPLUGINS</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| PluginName|PluginType| ClassName| PluginJar|</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| DO-NOTHING-PROCESSOR| Builtin| </span><span style="color:#D19A66;">org</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">apache</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">commons</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">pipe</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">builtin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">processor</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">donothing</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">DoNothingProcessor</span><span style="color:#ABB2BF;">| |</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| DO-NOTHING-SINK| Builtin| </span><span style="color:#D19A66;">org</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">apache</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">commons</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">pipe</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">builtin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">connector</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">donothing</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">DoNothingConnector</span><span style="color:#ABB2BF;">| |</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| IOTDB-SOURCE| Builtin| </span><span style="color:#D19A66;">org</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">apache</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">commons</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">pipe</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">builtin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">extractor</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">IoTDBExtractor</span><span style="color:#ABB2BF;">| |</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| IOTDB-THRIFT-SINK| Builtin| </span><span style="color:#D19A66;">org</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">apache</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">commons</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">pipe</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">builtin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">connector</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">thrift</span><span style="color:#ABB2BF;">.IoTDBThriftConnector| |</span></span> |
| <span class="line"><span style="color:#ABB2BF;">| IOTDB-THRIFT-</span><span style="color:#C678DD;">SSL</span><span style="color:#ABB2BF;">-SINK| Builtin| </span><span style="color:#D19A66;">org</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">apache</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">commons</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">pipe</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">plugin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">builtin</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">connector</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">iotdb</span><span style="color:#ABB2BF;">.</span><span style="color:#D19A66;">thrift</span><span style="color:#ABB2BF;">.IoTDBThriftSslConnector| |</span></span> |
| <span class="line"><span style="color:#ABB2BF;">+</span><span style="color:#7F848E;font-style:italic;">------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>Detailed introduction of pre-installed plugins is as follows (for detailed parameters of each plugin, please refer to the <a href="#reference-parameter-description">Parameter Description</a> section):</p><table style="text-align:left;"><tbody><tr><th>Type</th><th>Custom Plugin</th><th>Plugin Name</th><th>Description</th><th>Applicable Version</th></tr><tr><td>source plugin</td><td>Not Supported</td><td>iotdb-source</td><td>The default extractor plugin, used to extract historical or real-time data from IoTDB</td><td>1.2.x</td></tr><tr><td>processor plugin</td><td>Supported</td><td>do-nothing-processor</td><td>The default processor plugin, which does not process the incoming data</td><td>1.2.x</td></tr><tr><td rowspan="3">sink plugin</td><td rowspan="3">Supported</td><td>do-nothing-sink</td><td>Does not process the data that is sent out</td><td>1.2.x</td></tr><tr><td>iotdb-thrift-sink</td><td>The default sink plugin ( V1.3.1+ ), used for data transfer between IoTDB ( V1.2.0+ ) and IoTDB( V1.2.0+ ) . It uses the Thrift RPC framework to transfer data, with a multi-threaded async non-blocking IO model, high transfer performance, especially suitable for scenarios where the target end is distributed</td><td>1.2.x</td></tr><tr><td>iotdb-thrift-ssl-sink</td><td>Used for data transfer between IoTDB ( V1.3.1+ ) and IoTDB ( V1.2.0+ ). It uses the Thrift RPC framework to transfer data, with a single-threaded sync blocking IO model, suitable for scenarios with higher security requirements</td><td>1.3.1+</td></tr></tbody></table>`,49)),h("p",null,[e[1]||(e[1]=s("For importing custom plugins, please refer to the ")),y(n,{to:"/UserGuide/latest/User-Manual/Streaming_apache.html#custom-stream-processing-plugin-management"},{default:u(()=>e[0]||(e[0]=[s("Stream Processing")])),_:1}),e[2]||(e[2]=s(" section."))]),e[4]||(e[4]=t('<h2 id="_3-use-examples" tabindex="-1"><a class="header-anchor" href="#_3-use-examples"><span>3. Use examples</span></a></h2><h3 id="_3-1-full-data-synchronisation" tabindex="-1"><a class="header-anchor" href="#_3-1-full-data-synchronisation"><span>3.1 Full data synchronisation</span></a></h3><p>This example is used to demonstrate the synchronisation of all data from one IoTDB to another IoTDB with the data link as shown below:</p><figure><img src="'+l+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>In this example, we can create a synchronization task named A2B to synchronize the full data from A IoTDB to B IoTDB. The iotdb-thrift-sink plugin (built-in plugin) for the sink is required. The URL of the data service port of the DataNode node on the target IoTDB needs to be configured through node-urls, as shown in the following example statement:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe A2B</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_3-2-partial-data-synchronization" tabindex="-1"><a class="header-anchor" href="#_3-2-partial-data-synchronization"><span>3.2 Partial data synchronization</span></a></h3><p>This example is used to demonstrate the synchronisation of data from a certain historical time range (8:00pm 23 August 2023 to 8:00pm 23 October 2023) to another IoTDB, the data link is shown below:</p><figure><img src="`+i+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>In this example, we can create a synchronization task named A2B. First, we need to define the range of data to be transferred in the source. Since the data being transferred is historical data (historical data refers to data that existed before the creation of the synchronization task), we need to configure the start-time and end-time of the data and the transfer mode mode. The URL of the data service port of the DataNode node on the target IoTDB needs to be configured through node-urls.</p><p>The detailed statements are as follows:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe A2B</span></span> |
| <span class="line"><span style="color:#C678DD;">WITH</span><span style="color:#ABB2BF;"> SOURCE (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'source'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;"> 'iotdb-source'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'realtime.mode'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'stream'</span><span style="color:#7F848E;font-style:italic;"> -- The extraction mode for newly inserted data (after pipe creation)</span></span> |
| <span class="line"><span style="color:#98C379;"> 'start-time'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '2023.08.23T08:00:00+00:00'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The start event time for synchronizing all data, including start-time</span></span> |
| <span class="line"><span style="color:#98C379;"> 'end-time'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '2023.10.23T08:00:00+00:00'</span><span style="color:#7F848E;font-style:italic;"> -- The end event time for synchronizing all data, including end-time</span></span> |
| <span class="line"><span style="color:#ABB2BF;">) </span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> SINK (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-async-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_3-3-edge-cloud-data-transfer" tabindex="-1"><a class="header-anchor" href="#_3-3-edge-cloud-data-transfer"><span>3.3 Edge-cloud data transfer</span></a></h3><p>This example is used to demonstrate the scenario where data from multiple IoTDB is transferred to the cloud, with data from clusters B, C, and D all synchronized to cluster A, as shown in the figure below:</p><figure><img src="`+r+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>In this example, to synchronize the data from clusters B, C, and D to A, the pipe between BA, CA, and DA needs to configure the <code>path</code> to limit the range, and to keep the edge and cloud data consistent, the pipe needs to be configured with <code>inclusion=all</code> to synchronize full data and metadata. The detailed statement is as follows:</p><p>On B IoTDB, execute the following statement to synchronize data from B to A:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe BA</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> source (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'inclusion'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'all'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Indicates synchronization of full data, schema , and auth</span></span> |
| <span class="line"><span style="color:#98C379;"> 'path'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'root.db.**'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Limit the range</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>On C IoTDB, execute the following statement to synchronize data from C to A:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe CA</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> source (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'inclusion'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'all'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Indicates synchronization of full data, schema , and auth</span></span> |
| <span class="line"><span style="color:#98C379;"> 'path'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'root.db.**'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Limit the range</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>On D IoTDB, execute the following statement to synchronize data from D to A:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe DA</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> source (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'inclusion'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'all'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Indicates synchronization of full data, schema , and auth</span></span> |
| <span class="line"><span style="color:#98C379;"> 'path'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'root.db.**'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- Limit the range</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_3-4-cascading-data-transfer" tabindex="-1"><a class="header-anchor" href="#_3-4-cascading-data-transfer"><span>3.4 Cascading data transfer</span></a></h3><p>This example is used to demonstrate the scenario where data is transferred in a cascading manner between multiple IoTDB, with data from cluster A synchronized to cluster B, and then to cluster C, as shown in the figure below:</p><figure><img src="`+p+`" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>In this example, to synchronize the data from cluster A to C, the <code>forwarding-pipe-requests</code> needs to be set to <code>true</code> between BC. The detailed statement is as follows:</p><p>On A IoTDB, execute the following statement to synchronize data from A to B:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe AB</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>On B IoTDB, execute the following statement to synchronize data from B to C:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe BC</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> source (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'forwarding-pipe-requests'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'true'</span><span style="color:#7F848E;font-style:italic;"> -- Whether to forward data written by other Pipes</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6669'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_3-5-compression-synchronization-v1-3-3" tabindex="-1"><a class="header-anchor" href="#_3-5-compression-synchronization-v1-3-3"><span>3.5 Compression Synchronization (V1.3.3+)</span></a></h3><p>IoTDB supports specifying data compression methods during synchronization. Real time compression and transmission of data can be achieved by configuring the <code>compressor</code> parameter. <code>Compressor</code> currently supports 5 optional algorithms: snappy/gzip/lz4/zstd/lzma2, and can choose multiple compression algorithm combinations to compress in the order of configuration <code>rate-limit-bytes-per-second</code>(supported in V1.3.3 and later versions) is the maximum number of bytes allowed to be transmitted per second, calculated as compressed bytes. If it is less than 0, there is no limit.</p><p>For example, to create a synchronization task named A2B:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe A2B </span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> '127.0.0.1:6668'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#98C379;"> 'compressor'</span><span style="color:#56B6C2;"> =</span><span style="color:#98C379;"> 'snappy,lz4'</span><span style="color:#7F848E;font-style:italic;"> -- Compression algorithms</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="_3-6-encrypted-synchronization-v1-3-1" tabindex="-1"><a class="header-anchor" href="#_3-6-encrypted-synchronization-v1-3-1"><span>3.6 Encrypted Synchronization (V1.3.1+)</span></a></h3><p>IoTDB supports the use of SSL encryption during the synchronization process, ensuring the secure transfer of data between different IoTDB instances. By configuring SSL-related parameters, such as the certificate address and password (<code>ssl.trust-store-path</code>)、(<code>ssl.trust-store-pwd</code>), data can be protected by SSL encryption during the synchronization process.</p><p>For example, to create a synchronization task named A2B:</p><div class="language-sql line-numbers-mode" data-highlighter="shiki" data-ext="sql" data-title="sql" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#C678DD;">create</span><span style="color:#ABB2BF;"> pipe A2B</span></span> |
| <span class="line"><span style="color:#C678DD;">with</span><span style="color:#ABB2BF;"> sink (</span></span> |
| <span class="line"><span style="color:#98C379;"> 'sink'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'iotdb-thrift-ssl-sink'</span><span style="color:#ABB2BF;">,</span></span> |
| <span class="line"><span style="color:#98C379;"> 'node-urls'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'127.0.0.1:6667'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The URL of the data service port of the DataNode node on the target IoTDB</span></span> |
| <span class="line"><span style="color:#98C379;"> 'ssl.trust-store-path'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'pki/trusted'</span><span style="color:#ABB2BF;">, </span><span style="color:#7F848E;font-style:italic;">-- The trust store certificate path required to connect to the target DataNode</span></span> |
| <span class="line"><span style="color:#98C379;"> 'ssl.trust-store-pwd'</span><span style="color:#56B6C2;">=</span><span style="color:#98C379;">'root'</span><span style="color:#7F848E;font-style:italic;"> -- The trust store certificate password required to connect to the target DataNode</span></span> |
| <span class="line"><span style="color:#ABB2BF;">)</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="_4-reference-notes" tabindex="-1"><a class="header-anchor" href="#_4-reference-notes"><span>4. Reference: Notes</span></a></h2><p>You can adjust the parameters for data synchronization by modifying the IoTDB configuration file (<code>iotdb-system.properties</code>), such as the directory for storing synchronized data. The complete configuration is as follows:</p><p>V1.3.3+:</p><div class="language-properties line-numbers-mode" data-highlighter="shiki" data-ext="properties" data-title="properties" style="background-color:#282c34;color:#abb2bf;"><pre class="shiki one-dark-pro vp-code"><code><span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_receiver_file_dir</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/\${cn_system_dir}/pipe/receiver).</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If it is absolute, system will save the data in the exact location it points to.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: restart</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For windows platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is a drive specifier followed by "\\\\", or if its prefix is "\\\\\\\\", then the path is absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_receiver_file_dir=data\\\\confignode\\\\system\\\\pipe\\\\receiver</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Linux platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is "/", then the path is absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_receiver_file_dir</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">data/confignode/system/pipe/receiver</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">### Pipe Configuration</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;">####################</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Uncomment the following field to configure the pipe lib directory.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: first_start</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Windows platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is a drive specifier followed by "\\\\", or if its prefix is "\\\\\\\\", then the path is</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># pipe_lib_dir=ext\\\\pipe</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># For Linux platform</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># If its prefix is "/", then the path is absolute. Otherwise, it is relative.</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_lib_dir</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">ext/pipe</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: restart</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Datatype: int</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_subtask_executor_max_thread_num</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">5</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The connection timeout (in milliseconds) for the thrift client.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: restart</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Datatype: int</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_sink_timeout_ms</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">900000</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of selectors that can be used in the sink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Recommend to set this value to less than or equal to pipe_sink_max_client_number.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: restart</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Datatype: int</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_sink_selector_number</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">4</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The maximum number of clients that can be used in the sink.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: restart</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Datatype: int</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_sink_max_client_number</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">16</span></span> |
| <span class="line"></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># The total bytes that all pipe sinks can transfer per second.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># When given a value less than or equal to 0, it means no limit.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># default value is -1, which means no limit.</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># effectiveMode: hot_reload</span></span> |
| <span class="line"><span style="color:#7F848E;font-style:italic;"># Datatype: double</span></span> |
| <span class="line"><span style="color:#C678DD;">pipe_all_sinks_rate_limit_bytes_per_second</span><span style="color:#ABB2BF;">=</span><span style="color:#98C379;">-1</span></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="_5-reference-parameter-description" tabindex="-1"><a class="header-anchor" href="#_5-reference-parameter-description"><span>5. Reference: parameter description</span></a></h2><h3 id="_5-1-source-parameter-v1-3-3" tabindex="-1"><a class="header-anchor" href="#_5-1-source-parameter-v1-3-3"><span>5.1 source parameter(V1.3.3)</span></a></h3><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value range</th><th style="text-align:left;">required or not</th><th style="text-align:left;">default value</th></tr></thead><tbody><tr><td style="text-align:left;">source</td><td style="text-align:left;">iotdb-source</td><td style="text-align:left;">String: iotdb-source</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">inclusion</td><td style="text-align:left;">Used to specify the range of data to be synchronized in the data synchronization task, including data, schema, and auth</td><td style="text-align:left;">String:all, data(insert,delete), schema(database,timeseries,ttl), auth</td><td style="text-align:left;">Optional</td><td style="text-align:left;">data.insert</td></tr><tr><td style="text-align:left;">inclusion.exclusion</td><td style="text-align:left;">Used to exclude specific operations from the range specified by inclusion, reducing the amount of data synchronized</td><td style="text-align:left;">String:all, data(insert,delete), schema(database,timeseries,ttl), auth</td><td style="text-align:left;">Optional</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">path</td><td style="text-align:left;">Used to filter the path pattern schema of time series and data to be synchronized / schema synchronization can only use pathpath is exact matching, parameters must be prefix paths or complete paths, i.e., cannot contain <code>"*"</code>, at most one <code>"**"</code> at the end of the path parameter</td><td style="text-align:left;">String:IoTDB pattern</td><td style="text-align:left;">Optional</td><td style="text-align:left;">root.**</td></tr><tr><td style="text-align:left;">pattern</td><td style="text-align:left;">Used to filter the path prefix of time series</td><td style="text-align:left;">String: Optional</td><td style="text-align:left;">Optional</td><td style="text-align:left;">root</td></tr><tr><td style="text-align:left;">start-time</td><td style="text-align:left;">The start event time for synchronizing all data, including start-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">Optional</td><td style="text-align:left;">Long.MIN_VALUE</td></tr><tr><td style="text-align:left;">end-time</td><td style="text-align:left;">The end event time for synchronizing all data, including end-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">Optional</td><td style="text-align:left;">Long.MAX_VALUE</td></tr><tr><td style="text-align:left;">realtime.mode</td><td style="text-align:left;">The extraction mode for newly inserted data (after pipe creation)</td><td style="text-align:left;">String: batch</td><td style="text-align:left;">Optional</td><td style="text-align:left;">batch</td></tr><tr><td style="text-align:left;">forwarding-pipe-requests</td><td style="text-align:left;">Whether to forward data written by other Pipes (usually data synchronization)</td><td style="text-align:left;">Boolean: true</td><td style="text-align:left;">Optional</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">history.loose-range</td><td style="text-align:left;">When transferring TsFile, whether to relax the range of historical data (before the creation of the pipe). "": Do not relax the range, select data strictly according to the set conditions. "time": Relax the time range to avoid splitting TsFile, which can improve synchronization efficiency. "path": Relax the path range to avoid splitting TsFile, which can improve synchronization efficiency. "time, path", "path, time", "all": Relax all ranges to avoid splitting TsFile, which can improve synchronization efficiency.</td><td style="text-align:left;">String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all"</td><td style="text-align:left;">Optional</td><td style="text-align:left;">""</td></tr><tr><td style="text-align:left;">realtime.loose-range</td><td style="text-align:left;">When transferring TsFile, whether to relax the range of real-time data (before the creation of the pipe). "": Do not relax the range, select data strictly according to the set conditions. "time": Relax the time range to avoid splitting TsFile, which can improve synchronization efficiency. "path": Relax the path range to avoid splitting TsFile, which can improve synchronization efficiency. "time, path", "path, time", "all": Relax all ranges to avoid splitting TsFile, which can improve synchronization efficiency.</td><td style="text-align:left;">String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all"</td><td style="text-align:left;">Optional</td><td style="text-align:left;">""</td></tr><tr><td style="text-align:left;">mods.enable</td><td style="text-align:left;">Whether to send the mods file of tsfile</td><td style="text-align:left;">Boolean: true / false</td><td style="text-align:left;">Optional</td><td style="text-align:left;">false</td></tr></tbody></table><blockquote><p>💎 <strong>Explanation</strong>:To maintain compatibility with lower versions, history.enable, history.start-time, history.end-time, realtime.enable can still be used, but they are not recommended in the new version.</p><p>💎 <strong>Explanation: Differences between Stream and Batch Data Extraction Modes</strong></p><ul><li><strong>stream (recommended)</strong>: In this mode, tasks process and send data in real-time. It is characterized by high timeliness and low throughput.</li><li><strong>batch</strong>: In this mode, tasks process and send data in batches (according to the underlying data files). It is characterized by low timeliness and high throughput.</li></ul></blockquote><h3 id="_5-2-sink-parameter" tabindex="-1"><a class="header-anchor" href="#_5-2-sink-parameter"><span>5.2 sink parameter</span></a></h3><blockquote><p>In versions 1.3.3 and above, when only the sink is included, the additional "with sink" prefix is no longer required.</p></blockquote><h4 id="iotdb-thrift-sink" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-sink"><span>iotdb-thrift-sink</span></a></h4><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value Range</th><th style="text-align:left;">required or not</th><th style="text-align:left;">Default Value</th></tr></thead><tbody><tr><td style="text-align:left;">sink</td><td style="text-align:left;">iotdb-thrift-sink or iotdb-thrift-async-sink</td><td style="text-align:left;">String: iotdb-thrift-sink or iotdb-thrift-async-sink</td><td style="text-align:left;">Required</td><td style="text-align:left;"></td></tr><tr><td style="text-align:left;">node-urls</td><td style="text-align:left;">The URL of the data service port of any DataNode nodes on the target IoTDB (please note that synchronization tasks do not support forwarding to its own service)</td><td style="text-align:left;">String. Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">batch.enable</td><td style="text-align:left;">Whether to enable batched log transmission mode to improve transmission throughput and reduce IOPS</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">Optional</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">batch.max-delay-seconds</td><td style="text-align:left;">Effective when batched log transmission mode is enabled, it represents the maximum waiting time for a batch of data before sending (unit: s)</td><td style="text-align:left;">Integer</td><td style="text-align:left;">Optional</td><td style="text-align:left;">1</td></tr><tr><td style="text-align:left;">batch.size-bytes</td><td style="text-align:left;">Effective when batched log transmission mode is enabled, it represents the maximum batch size for a batch of data (unit: byte)</td><td style="text-align:left;">Long</td><td style="text-align:left;">Optional</td><td style="text-align:left;">16<em>1024</em>1024</td></tr></tbody></table><h4 id="iotdb-thrift-ssl-sink" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-ssl-sink"><span>iotdb-thrift-ssl-sink</span></a></h4><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value Range</th><th style="text-align:left;">required or not</th><th style="text-align:left;">Default Value</th></tr></thead><tbody><tr><td style="text-align:left;">sink</td><td style="text-align:left;">iotdb-thrift-ssl-sink</td><td style="text-align:left;">String: iotdb-thrift-ssl-sink</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">node-urls</td><td style="text-align:left;">The URL of the data service port of any DataNode nodes on the target IoTDB (please note that synchronization tasks do not support forwarding to its own service)</td><td style="text-align:left;">String. Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">batch.enable</td><td style="text-align:left;">Whether to enable batched log transmission mode to improve transmission throughput and reduce IOPS</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">Optional</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">batch.max-delay-seconds</td><td style="text-align:left;">Effective when batched log transmission mode is enabled, it represents the maximum waiting time for a batch of data before sending (unit: s)</td><td style="text-align:left;">Integer</td><td style="text-align:left;">Optional</td><td style="text-align:left;">1</td></tr><tr><td style="text-align:left;">batch.size-bytes</td><td style="text-align:left;">Effective when batched log transmission mode is enabled, it represents the maximum batch size for a batch of data (unit: byte)</td><td style="text-align:left;">Long</td><td style="text-align:left;">Optional</td><td style="text-align:left;">16<em>1024</em>1024</td></tr><tr><td style="text-align:left;">ssl.trust-store-path</td><td style="text-align:left;">The trust store certificate path required to connect to the target DataNode</td><td style="text-align:left;">String: certificate directory name, when configured as a relative directory, it is relative to the IoTDB root directory. Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">ssl.trust-store-pwd</td><td style="text-align:left;">The trust store certificate password required to connect to the target DataNode</td><td style="text-align:left;">Integer</td><td style="text-align:left;">Required</td><td style="text-align:left;">-</td></tr></tbody></table>`,52))])}const D=c(g,[["render",B],["__file","Data-Sync_apache.html.vue"]]),F=JSON.parse('{"path":"/UserGuide/latest/User-Manual/Data-Sync_apache.html","title":"Data Sync","lang":"en-US","frontmatter":{"description":"Data Sync Data synchronization is a typical requirement in industrial Internet of Things (IoT). Through data synchronization mechanisms, it is possible to achieve data sharing b...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/latest/User-Manual/Data-Sync_apache.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/latest/User-Manual/Data-Sync_apache.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"Data Sync"}],["meta",{"property":"og:description","content":"Data Sync Data synchronization is a typical requirement in industrial Internet of Things (IoT). Through data synchronization mechanisms, it is possible to achieve data sharing b..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://iotdb.apache.org/img/sync_en_01.png"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2025-03-04T02:53:50.000Z"}],["meta",{"property":"article:modified_time","content":"2025-03-04T02:53:50.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"Data Sync\\",\\"image\\":[\\"https://iotdb.apache.org/img/sync_en_01.png\\",\\"https://iotdb.apache.org/img/Data-Sync02.png\\",\\"https://iotdb.apache.org/img/pipe1.jpg\\",\\"https://iotdb.apache.org/img/pipe2.jpg\\",\\"https://iotdb.apache.org/img/sync_en_03.png\\",\\"https://iotdb.apache.org/img/sync_en_04.png\\"],\\"dateModified\\":\\"2025-03-04T02:53:50.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"1. Function Overview","slug":"_1-function-overview","link":"#_1-function-overview","children":[{"level":3,"title":"1.1 Data Synchronization","slug":"_1-1-data-synchronization","link":"#_1-1-data-synchronization","children":[]},{"level":3,"title":"1.2 Functional limitations and instructions","slug":"_1-2-functional-limitations-and-instructions","link":"#_1-2-functional-limitations-and-instructions","children":[]}]},{"level":2,"title":"2. Usage Instructions","slug":"_2-usage-instructions","link":"#_2-usage-instructions","children":[{"level":3,"title":"2.1 Create Task","slug":"_2-1-create-task","link":"#_2-1-create-task","children":[]},{"level":3,"title":"2.2 Start Task","slug":"_2-2-start-task","link":"#_2-2-start-task","children":[]},{"level":3,"title":"2.3 Stop Task","slug":"_2-3-stop-task","link":"#_2-3-stop-task","children":[]},{"level":3,"title":"2.4 Delete Task","slug":"_2-4-delete-task","link":"#_2-4-delete-task","children":[]},{"level":3,"title":"2.5 View Task","slug":"_2-5-view-task","link":"#_2-5-view-task","children":[]},{"level":3,"title":"2.6 Synchronization Plugins","slug":"_2-6-synchronization-plugins","link":"#_2-6-synchronization-plugins","children":[]}]},{"level":2,"title":"3. Use examples","slug":"_3-use-examples","link":"#_3-use-examples","children":[{"level":3,"title":"3.1 Full data synchronisation","slug":"_3-1-full-data-synchronisation","link":"#_3-1-full-data-synchronisation","children":[]},{"level":3,"title":"3.2 Partial data synchronization","slug":"_3-2-partial-data-synchronization","link":"#_3-2-partial-data-synchronization","children":[]},{"level":3,"title":"3.3 Edge-cloud data transfer","slug":"_3-3-edge-cloud-data-transfer","link":"#_3-3-edge-cloud-data-transfer","children":[]},{"level":3,"title":"3.4 Cascading data transfer","slug":"_3-4-cascading-data-transfer","link":"#_3-4-cascading-data-transfer","children":[]},{"level":3,"title":"3.5 Compression Synchronization (V1.3.3+)","slug":"_3-5-compression-synchronization-v1-3-3","link":"#_3-5-compression-synchronization-v1-3-3","children":[]},{"level":3,"title":"3.6 Encrypted Synchronization (V1.3.1+)","slug":"_3-6-encrypted-synchronization-v1-3-1","link":"#_3-6-encrypted-synchronization-v1-3-1","children":[]}]},{"level":2,"title":"4. Reference: Notes","slug":"_4-reference-notes","link":"#_4-reference-notes","children":[]},{"level":2,"title":"5. Reference: parameter description","slug":"_5-reference-parameter-description","link":"#_5-reference-parameter-description","children":[{"level":3,"title":"5.1 source parameter(V1.3.3)","slug":"_5-1-source-parameter-v1-3-3","link":"#_5-1-source-parameter-v1-3-3","children":[]},{"level":3,"title":"5.2 sink parameter","slug":"_5-2-sink-parameter","link":"#_5-2-sink-parameter","children":[]}]}],"git":{"createdTime":1696932526000,"updatedTime":1741056830000,"contributors":[{"name":"wanghui42","username":"wanghui42","email":"105700158+wanghui42@users.noreply.github.com","commits":2,"url":"https://github.com/wanghui42"},{"name":"Haonan","username":"Haonan","email":"hhaonan@outlook.com","commits":1,"url":"https://github.com/Haonan"},{"name":"CritasWang","username":"CritasWang","email":"critas@outlook.com","commits":3,"url":"https://github.com/CritasWang"},{"name":"Caideyipi","username":"Caideyipi","email":"87789683+Caideyipi@users.noreply.github.com","commits":1,"url":"https://github.com/Caideyipi"},{"name":"shuwenwei","username":"shuwenwei","email":"55970239+shuwenwei@users.noreply.github.com","commits":1,"url":"https://github.com/shuwenwei"},{"name":"majialin","username":"majialin","email":"107627937+mal117@users.noreply.github.com","commits":4,"url":"https://github.com/majialin"},{"name":"石林松","username":"石林松","email":"50943998+shi10lin0s@users.noreply.github.com","commits":1,"url":"https://github.com/石林松"},{"name":"W1y1r","username":"W1y1r","email":"150988475+W1y1r@users.noreply.github.com","commits":7,"url":"https://github.com/W1y1r"},{"name":"Mister-Hope","username":"Mister-Hope","email":"mister-hope@outlook.com","commits":1,"url":"https://github.com/Mister-Hope"},{"name":"leto-b","username":"leto-b","email":"bingqian.bai@timecho.com","commits":2,"url":"https://github.com/leto-b"}]},"readingTime":{"minutes":12.28,"words":3683},"filePathRelative":"UserGuide/latest/User-Manual/Data-Sync_apache.md","localizedDate":"October 10, 2023","autoDesc":true}');export{D as comp,F as data}; |