blob: 13d050c9bdca939169594f9db07fe6badf833f0a [file] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# 数据同步
本文档主要为数据同步功能的SQL语句,详细功能介绍及使用说明见 [数据同步](../User-Manual/Data-Sync_apache.md)
## 1. 创建任务
**语法:**
```SQL
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字
-- 数据抽取插件,可选插件
WITH SOURCE (
[<parameter> = <value>,],
)
-- 数据处理插件,可选插件
WITH PROCESSOR (
[<parameter> = <value>,],
)
-- 数据连接插件,必填插件
WITH SINK (
[<parameter> = <value>,],
)
```
**示例一:全量数据同步**
```SQL
create pipe A2B
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
```
**示例二:部分数据同步**
```SQL
create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'mode.streaming' = 'true'
'database-name'='db_b.*',
'start-time' = '2023.08.23T08:00:00+00:00',
'end-time' = '2023.10.23T08:00:00+00:00'
)
with SINK (
'sink'='iotdb-thrift-async-sink',
'node-urls' = '127.0.0.1:6668',
)
```
**示例三:边云数据传输**
* 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A
```SQL
create pipe BA
with source (
'database-name'='db_b.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667',
)
```
* 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A
```SQL
create pipe CA
with source (
'database-name'='db_c.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
```
* 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A
```SQL
create pipe DA
with source (
'database-name'='db_d.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)
```
**示例四:级联数据传输**
* 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B
```SQL
create pipe AB
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)
```
* 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C
```SQL
create pipe BC
with source (
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)
```
**示例五:压缩同步**
```SQL
create pipe A2B
with sink (
'node-urls' = '127.0.0.1:6668',
'compressor' = 'snappy,lz4',
'rate-limit-bytes-per-second'='1048576'
)
```
**示例六:加密同步**
```SQL
create pipe A2B
with sink (
'sink'='iotdb-thrift-ssl-sink',
'node-urls'='127.0.0.1:6667',
'ssl.trust-store-path'='pki/trusted',
'ssl.trust-store-pwd'='root'
)
```
## 2. 开始任务
**语法:**
```SQL
START PIPE<PipeId>
```
**示例:**
```SQL
START PIPE A2B
```
## 3. 停止任务
**语法:**
```SQL
STOP PIPE <PipeId>
```
**示例:**
```SQL
STOP PIPE A2B
```
## 4. 删除任务
**语法:**
```SQL
DROP PIPE [IF EXISTS] <PipeId>
```
**示例:**
```SQL
DROP PIPE IF EXISTS A2B
```
## 5. 查看任务
**语法:**
```SQL
-- 查看全部任务
SHOW PIPES
-- 查看指定任务
SHOW PIPE <PipeId>
```
**示例:**
```SQL
SHOW PIPES
SHOW PIPE A2B
```
## 6. 修改任务
**语法:**
```SQL
ALTER PIPE [IF EXISTS] <PipeId>
MODIFY/REPLACE SOURCE(...)
MODIFY/REPLACE PROCESSOR(...)
MODIFY/REPLACE SINK(...)
```
**示例:**
```SQL
ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668');
```