| <!-- |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| |
| --> |
| |
| # 数据同步 |
| 数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。 |
| |
| ## 功能概述 |
| |
| ### 数据同步 |
| |
| 一个数据同步任务包含 3 个阶段: |
| |
|  |
| |
| - 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义 |
| - 处理(Process)阶段:该部分用于处理从源 IoTDB 抽取出的数据,在 SQL 语句中的 processor 部分定义 |
| - 发送(Sink)阶段:该部分用于向目标 IoTDB 发送数据,在 SQL 语句中的 sink 部分定义 |
| |
| 通过 SQL 语句声明式地配置 3 个部分的具体内容,可实现灵活的数据同步能力。目前数据同步支持以下信息的同步,您可以在创建同步任务时对同步范围进行选择(默认选择 data.insert,即同步新写入的数据): |
| |
| <table style="text-align: left;"> |
| <tbody> |
| <tr> <th>同步范围</th> |
| <th>同步内容</th> |
| <th>说明</th> |
| </tr> |
| <tr> |
| <td colspan="2">all</td> |
| <td>所有范围</td> |
| </tr> |
| <tr> |
| <td rowspan="2">data(数据)</td> |
| <td>insert(增量)</td> |
| <td>同步新写入的数据</td> |
| </tr> |
| <tr> |
| <td>delete(删除)</td> |
| <td>同步被删除的数据</td> |
| </tr> |
| <tr> |
| <td rowspan="3">schema(元数据)</td> |
| <td>database(数据库)</td> |
| <td>同步数据库的创建、修改或删除操作</td> |
| </tr> |
| <tr> |
| <td>timeseries(时间序列)</td> |
| <td>同步时间序列的定义和属性</td> |
| </tr> |
| <tr> |
| <td>TTL(数据到期时间)</td> |
| <td>同步数据的存活时间</td> |
| </tr> |
| <tr> |
| <td>auth(权限)</td> |
| <td>-</td> |
| <td>同步用户权限和访问控制</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| ### 功能限制及说明 |
| |
| 元数据(schema)、权限(auth)同步功能存在如下限制: |
| |
| - 使用元数据同步时,要求`Schema region`、`ConfigNode` 的共识协议必须为默认的 ratis 协议,即`iotdb-system.properties`配置文件中是否包含`config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus`、`schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus`,不包含即为默认值ratis 协议。 |
| |
| - 为了防止潜在的冲突,请在开启元数据同步时关闭接收端自动创建元数据功能。可通过修改 `iotdb-system.properties`配置文件中的`enable_auto_create_schema`配置项为 false,关闭元数据自动创建功能。 |
| |
| - 开启元数据同步时,不支持使用自定义插件。 |
| |
| - 在进行数据同步任务时,请避免执行任何删除操作,防止两端状态不一致。 |
| |
| ## 使用说明 |
| |
| 数据同步任务有三种状态:RUNNING、STOPPED 和 DROPPED。任务状态转换如下图所示: |
| |
|  |
| |
| 创建后任务会直接启动,同时当任务发生异常停止后,系统会自动尝试重启任务。 |
| |
| 提供以下 SQL 语句对同步任务进行状态管理。 |
| |
| ### 创建任务 |
| |
| 使用 `CREATE PIPE` 语句来创建一条数据同步任务,下列属性中`PipeId`和`sink`必填,`source`和`processor`为选填项,输入 SQL 时注意 `SOURCE`与 `SINK` 插件顺序不能替换。 |
| |
| SQL 示例如下: |
| |
| ```SQL |
| CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字 |
| -- 数据抽取插件,可选插件 |
| WITH SOURCE ( |
| [<parameter> = <value>,], |
| ) |
| -- 数据处理插件,可选插件 |
| WITH PROCESSOR ( |
| [<parameter> = <value>,], |
| ) |
| -- 数据连接插件,必填插件 |
| WITH SINK ( |
| [<parameter> = <value>,], |
| ) |
| ``` |
| |
| **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 |
| |
| **注意**:V1.3.6 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: |
| |
| * 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` |
| |
| * 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 |
| |
| 创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_apache.md#查看任务)小节 |
| |
| |
| ### 开始任务 |
| |
| 开始处理数据: |
| |
| ```SQL |
| START PIPE<PipeId> |
| ``` |
| |
| ### 停止任务 |
| |
| 停止处理数据: |
| |
| ```SQL |
| STOP PIPE <PipeId> |
| ``` |
| |
| ### 删除任务 |
| |
| 删除指定任务: |
| |
| ```SQL |
| DROP PIPE [IF EXISTS] <PipeId> |
| ``` |
| |
| **IF EXISTS 语义**:用于删除操作中,确保当指定 Pipe 存在时,执行删除命令,防止因尝试删除不存在的 Pipe 而导致报错。 |
| |
| 删除任务不需要先停止同步任务。 |
| |
| ### 查看任务 |
| |
| 查看全部任务: |
| |
| ```SQL |
| SHOW PIPES |
| ``` |
| |
| 查看指定任务: |
| |
| ```SQL |
| SHOW PIPE <PipeId> |
| ``` |
| |
| pipe 的 show pipes 结果示例: |
| |
| ```SQL |
| +--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+ |
| | ID| CreationTime| State|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| |
| +--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+ |
| |59abf95db892428b9d01c5fa318014ea|2024-06-17T14:03:44.189|RUNNING| {}| {}|{sink=iotdb-thrift-sink, sink.ip=127.0.0.1, sink.port=6668}| | 128| 1.03| |
| +--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+ |
| ``` |
| |
| |
| 其中各列含义如下: |
| |
| - **ID**:同步任务的唯一标识符 |
| - **CreationTime**:同步任务的创建的时间 |
| - **State**:同步任务的状态 |
| - **PipeSource**:同步数据流的来源 |
| - **PipeProcessor**:同步数据流在传输过程中的处理逻辑 |
| - **PipeSink**:同步数据流的目的地 |
| - **ExceptionMessage**:显示同步任务的异常信息 |
| - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据和元数据同步的 event,以及系统和用户自定义的 event。 |
| - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 |
| |
| 示例: |
| |
| 在 V1.3.6 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 |
| |
| ```sql |
| IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') |
| |
| IoTDB> show pipe alldatapipe_history |
| +-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| | ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| |
| +-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| |alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| |
| +-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| |
| IoTDB> show pipe alldatapipe_realtime |
| +--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| | ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| |
| +--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| |alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| |
| +--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ |
| |
| ``` |
| |
| |
| ### 同步插件 |
| |
| 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: |
| |
| ```SQL |
| SHOW PIPEPLUGINS |
| ``` |
| |
| 返回结果如下: |
| |
| ```SQL |
| IoTDB> SHOW PIPEPLUGINS |
| +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+ |
| | PluginName|PluginType| ClassName| PluginJar| |
| +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+ |
| | DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| | |
| | DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| | |
| | IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| | |
| | IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| | |
| | IOTDB-THRIFT-SSL-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| | |
| +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+ |
| |
| ``` |
| |
| 预置插件详细介绍如下(各插件的详细参数可参考本文[参数说明](#参考参数说明)): |
| |
| <table style="text-align: left;"> |
| <tbody> |
| <tr> <th>类型</th> |
| <th>自定义插件</th> |
| <th>插件名称</th> |
| <th>介绍</th> |
| <th>适用版本</th> |
| </tr> |
| <tr> |
| <td>source 插件</td> |
| <td>不支持</td> |
| <td>iotdb-source</td> |
| <td>默认的 extractor 插件,用于抽取 IoTDB 历史或实时数据</td> |
| <td>1.2.x</td> |
| </tr> |
| <tr> |
| <td>processor 插件</td> |
| <td>支持</td> |
| <td>do-nothing-processor</td> |
| <td>默认的 processor 插件,不对传入的数据做任何的处理</td> |
| <td>1.2.x</td> |
| </tr> |
| <tr> |
| <td rowspan="3">sink 插件</td> |
| <td rowspan="3">支持</td> |
| <td>do-nothing-sink</td> |
| <td>不对发送出的数据做任何的处理</td> |
| <td>1.2.x</td> |
| </tr> |
| <tr> |
| <td>iotdb-thrift-sink</td> |
| <td>默认的 sink 插件(V1.3.1及以上),用于 IoTDB(V1.2.0 及以上)与 IoTDB(V1.2.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景</td> |
| <td>1.2.x</td> |
| </tr> |
| <tr> |
| <td>iotdb-thrift-ssl-sink</td> |
| <td>用于 IoTDB(V1.3.1 及以上)与 IoTDB(V1.2.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,单线程 sync blocking IO 模型,适用于安全需求较高的场景 </td> |
| <td>1.3.1+</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| 导入自定义插件可参考[流处理框架](./Streaming_apache.md#自定义流处理插件管理)章节。 |
| |
| ## 使用示例 |
| |
| ### 全量数据同步 |
| |
| 本例子用来演示将一个 IoTDB 的所有数据同步至另一个 IoTDB,数据链路如下图所示: |
| |
|  |
| |
| 在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 sink 的 iotdb-thrift-sink 插件(内置插件),需通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url,如下面的示例语句: |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| ### 部分数据同步 |
| |
| 本例子用来演示同步某个历史时间范围( 2023 年 8 月 23 日 8 点到 2023 年 10 月 23 日 8 点)的数据至另一个 IoTDB,数据链路如下图所示: |
| |
|  |
| |
| 在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),需要配置数据的起止时间 start-time 和 end-time 以及传输的模式 mode。通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url。 |
| |
| 详细语句如下: |
| |
| ```SQL |
| create pipe A2B |
| WITH SOURCE ( |
| 'source'= 'iotdb-source', |
| 'realtime.mode' = 'stream' -- 新插入数据(pipe创建后)的抽取模式 |
| 'path' = 'root.vehicle.**', -- 同步数据的范围 |
| 'start-time' = '2023.08.23T08:00:00+00:00', -- 同步所有数据的开始 event time,包含 start-time |
| 'end-time' = '2023.10.23T08:00:00+00:00' -- 同步所有数据的结束 event time,包含 end-time |
| ) |
| with SINK ( |
| 'sink'='iotdb-thrift-async-sink', |
| 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| ### 边云数据传输 |
| |
| 本例子用来演示多个 IoTDB 之间边云传输数据的场景,数据由 B 、C、D 集群分别都同步至 A 集群,数据链路如下图所示: |
| |
|  |
| |
| 在这个例子中,为了将 B 、C、D 集群的数据同步至 A,在 BA 、CA、DA 之间的 pipe 需要配置`path`限制范围,以及要保持边侧和云侧的数据一致 pipe 需要配置`inclusion=all`来同步全量数据和元数据,详细语句如下: |
| |
| 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A: |
| |
| ```SQL |
| create pipe BA |
| with source ( |
| 'inclusion'='all', -- 表示同步全量数据、元数据和权限 |
| 'path'='root.db.**', -- 限制范围 |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6667', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A: |
| |
| ```SQL |
| create pipe CA |
| with source ( |
| 'inclusion'='all', -- 表示同步全量数据、元数据和权限 |
| 'path'='root.db.**', -- 限制范围 |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A: |
| |
| ```SQL |
| create pipe DA |
| with source ( |
| 'inclusion'='all', -- 表示同步全量数据、元数据和权限 |
| 'path'='root.db.**', -- 限制范围 |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6669', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| ### 级联数据传输 |
| |
| 本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由 A 集群同步至 B 集群,再同步至 C 集群,数据链路如下图所示: |
| |
|  |
| |
| 在这个例子中,为了将 A 集群的数据同步至 C,在 BC 之间的 pipe 需要将 `forwarding-pipe-requests` 配置为`true`,详细语句如下: |
| |
| 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B: |
| |
| ```SQL |
| create pipe AB |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C: |
| |
| ```SQL |
| create pipe BC |
| with source ( |
| 'forwarding-pipe-requests' = 'true' --是否转发由其他 Pipe 写入的数据 |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6669', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| ) |
| ``` |
| |
| ### 压缩同步 |
| |
| IoTDB 支持在同步过程中指定数据压缩方式。可通过配置 `compressor` 参数,实现数据的实时压缩和传输。`compressor`目前支持 snappy / gzip / lz4 / zstd / lzma2 5 种可选算法,且可以选择多种压缩算法组合,按配置的顺序进行压缩。`rate-limit-bytes-per-second`(V1.3.3 及以后版本支持)每秒最大允许传输的byte数,计算压缩后的byte,若小于0则不限制。 |
| |
| 如创建一个名为 A2B 的同步任务: |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| 'compressor' = 'snappy,lz4' -- 压缩算法 |
| ) |
| ``` |
| |
| ### 加密同步 |
| |
| IoTDB 支持在同步过程中使用 SSL 加密,从而在不同的 IoTDB 实例之间安全地传输数据。通过配置 SSL 相关的参数,如证书地址和密码(`ssl.trust-store-path`)、(`ssl.trust-store-pwd`)可以确保数据在同步过程中被 SSL 加密所保护。 |
| |
| 如创建名为 A2B 的同步任务: |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'sink'='iotdb-thrift-ssl-sink', |
| 'node-urls'='127.0.0.1:6667', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url |
| 'ssl.trust-store-path'='pki/trusted', -- 连接目标端 DataNode 所需的 trust store 证书路径 |
| 'ssl.trust-store-pwd'='root' -- 连接目标端 DataNode 所需的 trust store 证书密码 |
| ) |
| ``` |
| |
| ## 参考:注意事项 |
| |
| 可通过修改 IoTDB 配置文件(`iotdb-system.properties`)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:: |
| |
| V1.3.3+: |
| |
| ```Properties |
| # pipe_receiver_file_dir |
| # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${cn_system_dir}/pipe/receiver). |
| # If it is absolute, system will save the data in the exact location it points to. |
| # If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder. |
| # Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. |
| # effectiveMode: restart |
| # For windows platform |
| # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative. |
| # pipe_receiver_file_dir=data\\confignode\\system\\pipe\\receiver |
| # For Linux platform |
| # If its prefix is "/", then the path is absolute. Otherwise, it is relative. |
| pipe_receiver_file_dir=data/confignode/system/pipe/receiver |
| |
| #################### |
| ### Pipe Configuration |
| #################### |
| |
| # Uncomment the following field to configure the pipe lib directory. |
| # effectiveMode: first_start |
| # For Windows platform |
| # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is |
| # absolute. Otherwise, it is relative. |
| # pipe_lib_dir=ext\\pipe |
| # For Linux platform |
| # If its prefix is "/", then the path is absolute. Otherwise, it is relative. |
| pipe_lib_dir=ext/pipe |
| |
| # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. |
| # The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). |
| # effectiveMode: restart |
| # Datatype: int |
| pipe_subtask_executor_max_thread_num=5 |
| |
| # The connection timeout (in milliseconds) for the thrift client. |
| # effectiveMode: restart |
| # Datatype: int |
| pipe_sink_timeout_ms=900000 |
| |
| # The maximum number of selectors that can be used in the sink. |
| # Recommend to set this value to less than or equal to pipe_sink_max_client_number. |
| # effectiveMode: restart |
| # Datatype: int |
| pipe_sink_selector_number=4 |
| |
| # The maximum number of clients that can be used in the sink. |
| # effectiveMode: restart |
| # Datatype: int |
| pipe_sink_max_client_number=16 |
| |
| # The total bytes that all pipe sinks can transfer per second. |
| # When given a value less than or equal to 0, it means no limit. |
| # default value is -1, which means no limit. |
| # effectiveMode: hot_reload |
| # Datatype: double |
| pipe_all_sinks_rate_limit_bytes_per_second=-1 |
| ``` |
| |
| ## 参考:参数说明 |
| |
| ### source 参数(V1.3.3) |
| |
| | 参数 | 描述 | value 取值范围 | 是否必填 | 默认取值 | |
| | ------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ | -------- | -------------- | |
| | source | iotdb-source | String: iotdb-source | 必填 | - | |
| | inclusion | 用于指定数据同步任务中需要同步范围,分为数据、元数据和权限 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 选填 | data.insert | |
| | inclusion.exclusion | 用于从 inclusion 指定的同步范围内排除特定的操作,减少同步的数据量 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 选填 | 空字符串 | |
| | mode | 用于在每个 data region 发送完毕时分别发送结束事件,并在全部 data region 发送完毕后自动 drop pipe。query:结束,subscribe:不结束。 | String: query / subscribe | 选填 | subscribe | |
| | path | 用于筛选待同步的时间序列及其相关元数据 / 数据的路径模式元数据同步只能用pathpath 是精确匹配,参数必须为前缀路径或完整路径,即不能含有 `"*"`,最多在 path参数的尾部含有一个 `"**"` | String:IoTDB 的 pattern | 选填 | root.** | |
| | pattern | 用于筛选时间序列的路径前缀 | String: 任意的时间序列前缀 | 选填 | root | |
| | start-time | 同步所有数据的开始 event time,包含 start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 选填 | Long.MIN_VALUE | |
| | end-time | 同步所有数据的结束 event time,包含 end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 选填 | Long.MAX_VALUE | |
| | realtime.mode | 新插入数据(pipe 创建后)的抽取模式 | String: batch | 选填 | batch | |
| | forwarding-pipe-requests | 是否转发由其他 Pipe (通常是数据同步)写入的数据 | Boolean: true | 选填 | true | |
| | history.loose-range | tsfile传输时,是否放宽历史数据(pipe创建前)范围。"":不放宽范围,严格按照设置的条件挑选数据"time":放宽时间范围,避免对TsFile进行拆分,可以提升同步效率"path":放宽路径范围,避免对TsFile进行拆分,可以提升同步效率"time, path" 、 "path, time" 、"all" : 放宽所有范围,避免对TsFile进行拆分,可以提升同步效率 | String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all" | 选填 | "" | |
| | realtime.loose-range | tsfile传输时,是否放宽实时数据(pipe创建前)范围。"":不放宽范围,严格按照设置的条件挑选数据"time":放宽时间范围,避免对TsFile进行拆分,可以提升同步效率"path":放宽路径范围,避免对TsFile进行拆分,可以提升同步效率"time, path" 、 "path, time" 、"all" : 放宽所有范围,避免对TsFile进行拆分,可以提升同步效率 | String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all" | 选填 | "" | |
| | mods.enable | 是否发送 tsfile 的 mods 文件 | Boolean: true / false | 选填 | false | |
| |
| > 💎 **说明**:为保持低版本兼容,history.enable、history.start-time、history.end-time、realtime.enable 仍可使用,但在新版本中不推荐。 |
| > |
| > 💎 **说明:数据抽取模式 batch 的含义** |
| > - **batch**:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐 |
| |
| |
| ### sink 参数 |
| |
| > 在 1.3.3 及以上的版本中,只包含sink的情况下,不再需要额外增加with sink 前缀 |
| |
| #### iotdb-thrift-sink |
| |
| | key | value | value 取值范围 | 是否必填 | 默认取值 | |
| | ----------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -------- | ------------ | |
| | sink | iotdb-thrift-sink 或 iotdb-thrift-async-sink | String: iotdb-thrift-sink 或 iotdb-thrift-async-sink | 必填 | - | |
| | node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - | |
| | batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true | |
| | batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 | |
| | batch.max-delay-ms | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:ms)(V1.3.6及以后的V1.x版本支持) | Integer | 选填 | 1 | |
| | batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 16*1024*1024 | |
| | load-tsfile-strategy | 文件同步数据时,接收端请求返回发送端前,是否等待接收端本地的 load tsfile 执行结果返回。<br>sync:等待本地的 load tsfile 执行结果返回;<br>async:不等待本地的 load tsfile 执行结果返回。(V1.3.6及以后的V1.x版本支持) | String: sync / async | 选填 | sync | |
| |
| #### iotdb-thrift-ssl-sink |
| |
| | key | value | value 取值范围 | 是否必填 | 默认取值 | |
| | ----------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -------- | ------------ | |
| | sink | iotdb-thrift-ssl-sink | String: iotdb-thrift-ssl-sink | 必填 | - | |
| | node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - | |
| | batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true | |
| | batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 | |
| | batch.max-delay-ms | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:ms)(V1.3.6及以后的V1.x版本支持) | Integer | 选填 | 1 | |
| | batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 16*1024*1024 | |
| | load-tsfile-strategy | 文件同步数据时,接收端请求返回发送端前,是否等待接收端本地的 load tsfile 执行结果返回。<br>sync:等待本地的 load tsfile 执行结果返回;<br>async:不等待本地的 load tsfile 执行结果返回。(V1.3.6及以后的V1.x版本支持) | String: sync / async | 选填 | sync | |
| | ssl.trust-store-path | 连接目标端 DataNode 所需的 trust store 证书路径 | String.Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - | |
| | ssl.trust-store-pwd | 连接目标端 DataNode 所需的 trust store 证书密码 | Integer | 必填 | - | |
| |
| |