同步工具是持续将边缘端(发送端) IoTDB 中的时间序列数据上传并加载至云端(接收端) IoTDB 的套件工具。
IoTDB 同步工具内嵌于 IoTDB 引擎,与下游接收端相连,下游支持 IoTDB(单机/集群)。
可以在发送端使用 SQL 命令来启动或者关闭一个同步任务,并且可以随时查看同步任务的状态。在接收端,您可以通过设置 IP 白名单来规定准入 IP 地址范围。
TsFile 同步工具实现了数据从 “流入-> IoTDB ->流出” 的闭环。假设目前有两台机器A和B都安装了IoTDB,希望将 A 上的数据不断同步至 B 中。为了更好地描述这个过程,我们引入以下概念。
root.sg.d.s,发送端B也包括路径root.sg.d.s,当发送端A删除root.sg database 时将也会在接收端删除所有B在接收端的root.sg.d.s中存放的数据。SHOW DATANODES 中所有处于 Running 状态的 DataNode 节点均可连通,否则将执行失败。在发送端和接收端执行如下语句即可快速开始两个 IoTDB 之间的数据同步,完整的 SQL 语句和配置事项请查看配置参数和SQL两节,更多使用范例请参考使用范例节。
IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='接收端IP', port='接收端端口')
IoTDB> CREATE PIPE my_pipe TO my_iotdb
IoTDB> START PIPE my_pipe
IoTDB> SHOW PIPES
IoTDB> STOP PIPE my_pipe
IoTDB> START PIPE my_pipe
IoTDB> DROP PIPE my_pipe
所有参数修改均在$IOTDB_HOME$/conf/iotdb-common.properties中,所有修改完成之后执行load configuration之后即可立刻生效。
| 参数名 | max_number_of_sync_file_retry |
|---|---|
| 描述 | 发送端同步文件到接收端失败时的最大重试次数 |
| 类型 | Int : [0,2147483647] |
| 默认值 | 5 |
| 参数名 | ip_white_list |
|---|---|
| 描述 | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端拒绝除了本地以外的全部 IP 的同步请求。 对该参数进行配置时,需要保证发送端所有 DataNode 地址均被覆盖。 |
| 类型 | String |
| 默认值 | 127.0.0.1/32 |
IoTDB> SHOW PIPESINKTYPE IoTDB> +-----+ | type| +-----+ |IoTDB| +-----+
rpc_address 与 rpc_port。IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(ip='127.0.0.1',port=6667);]
IoTDB> DROP PIPESINK <PipeSinkName>
IoTDB> SHOW PIPESINKS IoTDB> SHOW PIPESINK [PipeSinkName] IoTDB> +-----------+-----+------------------------+ | name| type| attributes| +-----------+-----+------------------------+ |my_pipesink|IoTDB|ip='127.0.0.1',port=6667| +-----------+-----+------------------------+
**(即所有序列中的数据),from 语句目前仅支持root,where语句仅支持指定 time 的起始时间。起始时间的指定形式可以是 yyyy-mm-dd HH:MM:SS或时间戳。IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE time>=yyyy-mm-dd HH:MM:SS)]
IoTDB> STOP PIPE <PipeName>
IoTDB> START PIPE <PipeName>
IoTDB> DROP PIPE <PipeName>
该指令在发送端和接收端均可执行
显示所有同步任务状态
create time:Pipe 的创建时间
name:Pipe 的名字
role:当前 IoTDB 在 Pipe 中的角色,可能有两种角色:
remote:Pipe的对端信息
status:Pipe状态
attributes:Pipe属性
message:Pipe运行信息,当 Pipe 正常运行时,这一字段通常为NORMAL,当出现异常时,可能出现两种状态:
IoTDB> SHOW PIPES IoTDB> +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+ | create time| name | role| remote| status| attributes|message| +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+ |2022-03-30T20:58:30.689|my_pipe1| sender| my_pipesink| STOP|SyncDelOp=false,DataStartTimestamp=0| NORMAL| +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+ |2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11| RUNNING| Database='root.vehicle'| NORMAL| +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
SHOW PIPES等效IoTDB> SHOW PIPE [PipeName]
vi conf/iotdb-common.properties 配置云端参数,将白名单设置为仅接收来自 IP 为 192.168.0.1 的边端的数据
#################### ### PIPE Server Configuration #################### # White IP list of Sync client. # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16 # If there are more than one IP segment, please separate them by commas # The default is to allow all IP to sync # Datatype: String ip_white_list=192.168.0.1/32
创建云端 PipeSink,指定类型为 IoTDB,指定云端 IP 地址为 192.168.0.1,指定云端的 PipeServer 服务端口为6667
IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='192.168.0.1',port=6667)
创建Pipe,指定连接到my_iotdb的PipeSink,在WHREE子句中输入开始时间点2022年3月30日0时。以下两条执行语句等价。
IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where time>=2022-03-30 00:00:00) IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where time>= 1648569600000)
启动Pipe
IoTDB> START PIPE p
显示同步任务状态
IoTDB> SHOW PIPE p
在发送端执行以下 SQL
CREATE DATABASE root.vehicle; CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE; CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN; CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE; CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN; insert into root.vehicle.d0(timestamp,s0) values(now(),10); insert into root.vehicle.d0(timestamp,s0,s1) values(now(),12,'12'); insert into root.vehicle.d0(timestamp,s1) values(now(),'14'); insert into root.vehicle.d1(timestamp,s2) values(now(),16.0); insert into root.vehicle.d1(timestamp,s2,s3) values(now(),18.0,true); insert into root.vehicle.d1(timestamp,s3) values(now(),false); flush;
在发送端和接受端执行查询,可查询到相同的结果
IoTDB> select ** from root.vehicle +-----------------------------+------------------+------------------+------------------+------------------+ | Time|root.vehicle.d0.s0|root.vehicle.d0.s1|root.vehicle.d1.s3|root.vehicle.d1.s2| +-----------------------------+------------------+------------------+------------------+------------------+ |2022-04-03T20:08:17.127+08:00| 10| null| null| null| |2022-04-03T20:08:17.358+08:00| 12| 12| null| null| |2022-04-03T20:08:17.393+08:00| null| 14| null| null| |2022-04-03T20:08:17.538+08:00| null| null| null| 16.0| |2022-04-03T20:08:17.753+08:00| null| null| true| 18.0| |2022-04-03T20:08:18.263+08:00| null| null| false| null| +-----------------------------+------------------+------------------+------------------+------------------+ Total line number = 6 It costs 0.134s
CREATE PIPESINK demo as IoTDB 提示 PIPESINK [demo] already exists in IoTDB.DROP PIPESINK pipesinkName 提示 Can not drop PIPESINK [demo], because PIPE [mypipe] is using it.SHOW PIPE,停止使用该 PipeSink 的 PIPECREATE PIPE p to demo 提示 PIPE [p] is STOP, please retry after drop it.CREATE PIPE p to demo提示 Fail to create PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:10732)}.SHOW DATANODES 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。START PIPE p 提示 Fail to start PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:10732)}.SHOW DATANODES 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。STOP PIPE p 提示 Fail to stop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:10732)}.SHOW DATANODES 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。DROP PIPE p 提示 Fail to DROP_PIPE because Fail to drop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:10732)}. Please execute [DROP PIPE p] later to retry.SHOW DATANODES 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。org.apache.iotdb.commons.exception.IoTDBException: root.** already been created as database