数据同步

本文档主要为数据同步功能的SQL语句,详细功能介绍及使用说明见 数据同步

1. 创建任务

语法:

CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字
-- 数据抽取插件,可选插件
WITH SOURCE (
  [<parameter> = <value>,],
)
-- 数据处理插件,可选插件
WITH PROCESSOR (
  [<parameter> = <value>,],
)
-- 数据连接插件,必填插件
WITH SINK (
  [<parameter> = <value>,],
)

示例一:全量数据同步

create pipe A2B
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)

示例二:部分数据同步

create pipe A2B
WITH SOURCE (
  'source'= 'iotdb-source',
  'mode.streaming' = 'true'  
  'database-name'='db_b.*', 
  'start-time' = '2023.08.23T08:00:00+00:00', 
  'end-time' = '2023.10.23T08:00:00+00:00'
) 
with SINK (
  'sink'='iotdb-thrift-async-sink',
  'node-urls' = '127.0.0.1:6668', 
)

示例三:双向数据传输

  • 在 A IoTDB 上执行下列语句
create pipe AB
with source (
  'source.mode.double-living' ='true'
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)
  • 在 B IoTDB 上执行下列语句
create pipe BA
with source (
  'source.mode.double-living' ='true' 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6667', 
)

示例四:边云数据传输

  • 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A
create pipe BA
with source (
   'database-name'='db_b.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6667', 
)
  • 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A
create pipe CA
with source (
   'database-name'='db_c.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)
  • 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A
create pipe DA
with source (
   'database-name'='db_d.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6669', 
)

示例五:级联数据传输

  • 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B
create pipe AB
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)
  • 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C
create pipe BC
with source (
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6669', 
)

示例六:跨网闸数据传输

create pipe A2B
with sink (
  'sink'='iotdb-air-gap-sink',
  'node-urls' = '10.53.53.53:9780', 
)

示例七:压缩同步

create pipe A2B 
with sink (
 'node-urls' = '127.0.0.1:6668', 
 'compressor' = 'snappy,lz4',
 'rate-limit-bytes-per-second'='1048576' 
)

示例八:加密同步

create pipe A2B
with sink (
  'sink'='iotdb-thrift-ssl-sink',
  'node-urls'='127.0.0.1:6667',  
  'ssl.trust-store-path'='pki/trusted', 
  'ssl.trust-store-pwd'='root' 
)

示例九:本地导出 Object 类型数据

CREATE PIPE tsfile_export_local
WITH SOURCE (                           
  'source' = 'iotdb-source',
  'table-name' = 'test_table'
)
WITH PROCESSOR (
  'processor' = 'do-nothing-processor'
)
WITH SINK (
  'sink' = 'tsfile-local-sink',                         
  'sink.local.target-path' = '/data/backup/export_2024' 
  'sink.rate-limit-bytes-per-second' = '10485760'      
);

示例十:远程传输 Object 类型数据

  • 该方式需提前注册 tsfile_remote_sink 插件
CREATE PIPE tsfile_export_scp
WITH SOURCE (
  'source' = 'iotdb-source',
  'table-name' = 'test_table'                        
)
WITH PROCESSOR (
  'processor' = 'do-nothing-processor'
)
WITH SINK (
  'sink' = 'tsfile_remote_sink',
  'sink.file-mode' = 'scp',                          
  'sink.scp.host' = '192.168.1.100',                 
  'sink.scp.port' = '22',                            
  'sink.scp.user' = 'backup_user',                  
  'sink.scp.password' = 'ComplexPass123!',           
  'sink.scp.remote-path' = '/remote/archive/',       
  'sink.rate-limit-bytes-per-second' = '10485760'    
);

2. 开始任务

语法:

START PIPE<PipeId>

示例:

START PIPE A2B

3. 停止任务

语法:

STOP PIPE <PipeId>

示例:

STOP PIPE A2B

4. 删除任务

语法:

DROP PIPE [IF EXISTS] <PipeId>

示例:

DROP PIPE IF EXISTS A2B

5. 查看任务

语法:

-- 查看全部任务
SHOW PIPES
-- 查看指定任务
SHOW PIPE <PipeId>

示例:

SHOW PIPES

SHOW PIPE A2B

6. 修改任务

语法:

ALTER PIPE [IF EXISTS] <PipeId>
    MODIFY/REPLACE SOURCE(...)
    MODIFY/REPLACE PROCESSOR(...)
    MODIFY/REPLACE SINK(...)

示例:

ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668');