同步工具是持续将边缘端(发送端) IoTDB 中的时间序列数据上传并加载至云端(接收端) IoTDB 的套件工具。
同步工具的发送端内嵌于 IoTDB 的引擎,使用同步工具需要首先启动IoTDB。
可以在发送端使用 SQL 命令来启动或者关闭一个同步任务,并且可以随时查看同步任务的状态。在接收端,您可以通过设置IP白名单来规定准入IP地址范围。

假设目前有两台机器A和B都安装了IoTDB,希望将A上的数据不断同步至B中。为了更好地描述这个过程,我们引入以下概念。
root.sg.d.s,发送端B也包括路径root.sg.d.s,当发送端A删除root.sg存储组时将也会在接收端删除所有B在接收端的root.sg.d.s中存放的数据。在发送端和接收端执行如下语句即可快速开始两个 IoTDB 之间的数据同步,完整的 SQL 语句和配置事项请查看配置参数和SQL两节,更多使用范例请参考使用范例节。
IoTDB> START PIPESERVER
IOTDB> STOP PIPESERVER
IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='输入你的IP')
IoTDB> CREATE PIPE my_pipe TO my_iotdb
IoTDB> START PIPE my_pipe
IoTDB> SHOW PIPES
IoTDB> STOP PIPE my_pipe
IoTDB> START PIPE my_pipe
IoTDB> DROP PIPE my_pipe
所有参数修改均在$IOTDB_HOME$/conf/iotdb-engine.properties中,所有修改完成之后执行load configuration之后即可立刻生效。
| 参数名 | max_number_of_sync_file_retry |
|---|---|
| 描述 | 发送端同步文件到接收端失败时的最大重试次数 |
| 类型 | Int : [0,2147483647] |
| 默认值 | 5 |
| 参数名 | ip_white_list |
|---|---|
| 描述 | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端接受全部 IP 的同步请求。 |
| 类型 | String |
| 默认值 | 0.0.0.0/0 |
| 参数名 | pipe_server_port |
|---|---|
| 描述 | 同步接收端服务器监听接口,请确认该端口不是系统保留端口并且未被占用。 |
| 类型 | Short Int : [0,65535] |
| 默认值 | 6670 |
IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(ip='127.0.0.1',port=6670);]
IoTDB> SHOW PIPESINKTYPE IoTDB> +-----+ | type| +-----+ |IoTDB| +-----+
IoTDB> SHOW PIPESINKS IoTDB> SHOW PIPESINK [PipeSinkName] IoTDB> +-----------+-----+------------------------+ | name| type| attributes| +-----------+-----+------------------------+ |my_pipesink|IoTDB|ip='127.0.0.1',port=6670| +-----------+-----+------------------------+
IoTDB> DROP PIPESINK <PipeSinkName>
**(即所有序列中的数据),FROM语句目前仅支持root,Where语句仅支持指定time的起始时间SyncDelOp参数为true时会同步删除数据操作,否则不同步删除数据操作IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS)] [WITH SyncDelOp=true]
该指令在发送端和接收端均可执行
create time,pipe的创建时间
name,pipe的名字
role,当前IoTDB在pipe中的角色,可能有两种角色:
sender,当前IoTDB为同步发送端
receiver,当前IoTDB为同步接收端
remote,pipe的对端信息
当role为receiver时,这一字段值为发送端ip
当role为sender时,这一字段值为pipeSink名称
status,pipe状态
message,pipe运行信息,当pipe正常运行时,这一字段通常为空,当出现异常时,可能出现两种状态:
IoTDB> SHOW PIPES IoTDB> +-----------------------+--------+--------+-------------+---------+-------+ | create time| name | role| remote| status|message| +-----------------------+--------+--------+-------------+---------+-------+ |2022-03-30T20:58:30.689|my_pipe1| sender| my_pipesink| STOP| | +-----------------------+--------+--------+-------------+---------+-------+ |2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11| RUNNING| | +-----------------------+--------+--------+-------------+---------+-------+
SHOW PIPES等效IoTDB> SHOW PIPE [PipeName]
IoTDB> STOP PIPE <PipeName>
IoTDB> START PIPE <PipeName>
IoTDB> DROP PIPE <PipeName>
IoTDB> START PIPESERVER
IoTDB> STOP PIPESERVER
IoTDB> SHOW PIPESERVER +----------+ | enalbe| +----------+ |true/false| +----------+
vi conf/iotdb-engine.properties 配置云端参数,将白名单设置为仅接收来自IP为 192.168.0.1的边端的数据#################### ### PIPE Server Configuration #################### # PIPE server port to listen # Datatype: int # pipe_server_port=6670 # White IP list of Sync client. # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16 # If there are more than one IP segment, please separate them by commas # The default is to allow all IP to sync # Datatype: String ip_white_list=192.168.0.1/32
IoTDB> START PIPESERVER
IoTDB> SHOW PIPESERVER
max_number_of_sync_file_retry参数设置为10#################### ### PIPE Sender Configuration #################### # The maximum number of retry when syncing a file to receiver fails. max_number_of_sync_file_retry=10
IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='192.168.0.1',port=6670)
IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where time>=2022-03-30 00:00:00) WITH SyncDelOp=false
IoTDB> START PIPE p
IoTDB> SHOW PIPE p
在发送端执行以下SQL
SET STORAGE GROUP TO root.vehicle; CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE; CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN; CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE; CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN; insert into root.vehicle.d0(timestamp,s0) values(now(),10); insert into root.vehicle.d0(timestamp,s0,s1) values(now(),12,'12'); insert into root.vehicle.d0(timestamp,s1) values(now(),'14'); insert into root.vehicle.d1(timestamp,s2) values(now(),16.0); insert into root.vehicle.d1(timestamp,s2,s3) values(now(),18.0,true); insert into root.vehicle.d1(timestamp,s3) values(now(),false); flush;
在发送端和接受端执行查询,可查询到相同的结果
IoTDB> select ** from root.vehicle +-----------------------------+------------------+------------------+------------------+------------------+ | Time|root.vehicle.d0.s0|root.vehicle.d0.s1|root.vehicle.d1.s3|root.vehicle.d1.s2| +-----------------------------+------------------+------------------+------------------+------------------+ |2022-04-03T20:08:17.127+08:00| 10| null| null| null| |2022-04-03T20:08:17.358+08:00| 12| 12| null| null| |2022-04-03T20:08:17.393+08:00| null| 14| null| null| |2022-04-03T20:08:17.538+08:00| null| null| null| 16.0| |2022-04-03T20:08:17.753+08:00| null| null| true| 18.0| |2022-04-03T20:08:18.263+08:00| null| null| false| null| +-----------------------------+------------------+------------------+------------------+------------------+ Total line number = 6 It costs 0.134s
执行
STOP PIPESERVER
关闭本地的 IoTDB Pipe Server 时提示
Msg: 328: Failed to stop pipe server because there is pipe still running.
原因:接收端有正在运行的同步任务
解决方案:在发送端先执行 STOP PIPE PipeName 停止任务,后关闭 IoTDB Pipe Server
执行
CREATE PIPE mypipe
提示
Msg: 411: Create transport for pipe mypipe error, because CREATE request connects to receiver 127.0.0.1:6670 error..
SHOW PIPESERVER 检查是否启动接收端,若未启动使用 START PIPESERVER 启动;检查接收端iotdb-engine.properties中的白名单是否包含发送端ip。执行
DROP PIPESINK pipesinkName
提示
Msg: 411: Can not drop pipeSink demo, because pipe mypipe is using it.
SHOW PIPE,停止使用该 PipeSink 的 PIPE发送端创建 PIPE 提示
Msg: 411: Pipe p is RUNNING, please retry after drop it.
DROP PIPE p 后重试