数据同步

本文档主要为数据同步功能的SQL语句,详细功能介绍及使用说明见 数据同步

1. 创建任务

语法:

CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字
-- 数据抽取插件,可选插件
WITH SOURCE (
  [<parameter> = <value>,],
)
-- 数据处理插件,可选插件
WITH PROCESSOR (
  [<parameter> = <value>,],
)
-- 数据连接插件,必填插件
WITH SINK (
  [<parameter> = <value>,],
)

示例一:全量数据同步

create pipe A2B
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)

示例二:部分数据同步

create pipe A2B
WITH SOURCE (
  'source'= 'iotdb-source',
  'mode.streaming' = 'true'  
  'database-name'='db_b.*', 
  'start-time' = '2023.08.23T08:00:00+00:00', 
  'end-time' = '2023.10.23T08:00:00+00:00'
) 
with SINK (
  'sink'='iotdb-thrift-async-sink',
  'node-urls' = '127.0.0.1:6668', 
)

示例三:边云数据传输

  • 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A
create pipe BA
with source (
   'database-name'='db_b.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6667', 
)
  • 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A
create pipe CA
with source (
   'database-name'='db_c.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)
  • 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A
create pipe DA
with source (
   'database-name'='db_d.*', 
   'table-name'='.*', 
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6669', 
)

示例四:级联数据传输

  • 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B
create pipe AB
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6668', 
)
  • 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C
create pipe BC
with source (
)
with sink (
  'sink'='iotdb-thrift-sink',
  'node-urls' = '127.0.0.1:6669', 
)

示例五:压缩同步

create pipe A2B 
with sink (
 'node-urls' = '127.0.0.1:6668', 
 'compressor' = 'snappy,lz4',
 'rate-limit-bytes-per-second'='1048576' 
)

示例六:加密同步

create pipe A2B
with sink (
  'sink'='iotdb-thrift-ssl-sink',
  'node-urls'='127.0.0.1:6667',  
  'ssl.trust-store-path'='pki/trusted', 
  'ssl.trust-store-pwd'='root' 
)

2. 开始任务

语法:

START PIPE<PipeId>

示例:

START PIPE A2B

3. 停止任务

语法:

STOP PIPE <PipeId>

示例:

STOP PIPE A2B

4. 删除任务

语法:

DROP PIPE [IF EXISTS] <PipeId>

示例:

DROP PIPE IF EXISTS A2B

5. 查看任务

语法:

-- 查看全部任务
SHOW PIPES
-- 查看指定任务
SHOW PIPE <PipeId>

示例:

SHOW PIPES

SHOW PIPE A2B

6. 修改任务

语法:

ALTER PIPE [IF EXISTS] <PipeId>
    MODIFY/REPLACE SOURCE(...)
    MODIFY/REPLACE PROCESSOR(...)
    MODIFY/REPLACE SINK(...)

示例:

ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668');