数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。
一个数据同步任务包含 3 个阶段:
通过 SQL 语句声明式地配置 3 个部分的具体内容,可实现灵活的数据同步能力。目前数据同步支持以下信息的同步,您可以在创建同步任务时对同步范围进行选择(默认选择 data.insert,即同步新写入的数据):
元数据(schema)、权限(auth)同步功能存在如下限制:
使用元数据同步时,要求Schema region、ConfigNode 的共识协议必须为默认的 ratis 协议,即iotdb-system.properties配置文件中是否包含config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus、schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus,不包含即为默认值ratis 协议。
为了防止潜在的冲突,请在开启元数据同步时关闭接收端自动创建元数据功能。可通过修改 iotdb-system.properties配置文件中的enable_auto_create_schema配置项为 false,关闭元数据自动创建功能。
开启元数据同步时,不支持使用自定义插件。
在进行数据同步任务时,请避免执行任何删除操作,防止两端状态不一致。
数据同步任务有三种状态:RUNNING、STOPPED 和 DROPPED。任务状态转换如下图所示:
创建后任务会直接启动,同时当任务发生异常停止后,系统会自动尝试重启任务。
提供以下 SQL 语句对同步任务进行状态管理。
使用 CREATE PIPE 语句来创建一条数据同步任务,下列属性中PipeId和sink必填,source和processor为选填项,输入 SQL 时注意 SOURCE与 SINK 插件顺序不能替换。
SQL 示例如下:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字 -- 数据抽取插件,可选插件 WITH SOURCE ( [<parameter> = <value>,], ) -- 数据处理插件,可选插件 WITH PROCESSOR ( [<parameter> = <value>,], ) -- 数据连接插件,必填插件 WITH SINK ( [<parameter> = <value>,], )
IF NOT EXISTS 语义:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。
开始处理数据:
START PIPE<PipeId>
停止处理数据:
STOP PIPE <PipeId>
删除指定任务:
DROP PIPE [IF EXISTS] <PipeId>
IF EXISTS 语义:用于删除操作中,确保当指定 Pipe 存在时,执行删除命令,防止因尝试删除不存在的 Pipe 而导致报错。
删除任务不需要先停止同步任务。
查看全部任务:
SHOW PIPES
查看指定任务:
SHOW PIPE <PipeId>
pipe 的 show pipes 结果示例:
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+ | ID| CreationTime| State|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| +--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+ |59abf95db892428b9d01c5fa318014ea|2024-06-17T14:03:44.189|RUNNING| {}| {}|{sink=iotdb-thrift-sink, sink.ip=127.0.0.1, sink.port=6668}| | 128| 1.03| +--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+
其中各列含义如下:
为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句:
SHOW PIPEPLUGINS
返回结果如下:
IoTDB> SHOW PIPEPLUGINS +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+ | PluginName|PluginType| ClassName| PluginJar| +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+ | DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| | | DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| | | IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| | | IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| | | IOTDB-THRIFT-SSL-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| | +------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+
预置插件详细介绍如下(各插件的详细参数可参考本文参数说明):
导入自定义插件可参考流处理框架章节。
本例子用来演示将一个 IoTDB 的所有数据同步至另一个 IoTDB,数据链路如下图所示:
在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 sink 的 iotdb-thrift-sink 插件(内置插件),需通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url,如下面的示例语句:
create pipe A2B with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
本例子用来演示同步某个历史时间范围( 2023 年 8 月 23 日 8 点到 2023 年 10 月 23 日 8 点)的数据至另一个 IoTDB,数据链路如下图所示:
在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),需要配置数据的起止时间 start-time 和 end-time 以及传输的模式 mode。通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url。
详细语句如下:
create pipe A2B WITH SOURCE ( 'source'= 'iotdb-source', 'realtime.mode' = 'stream' -- 新插入数据(pipe创建后)的抽取模式 'start-time' = '2023.08.23T08:00:00+00:00', -- 同步所有数据的开始 event time,包含 start-time 'end-time' = '2023.10.23T08:00:00+00:00' -- 同步所有数据的结束 event time,包含 end-time ) with SINK ( 'sink'='iotdb-thrift-async-sink', 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
本例子用来演示多个 IoTDB 之间边云传输数据的场景,数据由 B 、C、D 集群分别都同步至 A 集群,数据链路如下图所示:
在这个例子中,为了将 B 、C、D 集群的数据同步至 A,在 BA 、CA、DA 之间的 pipe 需要配置path限制范围,以及要保持边侧和云侧的数据一致 pipe 需要配置inclusion=all来同步全量数据和元数据,详细语句如下:
在 B IoTDB 上执行下列语句,将 B 中数据同步至 A:
create pipe BA with source ( 'inclusion'='all', -- 表示同步全量数据、元数据和权限 'path'='root.db.**', -- 限制范围 ) with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6667', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
在 C IoTDB 上执行下列语句,将 C 中数据同步至 A:
create pipe CA with source ( 'inclusion'='all', -- 表示同步全量数据、元数据和权限 'path'='root.db.**', -- 限制范围 ) with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
在 D IoTDB 上执行下列语句,将 D 中数据同步至 A:
create pipe DA with source ( 'inclusion'='all', -- 表示同步全量数据、元数据和权限 'path'='root.db.**', -- 限制范围 ) with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6669', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由 A 集群同步至 B 集群,再同步至 C 集群,数据链路如下图所示:
在这个例子中,为了将 A 集群的数据同步至 C,在 BC 之间的 pipe 需要将 forwarding-pipe-requests 配置为true,详细语句如下:
在 A IoTDB 上执行下列语句,将 A 中数据同步至 B:
create pipe AB with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
在 B IoTDB 上执行下列语句,将 B 中数据同步至 C:
create pipe BC with source ( 'forwarding-pipe-requests' = 'true' --是否转发由其他 Pipe 写入的数据 ) with sink ( 'sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6669', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url )
IoTDB 支持在同步过程中指定数据压缩方式。可通过配置 compressor 参数,实现数据的实时压缩和传输。compressor目前支持 snappy / gzip / lz4 / zstd / lzma2 5 种可选算法,且可以选择多种压缩算法组合,按配置的顺序进行压缩。rate-limit-bytes-per-second(V1.3.3 及以后版本支持)每秒最大允许传输的byte数,计算压缩后的byte,若小于0则不限制。
如创建一个名为 A2B 的同步任务:
create pipe A2B with sink ( 'node-urls' = '127.0.0.1:6668', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url 'compressor' = 'snappy,lz4' -- 压缩算法 )
IoTDB 支持在同步过程中使用 SSL 加密,从而在不同的 IoTDB 实例之间安全地传输数据。通过配置 SSL 相关的参数,如证书地址和密码(ssl.trust-store-path)、(ssl.trust-store-pwd)可以确保数据在同步过程中被 SSL 加密所保护。
如创建名为 A2B 的同步任务:
create pipe A2B with sink ( 'sink'='iotdb-thrift-ssl-sink', 'node-urls'='127.0.0.1:6667', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url 'ssl.trust-store-path'='pki/trusted', -- 连接目标端 DataNode 所需的 trust store 证书路径 'ssl.trust-store-pwd'='root' -- 连接目标端 DataNode 所需的 trust store 证书密码 )
可通过修改 IoTDB 配置文件(iotdb-system.properties)以调整数据同步的参数,如同步数据存储目录等。完整配置如下::
V1.3.3+:
# pipe_receiver_file_dir # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${cn_system_dir}/pipe/receiver). # If it is absolute, system will save the data in the exact location it points to. # If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder. # Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. # effectiveMode: restart # For windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative. # pipe_receiver_file_dir=data\\confignode\\system\\pipe\\receiver # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. pipe_receiver_file_dir=data/confignode/system/pipe/receiver #################### ### Pipe Configuration #################### # Uncomment the following field to configure the pipe lib directory. # effectiveMode: first_start # For Windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is # absolute. Otherwise, it is relative. # pipe_lib_dir=ext\\pipe # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. pipe_lib_dir=ext/pipe # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. # The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). # effectiveMode: restart # Datatype: int pipe_subtask_executor_max_thread_num=5 # The connection timeout (in milliseconds) for the thrift client. # effectiveMode: restart # Datatype: int pipe_sink_timeout_ms=900000 # The maximum number of selectors that can be used in the sink. # Recommend to set this value to less than or equal to pipe_sink_max_client_number. # effectiveMode: restart # Datatype: int pipe_sink_selector_number=4 # The maximum number of clients that can be used in the sink. # effectiveMode: restart # Datatype: int pipe_sink_max_client_number=16 # The total bytes that all pipe sinks can transfer per second. # When given a value less than or equal to 0, it means no limit. # default value is -1, which means no limit. # effectiveMode: hot_reload # Datatype: double pipe_all_sinks_rate_limit_bytes_per_second=-1
| 参数 | 描述 | value 取值范围 | 是否必填 | 默认取值 |
|---|---|---|---|---|
| source | iotdb-source | String: iotdb-source | 必填 | - |
| inclusion | 用于指定数据同步任务中需要同步范围,分为数据、元数据和权限 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 选填 | data.insert |
| inclusion.exclusion | 用于从 inclusion 指定的同步范围内排除特定的操作,减少同步的数据量 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 选填 | 空字符串 |
| mode | 用于在每个 data region 发送完毕时分别发送结束事件,并在全部 data region 发送完毕后自动 drop pipe。query:结束,subscribe:不结束。 | String: query / subscribe | 选填 | subscribe |
| path | 用于筛选待同步的时间序列及其相关元数据 / 数据的路径模式元数据同步只能用pathpath 是精确匹配,参数必须为前缀路径或完整路径,即不能含有 "*",最多在 path参数的尾部含有一个 "**" | String:IoTDB 的 pattern | 选填 | root.** |
| pattern | 用于筛选时间序列的路径前缀 | String: 任意的时间序列前缀 | 选填 | root |
| start-time | 同步所有数据的开始 event time,包含 start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 选填 | Long.MIN_VALUE |
| end-time | 同步所有数据的结束 event time,包含 end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 选填 | Long.MAX_VALUE |
| realtime.mode | 新插入数据(pipe 创建后)的抽取模式 | String: batch | 选填 | batch |
| forwarding-pipe-requests | 是否转发由其他 Pipe (通常是数据同步)写入的数据 | Boolean: true | 选填 | true |
| history.loose-range | tsfile传输时,是否放宽历史数据(pipe创建前)范围。““:不放宽范围,严格按照设置的条件挑选数据”time”:放宽时间范围,避免对TsFile进行拆分,可以提升同步效率“path”:放宽路径范围,避免对TsFile进行拆分,可以提升同步效率“time, path” 、 “path, time” 、“all” : 放宽所有范围,避免对TsFile进行拆分,可以提升同步效率 | String: "" 、 “time” 、 “path” 、 “time, path” 、 “path, time” 、 “all” | 选填 | "" |
| realtime.loose-range | tsfile传输时,是否放宽实时数据(pipe创建前)范围。““:不放宽范围,严格按照设置的条件挑选数据”time”:放宽时间范围,避免对TsFile进行拆分,可以提升同步效率“path”:放宽路径范围,避免对TsFile进行拆分,可以提升同步效率“time, path” 、 “path, time” 、“all” : 放宽所有范围,避免对TsFile进行拆分,可以提升同步效率 | String: "" 、 “time” 、 “path” 、 “time, path” 、 “path, time” 、 “all” | 选填 | "" |
| mods.enable | 是否发送 tsfile 的 mods 文件 | Boolean: true / false | 选填 | false |
💎 说明:为保持低版本兼容,history.enable、history.start-time、history.end-time、realtime.enable 仍可使用,但在新版本中不推荐。
💎 说明:数据抽取模式 batch 的含义
- batch:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐
在 1.3.3 及以上的版本中,只包含sink的情况下,不再需要额外增加with sink 前缀
| key | value | value 取值范围 | 是否必填 | 默认取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-sink 或 iotdb-thrift-async-sink | String: iotdb-thrift-sink 或 iotdb-thrift-async-sink | 必填 | - |
| node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:‘127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669’, ‘127.0.0.1:6667’ | 必填 | - |
| batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
| batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
| batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 1610241024 |
| key | value | value 取值范围 | 是否必填 | 默认取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-ssl-sink | String: iotdb-thrift-ssl-sink | 必填 | - |
| node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:‘127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669’, ‘127.0.0.1:6667’ | 必填 | - |
| batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
| batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
| batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 1610241024 |
| ssl.trust-store-path | 连接目标端 DataNode 所需的 trust store 证书路径 | String.Example: ‘127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669’, ‘127.0.0.1:6667’ | 必填 | - |
| ssl.trust-store-pwd | 连接目标端 DataNode 所需的 trust store 证书密码 | Integer | 必填 | - |