blob: 63632e28d7f9403e1b01506d266c3c7f10e82efd [file] [log] [blame]
import{_ as n,c as i,b as s,d as a,e,a as d,w as r,r as o,o as c}from"./app-W3EENNaa.js";const p={};function u(g,t){const l=o("RouteLink");return c(),i("div",null,[t[3]||(t[3]=s(`<h1 id="数据同步" tabindex="-1"><a class="header-anchor" href="#数据同步"><span>数据同步</span></a></h1><p>数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。</p><h2 id="功能概述" tabindex="-1"><a class="header-anchor" href="#功能概述"><span>功能概述</span></a></h2><h3 id="数据同步-1" tabindex="-1"><a class="header-anchor" href="#数据同步-1"><span>数据同步</span></a></h3><p>一个数据同步任务包含 3 个阶段:</p><figure><img src="https://alioss.timecho.com/docs/img/dataSync01.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><ul><li>抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义</li><li>处理(Process)阶段:该部分用于处理从源 IoTDB 抽取出的数据,在 SQL 语句中的 processor 部分定义</li><li>发送(Sink)阶段:该部分用于向目标 IoTDB 发送数据,在 SQL 语句中的 sink 部分定义</li></ul><p>通过 SQL 语句声明式地配置 3 个部分的具体内容,可实现灵活的数据同步能力。目前数据同步支持以下信息的同步,您可以在创建同步任务时对同步范围进行选择(默认选择 data.insert,即同步新写入的数据):</p><table style="text-align:left;"><tr><th>同步范围</th><th>同步内容</th><th>说明</th></tr><tr><td colspan="2">all</td><td>所有范围</td></tr><tr><td rowspan="2">data(数据)</td><td>insert(增量)</td><td>同步新写入的数据</td></tr><tr><td>delete(删除)</td><td>同步被删除的数据</td></tr><tr><td rowspan="3">schema(元数据)</td><td>database(数据库)</td><td>同步数据库的创建、修改或删除操作</td></tr><tr><td>timeseries(时间序列)</td><td>同步时间序列的定义和属性</td></tr><tr><td>TTL(数据到期时间)</td><td>同步数据的存活时间</td></tr><tr><td>auth(权限)</td><td>-</td><td>同步用户权限和访问控制</td></tr></table><h3 id="功能限制及说明" tabindex="-1"><a class="header-anchor" href="#功能限制及说明"><span>功能限制及说明</span></a></h3><p>元数据(schema)、权限(auth)同步功能存在如下限制:</p><ul><li><p>使用元数据同步时,要求<code>Schema region</code>、<code>ConfigNode</code> 的共识协议必须为默认的 ratis 
协议,即:<code>iotdb-common.properties</code>配置文件中<code>config_node_consensus_protocol_class</code>和<code>schema_region_consensus_protocol_class</code>配置项均为<code>org.apache.iotdb.consensus.ratis.RatisConsensus</code></p></li><li><p>为了防止潜在的冲突,请在开启元数据同步时关闭接收端自动创建元数据功能。可通过修改 <code>iotdb-common.properties</code>配置文件中的<code>enable_auto_create_schema</code>配置项为 false,关闭元数据自动创建功能。</p></li><li><p>开启元数据同步时,不支持使用自定义插件。</p></li><li><p>双活集群中元数据同步需避免两端同时操作。</p></li><li><p>在进行数据同步任务时,请避免执行任何删除操作,防止两端状态不一致。</p></li></ul><h2 id="使用说明" tabindex="-1"><a class="header-anchor" href="#使用说明"><span>使用说明</span></a></h2><p>数据同步任务有三种状态:RUNNING、STOPPED 和 DROPPED。任务状态转换如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/dataSync02.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>提供以下 SQL 语句对同步任务进行状态管理。</p><h3 id="创建任务" tabindex="-1"><a class="header-anchor" href="#创建任务"><span>创建任务</span></a></h3><p>使用 <code>CREATE PIPE</code> 语句来创建一条数据同步任务,下列属性中<code>PipeId</code>和<code>sink</code>必填,<code>source</code>和<code>processor</code>为选填项,输入 SQL 时注意 <code>SOURCE</code>与 <code>SINK</code> 插件顺序不能替换。</p><p>SQL 示例如下:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">CREATE PIPE &lt;PipeId&gt; -- PipeId 是能够唯一标定任务任务的名字</span>
<span class="line">-- 数据抽取插件,可选插件</span>
<span class="line">WITH SOURCE (</span>
<span class="line"> [&lt;parameter&gt; = &lt;value&gt;,],</span>
<span class="line">)</span>
<span class="line">-- 数据处理插件,可选插件</span>
<span class="line">WITH PROCESSOR (</span>
<span class="line"> [&lt;parameter&gt; = &lt;value&gt;,],</span>
<span class="line">)</span>
<span class="line">-- 数据连接插件,必填插件</span>
<span class="line">WITH SINK (</span>
<span class="line"> [&lt;parameter&gt; = &lt;value&gt;,],</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="开始任务" tabindex="-1"><a class="header-anchor" href="#开始任务"><span>开始任务</span></a></h3><p>创建之后,任务不会立即被处理,需要启动任务。使用<code>START PIPE</code>语句来启动任务,从而开始处理数据:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">START PIPE &lt;PipeId&gt;</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="停止任务" tabindex="-1"><a class="header-anchor" href="#停止任务"><span>停止任务</span></a></h3><p>停止处理数据:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">STOP PIPE &lt;PipeId&gt;</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><h3 id="删除任务" tabindex="-1"><a class="header-anchor" href="#删除任务"><span>删除任务</span></a></h3><p>删除指定任务:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">DROP PIPE &lt;PipeId&gt;</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>删除任务不需要先停止同步任务。</p><h3 id="查看任务" tabindex="-1"><a class="header-anchor" href="#查看任务"><span>查看任务</span></a></h3><p>查看全部任务:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">SHOW PIPES</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>查看指定任务:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">SHOW PIPE &lt;PipeId&gt;</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>pipe show pipes 结果示例:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">+--------------------------------+-----------------------+-------+---------------+--------------------+------------------------------------------------------------+----------------+</span>
<span class="line">| ID| CreationTime| State| PipeSource| PipeProcessor| PipeSink|ExceptionMessage|</span>
<span class="line">+--------------------------------+-----------------------+-------+---------------+--------------------+------------------------------------------------------------+----------------+</span>
<span class="line">|3421aacb16ae46249bac96ce4048a220|2024-08-13T09:55:18.717|RUNNING| {}| {}|{{sink=iotdb-thrift-sink, sink.ip=127.0.0.1, sink.port=6668}}| |</span>
<span class="line">+--------------------------------+-----------------------+-------+---------------+--------------------+------------------------------------------------------------+----------------+</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>其中各列含义如下:</p><ul><li><strong>ID</strong>:同步任务的唯一标识符</li><li><strong>CreationTime</strong>:同步任务的创建的时间</li><li><strong>State</strong>:同步任务的状态</li><li><strong>PipeSource</strong>:同步数据流的来源</li><li><strong>PipeProcessor</strong>:同步数据流在传输过程中的处理逻辑</li><li><strong>PipeSink</strong>:同步数据流的目的地</li><li><strong>ExceptionMessage</strong>:显示同步任务的异常信息</li></ul><h3 id="同步插件" tabindex="-1"><a class="header-anchor" href="#同步插件"><span>同步插件</span></a></h3><p>为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">SHOW PIPEPLUGINS</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div></div></div><p>返回结果如下(1.3.2 版本):</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">IoTDB&gt; SHOW PIPEPLUGINS</span>
<span class="line">+---------------------+----------+-------------------------------------------------------------------------------------------+----------------------------------------------------+</span>
<span class="line">| PluginName|PluginType| ClassName| PluginJar|</span>
<span class="line">+---------------------+----------+-------------------------------------------------------------------------------------------+----------------------------------------------------+</span>
<span class="line">| DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| |</span>
<span class="line">| DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| |</span>
<span class="line">| IOTDB-AIR-GAP-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector| |</span>
<span class="line">| IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| |</span>
<span class="line">| IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| |</span>
<span class="line">|IOTDB-THRIFT-SSL-SINK| Builtin|org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| |</span>
<span class="line">+---------------------+----------+-------------------------------------------------------------------------------------------+----------------------------------------------------+</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>预置插件详细介绍如下(各插件的详细参数可参考本文<a href="#%E5%8F%82%E8%80%83%E5%8F%82%E6%95%B0%E8%AF%B4%E6%98%8E">参数说明</a>):</p><table style="text-align:left;"><tr><th>类型</th><th>自定义插件</th><th>插件名称</th><th>介绍</th><th>适用版本</th></tr><tr><td>source 插件</td><td>不支持</td><td>iotdb-source</td><td>默认的 extractor 插件,用于抽取 IoTDB 历史或实时数据</td><td>1.2.x</td></tr><tr><td>processor 插件</td><td>支持</td><td>do-nothing-processor</td><td>默认的 processor 插件,不对传入的数据做任何的处理</td><td>1.2.x</td></tr><tr><td rowspan="4">sink 插件</td><td rowspan="4">支持</td><td>do-nothing-sink</td><td>不对发送出的数据做任何的处理</td><td>1.2.x</td></tr><tr><td>iotdb-thrift-sink</td><td>默认的 sink 插件(V1.3.1及以上),用于 IoTDB(V1.2.0 及以上)与 IoTDB(V1.2.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景</td><td>1.2.x</td></tr><tr><td>iotdb-air-gap-sink</td><td>用于 IoTDB(V1.2.2 及以上)向 IoTDB(V1.2.2 及以上)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等</td><td>1.2.x</td></tr><tr><td>iotdb-thrift-ssl-sink</td><td>用于 IoTDB(V1.3.1 及以上)与 IoTDB(V1.2.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,单线程 sync blocking IO 模型,适用于安全需求较高的场景 </td><td>1.3.1+</td></tr></table>`,46)),a("p",null,[t[1]||(t[1]=e("导入自定义插件可参考")),d(l,{to:"/zh/UserGuide/latest/User-Manual/Streaming_timecho.html#%E8%87%AA%E5%AE%9A%E4%B9%89%E6%B5%81%E5%A4%84%E7%90%86%E6%8F%92%E4%BB%B6%E7%AE%A1%E7%90%86"},{default:r(()=>t[0]||(t[0]=[e("流处理框架")])),_:1}),t[2]||(t[2]=e("章节。"))]),t[4]||(t[4]=s(`<h2 id="使用示例" tabindex="-1"><a class="header-anchor" href="#使用示例"><span>使用示例</span></a></h2><h3 id="全量数据同步" tabindex="-1"><a 
class="header-anchor" href="#全量数据同步"><span>全量数据同步</span></a></h3><p>本例子用来演示将一个 IoTDB 的所有数据同步至另一个 IoTDB,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/数据同步1.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到 sink 的 iotdb-thrift-sink 插件(内置插件),需通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url,如下面的示例语句:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe A2B</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="部分数据同步" tabindex="-1"><a class="header-anchor" href="#部分数据同步"><span>部分数据同步</span></a></h3><p>本例子用来演示同步某个历史时间范围( 2023 年 8 月 23 日 8 点到 2023 年 10 月 23 日 8 点)的数据至另一个 IoTDB,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/数据同步1.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),需要配置数据的起止时间 start-time end-time 以及传输的模式 mode。通过 node-urls 配置目标端 IoTDB DataNode 节点的数据服务端口的 url。</p><p>详细语句如下:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe A2B</span>
<span class="line">WITH SOURCE (</span>
<span class="line"> &#39;source&#39;= &#39;iotdb-source&#39;,</span>
<span class="line"> &#39;realtime.mode&#39; = &#39;stream&#39;, -- 新插入数据(pipe创建后)的抽取模式</span>
<span class="line"> &#39;start-time&#39; = &#39;2023.08.23T08:00:00+00:00&#39;, -- 同步所有数据的开始 event time,包含 start-time</span>
<span class="line"> &#39;end-time&#39; = &#39;2023.10.23T08:00:00+00:00&#39; -- 同步所有数据的结束 event time,包含 end-time</span>
<span class="line">) </span>
<span class="line">with SINK (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-async-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="双向数据传输" tabindex="-1"><a class="header-anchor" href="#双向数据传输"><span>双向数据传输</span></a></h3><p>本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/1706698592139.jpg" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,为了避免数据无限循环,需要将 A B 上的参数<code>forwarding-pipe-requests</code> 均设置为 <code>false</code>,表示不转发从另一 pipe 传输而来的数据,以及要保持两侧的数据一致 pipe 需要配置<code>inclusion=all</code>来同步全量数据和元数据。</p><p>详细语句如下:</p><p>在 A IoTDB 上执行下列语句:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe AB</span>
<span class="line">with source (</span>
<span class="line"> &#39;inclusion&#39;=&#39;all&#39;, -- 表示同步全量数据、元数据和权限</span>
<span class="line"> &#39;forwarding-pipe-requests&#39; = &#39;false&#39; --不转发由其他 Pipe 写入的数据</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>在 B IoTDB 上执行下列语句:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe BA</span>
<span class="line">with source (</span>
<span class="line"> &#39;inclusion&#39;=&#39;all&#39;, -- 表示同步全量数据、元数据和权限</span>
<span class="line"> &#39;forwarding-pipe-requests&#39; = &#39;false&#39; --是否转发由其他 Pipe 写入的数据</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6667&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="边云数据传输" tabindex="-1"><a class="header-anchor" href="#边云数据传输"><span>边云数据传输</span></a></h3><p>本例子用来演示多个 IoTDB 之间边云传输数据的场景,数据由 B、C、D 集群分别都同步至 A 集群,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/dataSync03.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,为了将 B、C、D 集群的数据同步至 A,在 BA、CA、DA 之间的 pipe 需要配置<code>path</code>限制范围,以及要保持边侧和云侧的数据一致 pipe 需要配置<code>inclusion=all</code>来同步全量数据和元数据,详细语句如下:</p><p>在 B IoTDB 上执行下列语句,将 B 中数据同步至 A:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe BA</span>
<span class="line">with source (</span>
<span class="line"> &#39;inclusion&#39;=&#39;all&#39;, -- 表示同步全量数据、元数据和权限</span>
<span class="line"> &#39;path&#39;=&#39;root.db.**&#39;, -- 限制范围</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>在 C IoTDB 上执行下列语句,将 C 中数据同步至 A:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe CA</span>
<span class="line">with source (</span>
<span class="line"> &#39;inclusion&#39;=&#39;all&#39;, -- 表示同步全量数据、元数据和权限</span>
<span class="line"> &#39;path&#39;=&#39;root.db.**&#39;, -- 限制范围</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>在 D IoTDB 上执行下列语句,将 D 中数据同步至 A:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe DA</span>
<span class="line">with source (</span>
<span class="line"> &#39;inclusion&#39;=&#39;all&#39;, -- 表示同步全量数据、元数据和权限</span>
<span class="line"> &#39;path&#39;=&#39;root.db.**&#39;, -- 限制范围</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="级联数据传输" tabindex="-1"><a class="header-anchor" href="#级联数据传输"><span>级联数据传输</span></a></h3><p>本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由 A 集群同步至 B 集群,再同步至 C 集群,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/1706698610134.jpg" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,为了将 A 集群的数据同步至 C,在 BC 之间的 pipe 需要将 <code>forwarding-pipe-requests</code> 配置为<code>true</code>,详细语句如下:</p><p>在 A IoTDB 上执行下列语句,将 A 中数据同步至 B:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe AB</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><p>在 B IoTDB 上执行下列语句,将 B 中数据同步至 C:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe BC</span>
<span class="line">with source (</span>
<span class="line"> &#39;forwarding-pipe-requests&#39; = &#39;true&#39; --是否转发由其他 Pipe 写入的数据</span>
<span class="line">)</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6669&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="跨网闸数据传输" tabindex="-1"><a class="header-anchor" href="#跨网闸数据传输"><span>跨网闸数据传输</span></a></h3><p>本例子用来演示将一个 IoTDB 的数据,经过单向网闸,同步至另一个 IoTDB 的场景,数据链路如下图所示:</p><figure><img src="https://alioss.timecho.com/docs/img/数据传输1.png" alt="" tabindex="0" loading="lazy"><figcaption></figcaption></figure><p>在这个例子中,需要使用 sink 任务中的 iotdb-air-gap-sink 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句,其中 node-urls 填写网闸配置的目标端 IoTDB 中 DataNode 节点的数据服务端口的 url,详细语句如下:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe A2B</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-air-gap-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39; = &#39;10.53.53.53:9780&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="压缩同步-v1-3-2" tabindex="-1"><a class="header-anchor" href="#压缩同步-v1-3-2"><span>压缩同步(V1.3.2+ )</span></a></h3><p>IoTDB 支持在同步过程中指定数据压缩方式。可通过配置 <code>compressor</code> 参数,实现数据的实时压缩和传输。<code>compressor</code>目前支持 snappy / gzip / lz4 / zstd / lzma2 5 种可选算法,且可以选择多种压缩算法组合,按配置的顺序进行压缩。</p><p>如创建一个名为 A2B 的同步任务:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe A2B </span>
<span class="line">with sink (</span>
<span class="line"> &#39;node-urls&#39; = &#39;127.0.0.1:6668&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line"> &#39;compressor&#39; = &#39;snappy,lz4&#39; -- 压缩算法</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h3 id="加密同步-v1-3-1" tabindex="-1"><a class="header-anchor" href="#加密同步-v1-3-1"><span>加密同步(V1.3.1+ )</span></a></h3><p>IoTDB 支持在同步过程中使用 SSL 加密,从而在不同的 IoTDB 实例之间安全地传输数据。通过配置 SSL 相关的参数,如证书地址和密码(<code>ssl.trust-store-path</code>)、(<code>ssl.trust-store-pwd</code>)可以确保数据在同步过程中被 SSL 加密所保护。</p><p>如创建名为 A2B 的同步任务:</p><div class="language-SQL line-numbers-mode" data-highlighter="prismjs" data-ext="SQL" data-title="SQL"><pre><code><span class="line">create pipe A2B</span>
<span class="line">with sink (</span>
<span class="line"> &#39;sink&#39;=&#39;iotdb-thrift-ssl-sink&#39;,</span>
<span class="line"> &#39;node-urls&#39;=&#39;127.0.0.1:6667&#39;, -- 目标端 IoTDB DataNode 节点的数据服务端口的 url</span>
<span class="line"> &#39;ssl.trust-store-path&#39;=&#39;pki/trusted&#39;, -- 连接目标端 DataNode 所需的 trust store 证书路径</span>
<span class="line"> &#39;ssl.trust-store-pwd&#39;=&#39;root&#39; -- 连接目标端 DataNode 所需的 trust store 证书密码</span>
<span class="line">)</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="参考-注意事项" tabindex="-1"><a class="header-anchor" href="#参考-注意事项"><span>参考:注意事项</span></a></h2><p>可通过修改 IoTDB 配置文件(<code>iotdb-common.properties</code>)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:</p><p>V1.3.0/1/2:</p><div class="language-Properties line-numbers-mode" data-highlighter="prismjs" data-ext="Properties" data-title="Properties"><pre><code><span class="line">####################</span>
<span class="line">### Pipe Configuration</span>
<span class="line">####################</span>
<span class="line"></span>
<span class="line"># Uncomment the following field to configure the pipe lib directory.</span>
<span class="line"># For Windows platform</span>
<span class="line"># If its prefix is a drive specifier followed by &quot;\\\\&quot;, or if its prefix is &quot;\\\\\\\\&quot;, then the path is</span>
<span class="line"># absolute. Otherwise, it is relative.</span>
<span class="line"># pipe_lib_dir=ext\\\\pipe</span>
<span class="line"># For Linux platform</span>
<span class="line"># If its prefix is &quot;/&quot;, then the path is absolute. Otherwise, it is relative.</span>
<span class="line"># pipe_lib_dir=ext/pipe</span>
<span class="line"></span>
<span class="line"># The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.</span>
<span class="line"># The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).</span>
<span class="line"># pipe_subtask_executor_max_thread_num=5</span>
<span class="line"></span>
<span class="line"># The connection timeout (in milliseconds) for the thrift client.</span>
<span class="line"># pipe_sink_timeout_ms=900000</span>
<span class="line"></span>
<span class="line"># The maximum number of selectors that can be used in the sink.</span>
<span class="line"># Recommend to set this value to less than or equal to pipe_sink_max_client_number.</span>
<span class="line"># pipe_sink_selector_number=4</span>
<span class="line"></span>
<span class="line"># The maximum number of clients that can be used in the sink.</span>
<span class="line"># pipe_sink_max_client_number=16</span>
<span class="line"></span>
<span class="line"># Whether to enable receiving pipe data through air gap.</span>
<span class="line"># The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.</span>
<span class="line"># pipe_air_gap_receiver_enabled=false</span>
<span class="line"></span>
<span class="line"># The port for the server to receive pipe data through air gap.</span>
<span class="line"># pipe_air_gap_receiver_port=9780</span>
<span class="line"></span></code></pre><div class="line-numbers" aria-hidden="true" style="counter-reset:line-number 0;"><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div><div class="line-number"></div></div></div><h2 id="参考-参数说明" tabindex="-1"><a class="header-anchor" href="#参考-参数说明"><span>参考:参数说明</span></a></h2><h3 id="source-参数-v1-3-0" tabindex="-1"><a class="header-anchor" href="#source-参数-v1-3-0"><span>source 参数(V1.3.0)</span></a></h3><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">描述</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">source</td><td style="text-align:left;">iotdb-source</td><td style="text-align:left;">String: iotdb-source</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">source.pattern</td><td style="text-align:left;">用于筛选时间序列的路径前缀</td><td style="text-align:left;">String: 任意的时间序列前缀</td><td 
style="text-align:left;">选填</td><td style="text-align:left;">root</td></tr><tr><td style="text-align:left;">source.history.enable</td><td style="text-align:left;">是否发送历史数据</td><td style="text-align:left;">Boolean: true / false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">source.history.start-time</td><td style="text-align:left;">同步历史数据的开始 event time,包含 start-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MIN_VALUE</td></tr><tr><td style="text-align:left;">source.history.end-time</td><td style="text-align:left;">同步历史数据的结束 event time,包含 end-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MAX_VALUE</td></tr><tr><td style="text-align:left;">source.realtime.enable</td><td style="text-align:left;">是否发送实时数据</td><td style="text-align:left;">Boolean: true / false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">source.realtime.mode</td><td style="text-align:left;">新插入数据(pipe 创建后)的抽取模式</td><td style="text-align:left;">String: stream, batch</td><td style="text-align:left;">选填</td><td style="text-align:left;">stream</td></tr><tr><td style="text-align:left;">source.forwarding-pipe-requests</td><td style="text-align:left;">是否转发由其他 Pipe (通常是数据同步)写入的数据</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">source.history.loose-range</td><td style="text-align:left;">tsfile 传输时,是否放宽历史数据(pipe 创建前)范围。&quot;&quot;:不放宽范围,严格按照设置的条件挑选数据&quot;time&quot;:放宽时间范围,避免对 TsFile 进行拆分,可以提升同步效率</td><td style="text-align:left;">String: &quot;&quot; / &quot;time&quot;</td><td style="text-align:left;">选填</td><td 
style="text-align:left;">空字符串</td></tr></tbody></table><blockquote><p>💎 <strong>说明:历史数据与实时数据的差异</strong></p><ul><li><strong>历史数据</strong>:所有 arrival time &lt; 创建 pipe 时当前系统时间的数据称为历史数据</li><li><strong>实时数据</strong>:所有 arrival time &gt;= 创建 pipe 时当前系统时间的数据称为实时数据</li><li><strong>全量数据</strong>: 全量数据 = 历史数据 + 实时数据</li></ul><p>💎 <strong>说明:数据抽取模式 stream 和 batch 的差异</strong></p><ul><li><strong>stream(推荐)</strong>:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐</li><li><strong>batch</strong>:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐</li></ul></blockquote><h3 id="source-参数-v1-3-1" tabindex="-1"><a class="header-anchor" href="#source-参数-v1-3-1"><span>source 参数(V1.3.1)</span></a></h3><blockquote><p>在 1.3.1 及以上的版本中,各项参数不再需要额外增加 source、processor、sink 前缀</p></blockquote><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">描述</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">source</td><td style="text-align:left;">iotdb-source</td><td style="text-align:left;">String: iotdb-source</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">pattern</td><td style="text-align:left;">用于筛选时间序列的路径前缀</td><td style="text-align:left;">String: 任意的时间序列前缀</td><td style="text-align:left;">选填</td><td style="text-align:left;">root</td></tr><tr><td style="text-align:left;">start-time</td><td style="text-align:left;">同步所有数据的开始 event time,包含 start-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MIN_VALUE</td></tr><tr><td style="text-align:left;">end-time</td><td style="text-align:left;">同步所有数据的结束 event time,包含 end-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MAX_VALUE</td></tr><tr><td 
style="text-align:left;">realtime.mode</td><td style="text-align:left;">新插入数据(pipe 创建后)的抽取模式</td><td style="text-align:left;">String: stream, batch</td><td style="text-align:left;">选填</td><td style="text-align:left;">stream</td></tr><tr><td style="text-align:left;">forwarding-pipe-requests</td><td style="text-align:left;">是否转发由其他 Pipe (通常是数据同步)写入的数据</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">history.loose-range</td><td style="text-align:left;">tsfile 传输时,是否放宽历史数据(pipe 创建前)范围。&quot;&quot;:不放宽范围,严格按照设置的条件挑选数据;&quot;time&quot;:放宽时间范围,避免对 TsFile 进行拆分,可以提升同步效率</td><td style="text-align:left;">String: &quot;&quot; / &quot;time&quot;</td><td style="text-align:left;">选填</td><td style="text-align:left;">空字符串</td></tr></tbody></table><blockquote><p>💎 <strong>说明</strong>:为保持低版本兼容,history.enable、history.start-time、history.end-time、realtime.enable 仍可使用,但在新版本中不推荐。</p><p>💎 <strong>说明:数据抽取模式 stream 和 batch 的差异</strong></p><ul><li><strong>stream(推荐)</strong>:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐</li><li><strong>batch</strong>:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐</li></ul></blockquote><h3 id="source-参数-v1-3-2" tabindex="-1"><a class="header-anchor" href="#source-参数-v1-3-2"><span>source 参数(V1.3.2)</span></a></h3><blockquote><p>在 1.3.1 及以上的版本中,各项参数不再需要额外增加 source、processor、sink 前缀</p></blockquote><table><thead><tr><th style="text-align:left;">参数</th><th style="text-align:left;">描述</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">source</td><td style="text-align:left;">iotdb-source</td><td style="text-align:left;">String: iotdb-source</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">inclusion</td><td style="text-align:left;">用于指定数据同步任务中需要同步范围,分为数据、元数据和权限</td><td 
style="text-align:left;">String:all, data(insert,delete), schema(database,timeseries,ttl), auth</td><td style="text-align:left;">选填</td><td style="text-align:left;">data.insert</td></tr><tr><td style="text-align:left;">inclusion.exclusion</td><td style="text-align:left;">用于从 inclusion 指定的同步范围内排除特定的操作,减少同步的数据量</td><td style="text-align:left;">String:all, data(insert,delete), schema(database,timeseries,ttl), auth</td><td style="text-align:left;">选填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">path</td><td style="text-align:left;">用于筛选待同步的时间序列及其相关元数据 / 数据的路径模式。元数据同步只能用 path。path 是精确匹配,参数必须为前缀路径或完整路径,即不能含有 <code>&quot;*&quot;</code>,最多在 path 参数的尾部含有一个 <code>&quot;**&quot;</code></td><td style="text-align:left;">String:IoTDB 的 pattern</td><td style="text-align:left;">选填</td><td style="text-align:left;">root.**</td></tr><tr><td style="text-align:left;">pattern</td><td style="text-align:left;">用于筛选时间序列的路径前缀</td><td style="text-align:left;">String: 任意的时间序列前缀</td><td style="text-align:left;">选填</td><td style="text-align:left;">root</td></tr><tr><td style="text-align:left;">start-time</td><td style="text-align:left;">同步所有数据的开始 event time,包含 start-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MIN_VALUE</td></tr><tr><td style="text-align:left;">end-time</td><td style="text-align:left;">同步所有数据的结束 event time,包含 end-time</td><td style="text-align:left;">Long: [Long.MIN_VALUE, Long.MAX_VALUE]</td><td style="text-align:left;">选填</td><td style="text-align:left;">Long.MAX_VALUE</td></tr><tr><td style="text-align:left;">realtime.mode</td><td style="text-align:left;">新插入数据(pipe 创建后)的抽取模式</td><td style="text-align:left;">String: stream, batch</td><td style="text-align:left;">选填</td><td style="text-align:left;">stream</td></tr><tr><td style="text-align:left;">forwarding-pipe-requests</td><td style="text-align:left;">是否转发由其他 Pipe (通常是数据同步)写入的数据</td><td style="text-align:left;">Boolean: true, false</td><td 
style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">history.loose-range</td><td style="text-align:left;">tsfile 传输时,是否放宽历史数据(pipe 创建前)范围。&quot;&quot;:不放宽范围,严格按照设置的条件挑选数据;&quot;time&quot;:放宽时间范围,避免对 TsFile 进行拆分,可以提升同步效率</td><td style="text-align:left;">String: &quot;&quot; 、 &quot;time&quot;</td><td style="text-align:left;">选填</td><td style="text-align:left;">&quot;&quot;</td></tr><tr><td style="text-align:left;">mods.enable</td><td style="text-align:left;">是否发送 tsfile mods 文件</td><td style="text-align:left;">Boolean: true / false</td><td style="text-align:left;">选填</td><td style="text-align:left;">false</td></tr></tbody></table><blockquote><p>💎 <strong>说明</strong>:为保持低版本兼容,history.enable、history.start-time、history.end-time、realtime.enable 仍可使用,但在新版本中不推荐。</p><p>💎 <strong>说明:数据抽取模式 stream 和 batch 的差异</strong></p><ul><li><strong>stream(推荐)</strong>:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐</li><li><strong>batch</strong>:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐</li></ul></blockquote><h3 id="sink-参数" tabindex="-1"><a class="header-anchor" href="#sink-参数"><span>sink 参数</span></a></h3><blockquote><p>在 1.3.1 及以上的版本中,各项参数不再需要额外增加 source、processor、sink 前缀</p></blockquote><h4 id="iotdb-thrift-sink-v1-3-0-1-2" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-sink-v1-3-0-1-2"><span>iotdb-thrift-sink( V1.3.0/1/2)</span></a></h4><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">sink</td><td style="text-align:left;">iotdb-thrift-sink 或 iotdb-thrift-async-sink</td><td style="text-align:left;">String: iotdb-thrift-sink、iotdb-thrift-async-sink</td><td style="text-align:left;">必填</td><td style="text-align:left;"></td></tr><tr><td style="text-align:left;">sink.node-urls</td><td style="text-align:left;">目标端 IoTDB 任意多个 
DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)</td><td style="text-align:left;">String. 例:&#39;127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669&#39;, &#39;127.0.0.1:6667&#39;</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">sink.batch.enable</td><td style="text-align:left;">是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">sink.batch.max-delay-seconds</td><td style="text-align:left;">在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)</td><td style="text-align:left;">Integer</td><td style="text-align:left;">选填</td><td style="text-align:left;">1</td></tr><tr><td style="text-align:left;">sink.batch.size-bytes</td><td style="text-align:left;">在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)</td><td style="text-align:left;">Long</td><td style="text-align:left;">选填</td><td style="text-align:left;">16*1024*1024</td></tr></tbody></table><h4 id="iotdb-air-gap-sink-v1-3-0-1-2" tabindex="-1"><a class="header-anchor" href="#iotdb-air-gap-sink-v1-3-0-1-2"><span>iotdb-air-gap-sink( V1.3.0/1/2)</span></a></h4><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">sink</td><td style="text-align:left;">iotdb-air-gap-sink</td><td style="text-align:left;">String: iotdb-air-gap-sink</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">sink.node-urls</td><td style="text-align:left;">目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url</td><td style="text-align:left;">String. 
例:&#39;127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669&#39;, &#39;127.0.0.1:6667&#39;</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">sink.air-gap.handshake-timeout-ms</td><td style="text-align:left;">发送端与接收端在首次尝试建立连接时握手请求的超时时长,单位:毫秒</td><td style="text-align:left;">Integer</td><td style="text-align:left;">选填</td><td style="text-align:left;">5000</td></tr></tbody></table><h4 id="iotdb-thrift-ssl-sink-v1-3-1-2" tabindex="-1"><a class="header-anchor" href="#iotdb-thrift-ssl-sink-v1-3-1-2"><span>iotdb-thrift-ssl-sink( V1.3.1/2)</span></a></h4><table><thead><tr><th style="text-align:left;">key</th><th style="text-align:left;">value</th><th style="text-align:left;">value 取值范围</th><th style="text-align:left;">是否必填</th><th style="text-align:left;">默认取值</th></tr></thead><tbody><tr><td style="text-align:left;">sink</td><td style="text-align:left;">iotdb-thrift-ssl-sink</td><td style="text-align:left;">String: iotdb-thrift-ssl-sink</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">node-urls</td><td style="text-align:left;">目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)</td><td style="text-align:left;">String. 
例:&#39;127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669&#39;, &#39;127.0.0.1:6667&#39;</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">batch.enable</td><td style="text-align:left;">是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS</td><td style="text-align:left;">Boolean: true, false</td><td style="text-align:left;">选填</td><td style="text-align:left;">true</td></tr><tr><td style="text-align:left;">batch.max-delay-seconds</td><td style="text-align:left;">在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)</td><td style="text-align:left;">Integer</td><td style="text-align:left;">选填</td><td style="text-align:left;">1</td></tr><tr><td style="text-align:left;">batch.size-bytes</td><td style="text-align:left;">在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)</td><td style="text-align:left;">Long</td><td style="text-align:left;">选填</td><td style="text-align:left;">16*1024*1024</td></tr><tr><td style="text-align:left;">ssl.trust-store-path</td><td style="text-align:left;">连接目标端 DataNode 所需的 trust store 证书路径</td><td style="text-align:left;">String: 证书目录名,配置为相对目录时,相对于 IoTDB 根目录</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr><tr><td style="text-align:left;">ssl.trust-store-pwd</td><td style="text-align:left;">连接目标端 DataNode 所需的 trust store 证书密码</td><td style="text-align:left;">Integer</td><td style="text-align:left;">必填</td><td style="text-align:left;">-</td></tr></tbody></table>`,76))])}const m=n(p,[["render",u],["__file","Data-Sync_timecho.html.vue"]]),v=JSON.parse('{"path":"/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.html","title":"数据同步","lang":"zh-CN","frontmatter":{"description":"数据同步 数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。 功能概述 数据同步 一个数据同步任务包含 3 个阶段: 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义 
处理(Pr...","head":[["link",{"rel":"alternate","hreflang":"en-us","href":"https://iotdb.apache.org/UserGuide/latest/User-Manual/Data-Sync_timecho.html"}],["meta",{"property":"og:url","content":"https://iotdb.apache.org/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.html"}],["meta",{"property":"og:site_name","content":"IoTDB Website"}],["meta",{"property":"og:title","content":"数据同步"}],["meta",{"property":"og:description","content":"数据同步 数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。 功能概述 数据同步 一个数据同步任务包含 3 个阶段: 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义 处理(Pr..."}],["meta",{"property":"og:type","content":"article"}],["meta",{"property":"og:image","content":"https://alioss.timecho.com/docs/img/dataSync01.png"}],["meta",{"property":"og:locale","content":"zh-CN"}],["meta",{"property":"og:locale:alternate","content":"en-US"}],["meta",{"property":"og:updated_time","content":"2024-09-26T04:22:52.000Z"}],["meta",{"property":"article:modified_time","content":"2024-09-26T04:22:52.000Z"}],["script",{"type":"application/ld+json"},"{\\"@context\\":\\"https://schema.org\\",\\"@type\\":\\"Article\\",\\"headline\\":\\"数据同步\\",\\"image\\":[\\"https://alioss.timecho.com/docs/img/dataSync01.png\\",\\"https://alioss.timecho.com/docs/img/dataSync02.png\\",\\"https://alioss.timecho.com/docs/img/%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A51.png\\",\\"https://alioss.timecho.com/docs/img/%E6%95%B0%E6%8D%AE%E5%90%8C%E6%AD%A51.png\\",\\"https://alioss.timecho.com/docs/img/1706698592139.jpg\\",\\"https://alioss.timecho.com/docs/img/dataSync03.png\\",\\"https://alioss.timecho.com/docs/img/1706698610134.jpg\\",\\"https://alioss.timecho.com/docs/img/%E6%95%B0%E6%8D%AE%E4%BC%A0%E8%BE%931.png\\"],\\"dateModified\\":\\"2024-09-26T04:22:52.000Z\\",\\"author\\":[]}"]]},"headers":[{"level":2,"title":"功能概述","slug":"功能概述","link":"#功能概述","children":[{"level":3,"title":"数据同步","slug":"数据同步-1","link":"#数据同步-1","children":[]},{"level":3,"title":"功能限制及说明","slug":"功能限制及说
明","link":"#功能限制及说明","children":[]}]},{"level":2,"title":"使用说明","slug":"使用说明","link":"#使用说明","children":[{"level":3,"title":"创建任务","slug":"创建任务","link":"#创建任务","children":[]},{"level":3,"title":"开始任务","slug":"开始任务","link":"#开始任务","children":[]},{"level":3,"title":"停止任务","slug":"停止任务","link":"#停止任务","children":[]},{"level":3,"title":"删除任务","slug":"删除任务","link":"#删除任务","children":[]},{"level":3,"title":"查看任务","slug":"查看任务","link":"#查看任务","children":[]},{"level":3,"title":"同步插件","slug":"同步插件","link":"#同步插件","children":[]}]},{"level":2,"title":"使用示例","slug":"使用示例","link":"#使用示例","children":[{"level":3,"title":"全量数据同步","slug":"全量数据同步","link":"#全量数据同步","children":[]},{"level":3,"title":"部分数据同步","slug":"部分数据同步","link":"#部分数据同步","children":[]},{"level":3,"title":"双向数据传输","slug":"双向数据传输","link":"#双向数据传输","children":[]},{"level":3,"title":"边云数据传输","slug":"边云数据传输","link":"#边云数据传输","children":[]},{"level":3,"title":"级联数据传输","slug":"级联数据传输","link":"#级联数据传输","children":[]},{"level":3,"title":"跨网闸数据传输","slug":"跨网闸数据传输","link":"#跨网闸数据传输","children":[]},{"level":3,"title":"压缩同步(V1.3.2+ )","slug":"压缩同步-v1-3-2","link":"#压缩同步-v1-3-2","children":[]},{"level":3,"title":"加密同步(V1.3.1+ )","slug":"加密同步-v1-3-1","link":"#加密同步-v1-3-1","children":[]}]},{"level":2,"title":"参考:注意事项","slug":"参考-注意事项","link":"#参考-注意事项","children":[]},{"level":2,"title":"参考:参数说明","slug":"参考-参数说明","link":"#参考-参数说明","children":[{"level":3,"title":"source 参数(V1.3.0)","slug":"source-参数-v1-3-0","link":"#source-参数-v1-3-0","children":[]},{"level":3,"title":"source 参数(V1.3.1)","slug":"source-参数-v1-3-1","link":"#source-参数-v1-3-1","children":[]},{"level":3,"title":"source 参数(V1.3.2)","slug":"source-参数-v1-3-2","link":"#source-参数-v1-3-2","children":[]},{"level":3,"title":"sink 
参数","slug":"sink-参数","link":"#sink-参数","children":[]}]}],"git":{"createdTime":1702027758000,"updatedTime":1727324572000,"contributors":[{"name":"W1y1r","email":"150988475+W1y1r@users.noreply.github.com","commits":3},{"name":"Caideyipi","email":"87789683+Caideyipi@users.noreply.github.com","commits":1},{"name":"CritasWang","email":"critas@outlook.com","commits":1},{"name":"shuwenwei","email":"55970239+shuwenwei@users.noreply.github.com","commits":1}]},"readingTime":{"minutes":18.88,"words":5663},"filePathRelative":"zh/UserGuide/latest/User-Manual/Data-Sync_timecho.md","localizedDate":"2023年12月8日","autoDesc":true}');export{m as comp,v as data};