|  | import{_ as l,r as o,o as c,c as d,b as t,d as e,a as n,w as s,e as i}from"./app-DEtdaW2x.js";const p={},u=t("h1",{id:"iotdb-data-sync",tabindex:"-1"},[t("a",{class:"header-anchor",href:"#iotdb-data-sync"},[t("span",null,"IoTDB Data Sync")])],-1),h=i(`<p><strong>A Pipe consists of three subtasks (plugins):</strong></p><ul><li>Extract</li><li>Process</li><li>Connect</li></ul><p><strong>Pipe allows users to customize the processing logic of these three subtasks, just like handling data using UDF (User-Defined Functions)</strong>. Within a Pipe, the aforementioned subtasks are executed and implemented by three types of plugins. Data flows through these three plugins sequentially: Pipe Extractor is used to extract data, Pipe Processor is used to process data, and Pipe Connector is used to send data to an external system.</p><p><strong>The model of a Pipe task is as follows:</strong></p><figure><img src="https://alioss.timecho.com/upload/pipe.png" alt="pipe.png" tabindex="0" loading="lazy"><figcaption>pipe.png</figcaption></figure><p>It describes a data sync task, which essentially describes the attributes of the Pipe Extractor, Pipe Processor, and Pipe Connector plugins. Users can declaratively configure the specific attributes of the three subtasks through SQL statements. 
By combining different attributes, flexible data ETL (Extract, Transform, Load) capabilities can be achieved.</p><p>By utilizing the data sync functionality, a complete data pipeline can be built to fulfill various requirements such as edge-to-cloud sync, remote disaster recovery, and read-write workload distribution across multiple databases.</p><h2 id="quick-start" tabindex="-1"><a class="header-anchor" href="#quick-start"><span>Quick Start</span></a></h2><p><strong>🎯 Goal: Achieve full data sync of IoTDB A -> IoTDB B</strong></p><ul><li><p>Start two IoTDB instances: A (DataNode -> 127.0.0.1:6667) and B (DataNode -> 127.0.0.1:6668)</p></li><li><p>Create a Pipe from A -> B (execute on A)</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">create</span> pipe a2b | 
|  | <span class="token keyword">with</span> connector <span class="token punctuation">(</span> | 
|  | <span class="token string">'connector'</span><span class="token operator">=</span><span class="token string">'iotdb-thrift-connector'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.ip'</span><span class="token operator">=</span><span class="token string">'127.0.0.1'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.port'</span><span class="token operator">=</span><span class="token string">'6668'</span> | 
|  | <span class="token punctuation">)</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div></li><li><p>Start the Pipe from A -> B (execute on A)</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">start</span> pipe a2b | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li><li><p>Write data to A</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">INSERT</span> <span class="token keyword">INTO</span> root<span class="token punctuation">.</span>db<span class="token punctuation">.</span>d<span class="token punctuation">(</span><span class="token keyword">time</span><span class="token punctuation">,</span> m<span class="token punctuation">)</span> <span class="token keyword">values</span> <span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li><li><p>Query B to check the data synchronized from A</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SELECT</span> <span class="token operator">*</span><span class="token operator">*</span> <span class="token keyword">FROM</span> root | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div></li></ul><blockquote><p>❗️<strong>Note: The current IoTDB -> IoTDB implementation of data sync does not support DDL sync</strong></p><p>That is: ttl, trigger, alias, template, view, create/delete sequence, create/delete database, etc. are not supported.</p><p><strong>IoTDB -> IoTDB data sync requires the target IoTDB:</strong></p><ul><li>Enable automatic metadata creation: manual configuration of encoding and compression of data types to be consistent with the sender is required</li><li>Do not enable automatic metadata creation: manually create metadata that is consistent with the source</li></ul></blockquote><h2 id="sync-task-management" tabindex="-1"><a class="header-anchor" href="#sync-task-management"><span>Sync Task Management</span></a></h2><h3 id="create-a-sync-task" tabindex="-1"><a class="header-anchor" href="#create-a-sync-task"><span>Create a sync task</span></a></h3><p>A data sync task can be created using the <code>CREATE PIPE</code> statement, a sample SQL statement is shown below:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> <span class="token comment">-- PipeId is the name that uniquely identifies the sync task</span> | 
|  | <span class="token keyword">WITH</span> EXTRACTOR <span class="token punctuation">(</span> | 
|  | <span class="token comment">-- Default IoTDB Data Extraction Plugin</span> | 
|  | <span class="token string">'extractor'</span>                    <span class="token operator">=</span> <span class="token string">'iotdb-extractor'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Path prefix, only data that can match the path prefix will be extracted for subsequent processing and delivery</span> | 
|  | <span class="token string">'extractor.pattern'</span>            <span class="token operator">=</span> <span class="token string">'root.timecho'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Whether to extract historical data</span> | 
|  | <span class="token string">'extractor.history.enable'</span>     <span class="token operator">=</span> <span class="token string">'true'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Start of the event-time range of the historical data to extract (inclusive)</span> | 
|  | <span class="token string">'extractor.history.start-time'</span> <span class="token operator">=</span> <span class="token string">'2011-12-03T10:15:30+01:00'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- End of the event-time range of the historical data to extract (inclusive)</span> | 
|  | <span class="token string">'extractor.history.end-time'</span>   <span class="token operator">=</span> <span class="token string">'2022-12-03T10:15:30+01:00'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Whether to extract realtime data</span> | 
|  | <span class="token string">'extractor.realtime.enable'</span>    <span class="token operator">=</span> <span class="token string">'true'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  | <span class="token keyword">WITH</span> PROCESSOR <span class="token punctuation">(</span> | 
|  | <span class="token comment">-- Default data processing plugin, means no processing</span> | 
|  | <span class="token string">'processor'</span>                    <span class="token operator">=</span> <span class="token string">'do-nothing-processor'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  | <span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span> | 
|  | <span class="token comment">-- IoTDB data sending plugin with target IoTDB</span> | 
|  | <span class="token string">'connector'</span>                    <span class="token operator">=</span> <span class="token string">'iotdb-thrift-connector'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Data service IP of one of the DataNode nodes of the target IoTDB</span> | 
|  | <span class="token string">'connector.ip'</span>                 <span class="token operator">=</span> <span class="token string">'127.0.0.1'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Data service port of one of the DataNode nodes of the target IoTDB</span> | 
|  | <span class="token string">'connector.port'</span>               <span class="token operator">=</span> <span class="token string">'6667'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p><strong>To create a sync task it is necessary to configure the PipeId and the parameters of the three plugin sections:</strong></p>`,16),m=t("thead",null,[t("tr",null,[t("th",null,"configuration item"),t("th",null,"description"),t("th",null,"Required or not"),t("th",null,"default implementation"),t("th",null,"Default implementation description"),t("th",null,"Whether to allow custom implementations")])],-1),g=t("td",null,"pipeId",-1),b=t("td",null,"Globally uniquely identifies the name of a sync task",-1),v=t("td",null,"-",-1),k=t("td",null,"-",-1),f=t("td",null,"-",-1),y=t("tr",null,[t("td",null,"extractor"),t("td",null,"pipe Extractor plugin, for extracting synchronized data at the bottom of the database"),t("td",null,"Optional"),t("td",null,"iotdb-extractor"),t("td",null,"Integrate all historical data of the database and subsequent realtime data into the sync task"),t("td",null,"no")],-1),T=t("td",null,"processor",-1),w=t("td",null,"Pipe Processor plugin, for processing data",-1),x=t("td",null,"Optional",-1),P=t("td",null,"do-nothing-processor",-1),I=t("td",null,"no 
processing of incoming data",-1),D=t("td",null,"connector",-1),E=t("td",null,"Pipe Connector plugin, for sending data",-1),q=t("td",null,"-",-1),_=t("td",null,"-",-1),S=i(`<p>In the example, the iotdb-extractor, do-nothing-processor, and iotdb-thrift-connector plugins are used to build the data sync task. IoTDB has other built-in data sync plugins; <strong>see the section "System Pre-built Data Sync Plugin"</strong>.<br><strong>An example of a minimalist CREATE PIPE statement is as follows:</strong></p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> <span class="token comment">-- PipeId is a name that uniquely identifies the task.</span> | 
|  | <span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span> | 
|  | <span class="token comment">-- IoTDB data sending plugin with target IoTDB</span> | 
|  | <span class="token string">'connector'</span>      <span class="token operator">=</span> <span class="token string">'iotdb-thrift-connector'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Data service IP of one of the DataNode nodes of the target IoTDB</span> | 
|  | <span class="token string">'connector.ip'</span>   <span class="token operator">=</span> <span class="token string">'127.0.0.1'</span><span class="token punctuation">,</span> | 
|  | <span class="token comment">-- Data service port of one of the DataNode nodes of the target IoTDB</span> | 
|  | <span class="token string">'connector.port'</span> <span class="token operator">=</span> <span class="token string">'6667'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>The expressed semantics are: synchronise the full amount of historical data and subsequent arrivals of realtime data from this database instance to the IoTDB instance with target 127.0.0.1:6667.</p><p><strong>Note:</strong></p><ul><li><p>EXTRACTOR and PROCESSOR are optional, if no configuration parameters are filled in, the system will use the corresponding default implementation.</p></li><li><p>The CONNECTOR is a mandatory configuration that needs to be declared in the CREATE PIPE statement for configuring purposes.</p></li><li><p>The CONNECTOR exhibits self-reusability. For different tasks, if their CONNECTOR possesses identical KV properties (where the value corresponds to every key), <strong>the system will ultimately create only one instance of the CONNECTOR</strong> to achieve resource reuse for connections.</p><ul><li>For example, there are the following pipe1, pipe2 task declarations:</li></ul><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">CREATE</span> PIPE pipe1 | 
|  | <span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span> | 
|  | <span class="token string">'connector'</span> <span class="token operator">=</span> <span class="token string">'iotdb-thrift-connector'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.ip'</span> <span class="token operator">=</span> <span class="token string">'localhost'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.port'</span> <span class="token operator">=</span> <span class="token string">'9999'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  |  | 
|  | <span class="token keyword">CREATE</span> PIPE pipe2 | 
|  | <span class="token keyword">WITH</span> CONNECTOR <span class="token punctuation">(</span> | 
|  | <span class="token string">'connector'</span> <span class="token operator">=</span> <span class="token string">'iotdb-thrift-connector'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.port'</span> <span class="token operator">=</span> <span class="token string">'9999'</span><span class="token punctuation">,</span> | 
|  | <span class="token string">'connector.ip'</span> <span class="token operator">=</span> <span class="token string">'localhost'</span><span class="token punctuation">,</span> | 
|  | <span class="token punctuation">)</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><ul><li>Since they have identical CONNECTOR declarations (<strong>even if the order of some properties is different</strong>), the framework will automatically reuse the CONNECTOR declared by them. Hence, the CONNECTOR instances for pipe1 and pipe2 will be the same.</li></ul></li><li><p>Please note that we should avoid constructing application scenarios that involve data cycle sync (as it can result in an infinite loop):</p><ul><li>IoTDB A -> IoTDB B -> IoTDB A</li><li>IoTDB A -> IoTDB A</li></ul></li></ul><h3 id="start-task" tabindex="-1"><a class="header-anchor" href="#start-task"><span>START TASK</span></a></h3><p>After the successful execution of the CREATE PIPE statement, task-related instances will be created. However, the overall task's running status will be set to STOPPED, meaning the task will not immediately process data.</p><p>You can use the START PIPE statement to begin processing data for a task:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">START</span> PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="stop-task" tabindex="-1"><a class="header-anchor" href="#stop-task"><span>STOP TASK</span></a></h3><p>The STOP PIPE statement can be used to halt data processing:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code>STOP PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="delete-task" tabindex="-1"><a class="header-anchor" href="#delete-task"><span>DELETE TASK</span></a></h3><p>You can use the DROP PIPE statement to delete the entire task; if the task is in the RUNNING state, its data processing is stopped as part of the deletion:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">DROP</span> PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>Before deleting a task, there is no need to execute the STOP operation.</p><h3 id="show-task" tabindex="-1"><a class="header-anchor" href="#show-task"><span>SHOW TASK</span></a></h3><p>You can use the SHOW PIPES statement to view all tasks:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>The query results are as follows:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span> | 
|  | <span class="token operator">|</span>         ID<span class="token operator">|</span>          CreationTime <span class="token operator">|</span>  State<span class="token operator">|</span>PipeExtractor<span class="token operator">|</span>PipeProcessor<span class="token operator">|</span>PipeConnector<span class="token operator">|</span>ExceptionMessage<span class="token operator">|</span> | 
|  | <span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span> | 
|  | <span class="token operator">|</span>iotdb<span class="token operator">-</span>kafka<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">30</span>T20:<span class="token number">58</span>:<span class="token number">30.689</span><span class="token operator">|</span>RUNNING<span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>            None<span class="token operator">|</span> | 
|  | <span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span> | 
|  | <span class="token operator">|</span>iotdb<span class="token operator">-</span>iotdb<span class="token operator">|</span><span class="token number">2022</span><span class="token operator">-</span><span class="token number">03</span><span class="token operator">-</span><span class="token number">31</span>T12:<span class="token number">55</span>:<span class="token number">28.129</span><span class="token operator">|</span>STOPPED<span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span>          <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> TException: <span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token punctuation">.</span><span class="token operator">|</span> | 
|  | <span class="token operator">+</span><span class="token comment">-----------+-----------------------+-------+-------------+-------------+-------------+----------------+</span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>You can use <PipeId> to view the status of a particular synchronization task:</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPE <span class="token operator"><</span>PipeId<span class="token operator">></span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><p>Additionally, the WHERE clause can be used to determine if the Pipe Connector used by a specific <PipeId> is being reused.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPES | 
|  | <span class="token keyword">WHERE</span> CONNECTOR USED <span class="token keyword">BY</span> <span class="token operator"><</span>PipeId<span class="token operator">></span> | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="task-running-status-migration" tabindex="-1"><a class="header-anchor" href="#task-running-status-migration"><span>Task Running Status Migration</span></a></h3><p>The task running status can transition through several states during the lifecycle of a data synchronization pipe:</p><ul><li><strong>STOPPED:</strong> The pipe is in a stopped state. It can have the following possibilities: <ul><li>After the successful creation of a pipe, its initial state is set to stopped</li><li>The user manually pauses a pipe that is in normal running state, transitioning its status from RUNNING to STOPPED</li><li>If a pipe encounters an unrecoverable error during execution, its status automatically changes from RUNNING to STOPPED.</li></ul></li><li><strong>RUNNING:</strong> The pipe is actively processing data</li><li><strong>DROPPED:</strong> The pipe is permanently deleted</li></ul><p>The following diagram illustrates the different states and their transitions:</p><figure><img src="https://alioss.timecho.com/docs/img/状态迁移图.png" alt="state migration diagram" tabindex="0" loading="lazy"><figcaption>state migration diagram</figcaption></figure><h2 id="system-pre-built-data-sync-plugin" tabindex="-1"><a class="header-anchor" href="#system-pre-built-data-sync-plugin"><span>System Pre-built Data Sync Plugin</span></a></h2><h3 id="view-pre-built-plugin" tabindex="-1"><a class="header-anchor" href="#view-pre-built-plugin"><span>View pre-built plugin</span></a></h3><p>User can view the plugins in the system on demand. The statement for viewing plugins is shown below.</p><div class="language-sql line-numbers-mode" data-ext="sql" data-title="sql"><pre class="language-sql"><code><span class="token keyword">SHOW</span> PIPEPLUGINS | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div></div></div><h3 id="pre-built-extractor-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-extractor-plugin"><span>Pre-built Extractor Plugin</span></a></h3><h4 id="iotdb-extractor" tabindex="-1"><a class="header-anchor" href="#iotdb-extractor"><span>iotdb-extractor</span></a></h4><p>Function: Extract historical or realtime data inside IoTDB into pipe.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>extractor</td><td>iotdb-extractor</td><td>String: iotdb-extractor</td><td>required</td></tr><tr><td>extractor.pattern</td><td>path prefix for filtering time series</td><td>String: any time series prefix</td><td>optional: root</td></tr><tr><td>extractor.history.enable</td><td>whether to synchronize historical data</td><td>Boolean: true, false</td><td>optional: true</td></tr><tr><td>extractor.history.start-time</td><td>start of synchronizing historical data event time,Include start-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MIN_VALUE</td></tr><tr><td>extractor.history.end-time</td><td>end of synchronizing historical data event time,Include end-time</td><td>Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td>optional: Long.MAX_VALUE</td></tr><tr><td>extractor.realtime.enable</td><td>Whether to synchronize realtime data</td><td>Boolean: true, false</td><td>optional: true</td></tr></tbody></table>`,38),B=t("p",null,[e("🚫 "),t("strong",null,"extractor.pattern Parameter Description")],-1),A={href:"https://iotdb.apache.org/zh/Download/#_1-0-%E7%89%88%E6%9C%AC%E4%B8%8D%E5%85%BC%E5%AE%B9%E7%9A%84%E8%AF%AD%E6%B3%95%E8%AF%A6%E7%BB%86%E8%AF%B4%E6%98%8E",target:"_blank",rel:"noopener noreferrer"},N=i("<li><p>In the underlying implementation, when pattern is detected as root (default value), synchronization efficiency is higher, and any other format will reduce 
performance.</p></li><li><p>The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter 'extractor.pattern'='root.aligned.1':</p><ul><li>root.aligned.1TS</li><li>root.aligned.1TS.`1`</li><li>root.aligned.100TS</li></ul><p>the data will be synchronized;</p><ul><li>root.aligned.`1`</li><li>root.aligned.`123`</li></ul><p>the data will not be synchronized.</p></li>",2),O=i(`<blockquote><p>❗️<strong>start-time, end-time parameter description of extractor.history</strong></p><ul><li>start-time, end-time should be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00</li></ul></blockquote><blockquote><p>✅ <strong>a piece of data from production to IoTDB contains two key concepts of time</strong></p><ul><li><strong>event time:</strong> the time when the data is actually produced (or the generation time assigned to the data by the data production system, which is a time item in the data point), also called the event time.</li><li><strong>arrival time:</strong> the time the data arrived in the IoTDB system.</li></ul><p>What is usually called out-of-order data is data whose <strong>event time</strong> is far behind the current system time (or the maximum <strong>event time</strong> that has already been persisted) when the data arrives. 
On the other hand, whether data is out-of-order or in order, the <strong>arrival time</strong> of newly arriving data always increases with the order in which it arrives at IoTDB.</p></blockquote><blockquote><p>💎 <strong>The work of iotdb-extractor can be split into two stages</strong></p><ol><li>Historical data extraction: All data with <strong>arrival time</strong> < <strong>current system time</strong> when creating the pipe is called historical data</li><li>Realtime data extraction: All data with <strong>arrival time</strong> >= <strong>current system time</strong> when the pipe is created is called realtime data</li></ol><p>The historical data transmission phase and the realtime data transmission phase are executed serially: the realtime data transmission phase starts only after the historical data transmission phase is completed.</p><p>Users can configure iotdb-extractor for:</p><ul><li>Historical data extraction (<code>'extractor.history.enable' = 'true'</code>, <code>'extractor.realtime.enable' = 'false'</code>)</li><li>Realtime data extraction (<code>'extractor.history.enable' = 'false'</code>, <code>'extractor.realtime.enable' = 'true'</code>)</li><li>Full data extraction (<code>'extractor.history.enable' = 'true'</code>, <code>'extractor.realtime.enable' = 'true'</code>)</li><li>Setting both <code>extractor.history.enable</code> and <code>extractor.realtime.enable</code> to <code>false</code> at the same time is not allowed</li></ul></blockquote><h3 id="pre-built-processor-plugin" tabindex="-1"><a class="header-anchor" href="#pre-built-processor-plugin"><span>Pre-built Processor Plugin</span></a></h3><h4 id="do-nothing-processor" tabindex="-1"><a class="header-anchor" href="#do-nothing-processor"><span>do-nothing-processor</span></a></h4><p>Function: Do not do anything with the events passed in by the extractor.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with 
default</th></tr></thead><tbody><tr><td>processor</td><td>do-nothing-processor</td><td>String: do-nothing-processor</td><td>required</td></tr></tbody></table><h3 id="pre-connector-plugin" tabindex="-1"><a class="header-anchor" href="#pre-connector-plugin"><span>pre-connector plugin</span></a></h3><h4 id="iotdb-thrift-sync-connector-alias-iotdb-thrift-connector" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-sync-connector-alias-iotdb-thrift-connector"><span>iotdb-thrift-sync-connector(alias:iotdb-thrift-connector)</span></a></h4><p>Function: Primarily used for data transfer between IoTDB instances (v1.2.0+). Data is transmitted using the Thrift RPC framework and a single-threaded blocking IO model. It guarantees that the receiving end applies the data in the same order as the sending end receives the write requests.</p><p>Limitation: Both the source and target IoTDB versions need to be v1.2.0+.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-thrift-connector or iotdb-thrift-sync-connector</td><td>String: iotdb-thrift-connector or iotdb-thrift-sync-connector</td><td>required</td></tr><tr><td>connector.ip</td><td>the data service IP of one of the DataNode nodes in the target IoTDB</td><td>String</td><td>optional: and connector.node-urls fill in either one</td></tr><tr><td>connector.port</td><td>the data service port of one of the DataNode nodes in the target IoTDB</td><td>Integer</td><td>optional: and connector.node-urls fill in either one</td></tr><tr><td>connector.node-urls</td><td>the URL of the data service port of any multiple DataNode nodes in the target IoTDB</td><td>String。eg:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'</td><td>optional: and connector.ip:connector.port fill in either one</td></tr></tbody></table><blockquote><p>📌 Please ensure that the receiving end has already created all the time series present in 
the sending end or has enabled automatic metadata creation. Otherwise, the pipe may fail.</p></blockquote><h4 id="iotdb-thrift-async-connector" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-async-connector"><span>iotdb-thrift-async-connector</span></a></h4><p>Function: Primarily used for data transfer between IoTDB instances (v1.2.0+).<br> Data is transmitted using the Thrift RPC framework with a multi-threaded asynchronous non-blocking IO model, resulting in high transfer performance. It is particularly suitable when the target end is a distributed cluster.<br> It does not guarantee that the receiving end applies the data in the same order as the sending end receives the write requests, but it does guarantee data integrity (at-least-once).</p><p>Limitation: Both the source and target IoTDB versions need to be v1.2.0+.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-thrift-async-connector</td><td>String: iotdb-thrift-async-connector</td><td>required</td></tr><tr><td>connector.ip</td><td>the data service IP of one of the DataNode nodes in the target IoTDB</td><td>String</td><td>optional: specify either this or connector.node-urls</td></tr><tr><td>connector.port</td><td>the data service port of one of the DataNode nodes in the target IoTDB</td><td>Integer</td><td>optional: specify either this or connector.node-urls</td></tr><tr><td>connector.node-urls</td><td>the data service URLs (ip:port) of one or more DataNode nodes in the target IoTDB</td><td>String, e.g. '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669' or '127.0.0.1:6667'</td><td>optional: specify either this or connector.ip:connector.port</td></tr></tbody></table><blockquote><p>📌 Please ensure that the receiving end has already created all the time series present in the sending end or has enabled automatic metadata creation. 
Otherwise, the pipe may fail.</p></blockquote><h4 id="iotdb-legacy-pipe-connector" tabindex="-1"><a class="header-anchor" href="#iotdb-legacy-pipe-connector"><span>iotdb-legacy-pipe-connector</span></a></h4><p>Function: Mainly used to transfer data from IoTDB (v1.2.0+) to lower versions of IoTDB, using the data synchronization (Sync) protocol of versions before v1.2.0.<br> Data is transmitted using the Thrift RPC framework with a single-threaded synchronous blocking IO model, resulting in relatively low transfer performance.</p><p>Limitation: The source IoTDB version needs to be v1.2.0+. The target IoTDB version can be either v1.2.0+ or v1.1.x (lower versions of IoTDB are theoretically supported but untested).</p><p>Note: In theory, any IoTDB version prior to v1.2.0 can serve as the data synchronization (Sync) receiver for v1.2.0+.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>iotdb-legacy-pipe-connector</td><td>string: iotdb-legacy-pipe-connector</td><td>required</td></tr><tr><td>connector.ip</td><td>the data service IP of one of the DataNode nodes in the target IoTDB</td><td>string</td><td>required</td></tr><tr><td>connector.port</td><td>the data service port of one of the DataNode nodes in the target IoTDB</td><td>integer</td><td>required</td></tr><tr><td>connector.user</td><td>the user name of the target IoTDB. Note that the user needs data writing and TsFile Load permissions.</td><td>string</td><td>optional: root</td></tr><tr><td>connector.password</td><td>the password of the target IoTDB. 
Note that the user needs data writing and TsFile Load permissions.</td><td>string</td><td>optional: root</td></tr><tr><td>connector.version</td><td>the version of the target IoTDB; the sender presents this version instead of its own actual version to bypass the target's version consistency check.</td><td>string</td><td>optional: 1.1</td></tr></tbody></table><blockquote><p>📌 Make sure that the receiver has already created all the time series present on the sender side, or has enabled automatic metadata creation; otherwise, the pipe will fail.</p></blockquote><h4 id="do-nothing-connector" tabindex="-1"><a class="header-anchor" href="#do-nothing-connector"><span>do-nothing-connector</span></a></h4><p>Function: Does nothing with the events passed in by the processor.</p><table><thead><tr><th>key</th><th>value</th><th>value range</th><th>required or optional with default</th></tr></thead><tbody><tr><td>connector</td><td>do-nothing-connector</td><td>String: do-nothing-connector</td><td>required</td></tr></tbody></table><h2 id="authority-management" tabindex="-1"><a class="header-anchor" href="#authority-management"><span>Authority Management</span></a></h2><table><thead><tr><th>Authority Name</th><th>Description</th></tr></thead><tbody><tr><td>CREATE_PIPE</td><td>Register task, path-independent</td></tr><tr><td>START_PIPE</td><td>Start task, path-independent</td></tr><tr><td>STOP_PIPE</td><td>Stop task, path-independent</td></tr><tr><td>DROP_PIPE</td><td>Drop task, path-independent</td></tr><tr><td>SHOW_PIPES</td><td>Query task, path-independent</td></tr></tbody></table><h2 id="configure-parameters" tabindex="-1"><a class="header-anchor" href="#configure-parameters"><span>Configure Parameters</span></a></h2><p>In iotdb-common.properties:</p><div class="language-Properties line-numbers-mode" data-ext="Properties" data-title="Properties"><pre class="language-Properties"><code>#################### | 
|  | ### Pipe Configuration | 
|  | #################### | 
|  |  | 
|  | # Uncomment the following field to configure the pipe lib directory. | 
|  | # For Windows platform | 
|  | # If its prefix is a drive specifier followed by "\\\\", or if its prefix is "\\\\\\\\", then the path is | 
|  | # absolute. Otherwise, it is relative. | 
|  | # pipe_lib_dir=ext\\\\pipe | 
|  | # For Linux platform | 
|  | # If its prefix is "/", then the path is absolute. Otherwise, it is relative. | 
|  | # pipe_lib_dir=ext/pipe | 
|  |  | 
|  | # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. | 
|  | # The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). | 
|  | # pipe_subtask_executor_max_thread_num=5 | 
|  |  | 
|  | # The connection timeout (in milliseconds) for the thrift client. | 
|  | # pipe_connector_timeout_ms=900000 | 
|  |  | 
|  | # The maximum number of selectors that can be used in the async connector. | 
|  | # pipe_async_connector_selector_number=1 | 
|  |  | 
|  | # The core number of clients that can be used in the async connector. | 
|  | # pipe_async_connector_core_client_number=8 | 
|  |  | 
|  | # The maximum number of clients that can be used in the async connector. | 
|  | # pipe_async_connector_max_client_number=16 | 
|  | </code></pre><div class="line-numbers" aria-hidden="true"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="functionality-features" tabindex="-1"><a class="header-anchor" href="#functionality-features"><span>Functionality Features</span></a></h2><h3 id="at-least-one-semantic-guarantee-at-least-once" tabindex="-1"><a class="header-anchor" href="#at-least-one-semantic-guarantee-at-least-once"><span>Semantic Guarantee: <strong>at-least-once</strong></span></a></h3><p>The data synchronization feature provides at-least-once delivery semantics when transferring data to external systems. 
In most scenarios, the synchronization feature achieves exactly-once delivery, meaning that all data is synchronized exactly once.</p><p>However, in the following scenarios, some data may be synchronized more than once <strong>(due to resumable transmission)</strong>:</p><ul><li>Temporary network failures: if a data transmission request fails, the system retries sending it until the maximum number of retry attempts is reached.</li><li>Errors in the Pipe plugin logic: if an error is thrown during the plugin's execution, the system retries sending the data until the maximum number of retry attempts is reached.</li><li>Data partition switching due to node failures or restarts: after the partition change is completed, the affected data is retransmitted.</li><li>Cluster unavailability: once the cluster becomes available again, the affected data is retransmitted.</li></ul><h3 id="source-data-writing-with-pipe-processing-and-asynchronous-decoupling-of-data-transmission" tabindex="-1"><a class="header-anchor" href="#source-data-writing-with-pipe-processing-and-asynchronous-decoupling-of-data-transmission"><span>Source: Data Writing with Pipe Processing and Asynchronous Decoupling of Data Transmission</span></a></h3><p>In the data synchronization feature, data transfer uses an asynchronous replication mode.</p><p>Data synchronization is completely decoupled from the write operation, eliminating any impact on the critical write path. 
This mechanism allows the framework to maintain the writing speed of a time-series database while ensuring continuous data synchronization.</p><h3 id="source-high-availability-of-pipe-service-in-a-highly-available-cluster-deployment" tabindex="-1"><a class="header-anchor" href="#source-high-availability-of-pipe-service-in-a-highly-available-cluster-deployment"><span>Source: High Availability of Pipe Service in a Highly Available Cluster Deployment</span></a></h3><p>When the sender end IoTDB is deployed in a high availability cluster mode, the data synchronization service will also be highly available. The data synchronization framework monitors the data synchronization progress of each data node and periodically takes lightweight distributed consistent snapshots to preserve the synchronization state.</p><ul><li>In the event of a failure of a data node in the sender cluster, the data synchronization framework can leverage the consistent snapshot and the data stored in replicas to quickly recover and resume synchronization, thus achieving high availability of the data synchronization service.</li><li>In the event of a complete failure and restart of the sender cluster, the data synchronization framework can also use snapshots to recover the synchronization service.</li></ul>`,43);function C(R,z){const a=o("font"),r=o("ExternalLinkIcon");return c(),d("div",null,[u,t("p",null,[t("strong",null,[e("The IoTDB data sync transfers data from IoTDB to another data platform, and "),n(a,{color:"RED"},{default:s(()=>[e("a data sync task is called a 
Pipe")]),_:1}),e(".")])]),h,t("table",null,[m,t("tbody",null,[t("tr",null,[g,b,t("td",null,[n(a,{color:"red"},{default:s(()=>[e("required")]),_:1})]),v,k,f]),y,t("tr",null,[T,w,x,P,I,t("td",null,[n(a,{color:"red"},{default:s(()=>[e("yes")]),_:1})])]),t("tr",null,[D,E,t("td",null,[n(a,{color:"red"},{default:s(()=>[e("required")]),_:1})]),q,_,t("td",null,[n(a,{color:"red"},{default:s(()=>[e("yes")]),_:1})])])])]),S,t("blockquote",null,[B,t("ul",null,[t("li",null,[t("p",null,[e("Pattern should use backquotes to modify illegal characters or illegal path nodes, for example, if you want to filter root.`a@b` or root.`123`, you should set the pattern to root.`a@b` or root.`123`(Refer specifically to "),t("a",A,[e("Timing of single and double quotes and backquotes"),n(r)]),e(")")])]),N])]),O])}const U=l(p,[["render",C],["__file","Data-Sync.html.vue"]]),W=JSON.parse('{"path":"/UserGuide/V1.2.x/User-Manual/Data-Sync.html","title":"IoTDB Data Sync","lang":"en-US","frontmatter":{"description":"IoTDB Data Sync The IoTDB data sync transfers data from IoTDB to another data platform, and . A Pipe consists of three subtasks (plugins): Extract Process Connect Pipe allows us...","head":[["link",{"rel":"alternate","hreflang":"zh-cn","href":"https://iotdb.apache.org/zh/UserGuide/V1.2.x/User-Manual/Data-Sync.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/UserGuide/V1.2.x/User-Manual/Data-Sync.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"IoTDB Data Sync"}],["meta",{"property":"og:description","content":"IoTDB Data Sync The IoTDB data sync transfers data from IoTDB to another data platform, and . 
A Pipe consists of three subtasks (plugins): Extract Process Connect Pipe allows us..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://alioss.timecho.com/upload/pipe.png"}],["meta",{"property":"og:locale","content":"en-US"}],["meta",{"property":"og:locale:alternate","content":"zh-CN"}],["meta",{"property":"og:updated_time","content":"2024-04-08T07:45:44.000Z"}],["meta",{"property":"article:modified_time","content":"2024-04-08T07:45:44.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"IoTDB Data Sync\\",\\"image\\":[\\"https://alioss.timecho.com/upload/pipe.png\\",\\"https://alioss.timecho.com/docs/img/%E7%8A%B6%E6%80%81%E8%BF%81%E7%A7%BB%E5%9B%BE.png\\"],\\"dateModified\\":\\"2024-04-08T07:45:44.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"Quick Start","slug":"quick-start","link":"#quick-start","children":[]},{"level":2,"title":"Sync Task Management","slug":"sync-task-management","link":"#sync-task-management","children":[{"level":3,"title":"Create a sync task","slug":"create-a-sync-task","link":"#create-a-sync-task","children":[]},{"level":3,"title":"START TASK","slug":"start-task","link":"#start-task","children":[]},{"level":3,"title":"STOP TASK","slug":"stop-task","link":"#stop-task","children":[]},{"level":3,"title":"DELETE TASK","slug":"delete-task","link":"#delete-task","children":[]},{"level":3,"title":"SHOW TASK","slug":"show-task","link":"#show-task","children":[]},{"level":3,"title":"Task Running Status Migration","slug":"task-running-status-migration","link":"#task-running-status-migration","children":[]}]},{"level":2,"title":"System Pre-built Data Sync Plugin","slug":"system-pre-built-data-sync-plugin","link":"#system-pre-built-data-sync-plugin","children":[{"level":3,"title":"View pre-built 
plugin","slug":"view-pre-built-plugin","link":"#view-pre-built-plugin","children":[]},{"level":3,"title":"Pre-built Extractor Plugin","slug":"pre-built-extractor-plugin","link":"#pre-built-extractor-plugin","children":[]},{"level":3,"title":"Pre-built Processor Plugin","slug":"pre-built-processor-plugin","link":"#pre-built-processor-plugin","children":[]},{"level":3,"title":"pre-connector plugin","slug":"pre-connector-plugin","link":"#pre-connector-plugin","children":[]}]},{"level":2,"title":"Authority Management","slug":"authority-management","link":"#authority-management","children":[]},{"level":2,"title":"Configure Parameters","slug":"configure-parameters","link":"#configure-parameters","children":[]},{"level":2,"title":"Functionality Features","slug":"functionality-features","link":"#functionality-features","children":[{"level":3,"title":"At least one semantic guarantee at-least-once","slug":"at-least-one-semantic-guarantee-at-least-once","link":"#at-least-one-semantic-guarantee-at-least-once","children":[]},{"level":3,"title":"Source: Data Writing with Pipe Processing and Asynchronous Decoupling of Data Transmission","slug":"source-data-writing-with-pipe-processing-and-asynchronous-decoupling-of-data-transmission","link":"#source-data-writing-with-pipe-processing-and-asynchronous-decoupling-of-data-transmission","children":[]},{"level":3,"title":"Source: High Availability of Pipe Service in a Highly Available Cluster 
Deployment","slug":"source-high-availability-of-pipe-service-in-a-highly-available-cluster-deployment","link":"#source-high-availability-of-pipe-service-in-a-highly-available-cluster-deployment","children":[]}]}],"git":{"createdTime":1696932526000,"updatedTime":1712562344000,"contributors":[{"name":"wanghui42","email":"105700158+wanghui42@users.noreply.github.com","commits":3},{"name":"Caideyipi","email":"87789683+Caideyipi@users.noreply.github.com","commits":1}]},"readingTime":{"minutes":10.98,"words":3293},"filePathRelative":"UserGuide/V1.2.x/User-Manual/Data-Sync.md","localizedDate":"October 10, 2023","autoDesc":true}');export{U as comp,W as data}; |