| <!-- |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| |
| --> |
| # 数据同步 |
| |
| 本文档主要为数据同步功能的SQL语句,详细功能介绍及使用说明见 [数据同步](../User-Manual/Data-Sync_timecho.md) |
| |
| ## 1. 创建任务 |
| |
| **语法:** |
| |
| ```SQL |
| CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字 |
| -- 数据抽取插件,可选插件 |
| WITH SOURCE ( |
| [<parameter> = <value>,], |
| ) |
| -- 数据处理插件,可选插件 |
| WITH PROCESSOR ( |
| [<parameter> = <value>,], |
| ) |
| -- 数据连接插件,必填插件 |
| WITH SINK ( |
| [<parameter> = <value>,], |
| ) |
| ``` |
| |
| **示例一:全量数据同步** |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', |
| ) |
| ``` |
| |
| **示例二:部分数据同步** |
| |
| ```SQL |
| create pipe A2B |
| WITH SOURCE ( |
| 'source'= 'iotdb-source', |
| 'mode.streaming' = 'true' |
| 'database-name'='db_b.*', |
| 'start-time' = '2023.08.23T08:00:00+00:00', |
| 'end-time' = '2023.10.23T08:00:00+00:00' |
| ) |
| with SINK ( |
| 'sink'='iotdb-thrift-async-sink', |
| 'node-urls' = '127.0.0.1:6668', |
| ) |
| ``` |
| |
| **示例三:双向数据传输** |
| |
| * 在 A IoTDB 上执行下列语句 |
| |
| ```SQL |
| create pipe AB |
| with source ( |
| 'source.mode.double-living' ='true' |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', |
| ) |
| ``` |
| |
| * 在 B IoTDB 上执行下列语句 |
| |
| ```SQL |
| create pipe BA |
| with source ( |
| 'source.mode.double-living' ='true' |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6667', |
| ) |
| ``` |
| |
| **示例四:边云数据传输** |
| |
| * 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A |
| |
| ```SQL |
| create pipe BA |
| with source ( |
| 'database-name'='db_b.*', |
| 'table-name'='.*', |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6667', |
| ) |
| ``` |
| |
| * 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A |
| |
| ```SQL |
| create pipe CA |
| with source ( |
| 'database-name'='db_c.*', |
| 'table-name'='.*', |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', |
| ) |
| ``` |
| |
| * 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A |
| |
| ```SQL |
| create pipe DA |
| with source ( |
| 'database-name'='db_d.*', |
| 'table-name'='.*', |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6669', |
| ) |
| ``` |
| |
| **示例五:级联数据传输** |
| |
| * 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B |
| |
| ```SQL |
| create pipe AB |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6668', |
| ) |
| ``` |
| |
| * 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C |
| |
| ```SQL |
| create pipe BC |
| with source ( |
| ) |
| with sink ( |
| 'sink'='iotdb-thrift-sink', |
| 'node-urls' = '127.0.0.1:6669', |
| ) |
| ``` |
| |
| **示例六:跨网闸数据传输** |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'sink'='iotdb-air-gap-sink', |
| 'node-urls' = '10.53.53.53:9780', |
| ) |
| ``` |
| |
| **示例七:压缩同步** |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'node-urls' = '127.0.0.1:6668', |
| 'compressor' = 'snappy,lz4', |
| 'rate-limit-bytes-per-second'='1048576' |
| ) |
| ``` |
| |
| **示例八:加密同步** |
| |
| ```SQL |
| create pipe A2B |
| with sink ( |
| 'sink'='iotdb-thrift-ssl-sink', |
| 'node-urls'='127.0.0.1:6667', |
| 'ssl.trust-store-path'='pki/trusted', |
| 'ssl.trust-store-pwd'='root' |
| ) |
| ``` |
| |
| **示例九:本地导出 Object 类型数据** |
| |
| ```SQL |
| CREATE PIPE tsfile_export_local |
| WITH SOURCE ( |
| 'source' = 'iotdb-source', |
| 'table-name' = 'test_table' |
| ) |
| WITH PROCESSOR ( |
| 'processor' = 'do-nothing-processor' |
| ) |
| WITH SINK ( |
| 'sink' = 'tsfile-local-sink', |
| 'sink.local.target-path' = '/data/backup/export_2024' |
| 'sink.rate-limit-bytes-per-second' = '10485760' |
| ); |
| ``` |
| |
| **示例十:远程传输 Object 类型数据** |
| |
| * 该方式需提前注册 `tsfile_remote_sink` 插件 |
| |
| ```SQL |
| CREATE PIPE tsfile_export_scp |
| WITH SOURCE ( |
| 'source' = 'iotdb-source', |
| 'table-name' = 'test_table' |
| ) |
| WITH PROCESSOR ( |
| 'processor' = 'do-nothing-processor' |
| ) |
| WITH SINK ( |
| 'sink' = 'tsfile_remote_sink', |
| 'sink.file-mode' = 'scp', |
| 'sink.scp.host' = '192.168.1.100', |
| 'sink.scp.port' = '22', |
| 'sink.scp.user' = 'backup_user', |
| 'sink.scp.password' = 'ComplexPass123!', |
| 'sink.scp.remote-path' = '/remote/archive/', |
| 'sink.rate-limit-bytes-per-second' = '10485760' |
| ); |
| ``` |
| |
| ## 2. 开始任务 |
| |
| **语法:** |
| |
| ```SQL |
| START PIPE<PipeId> |
| ``` |
| |
| **示例:** |
| |
| ```SQL |
| START PIPE A2B |
| ``` |
| |
| ## 3. 停止任务 |
| |
| **语法:** |
| |
| ```SQL |
| STOP PIPE <PipeId> |
| ``` |
| |
| **示例:** |
| |
| ```SQL |
| STOP PIPE A2B |
| ``` |
| |
| ## 4. 删除任务 |
| |
| **语法:** |
| |
| ```SQL |
| DROP PIPE [IF EXISTS] <PipeId> |
| ``` |
| |
| **示例:** |
| |
| ```SQL |
| DROP PIPE IF EXISTS A2B |
| ``` |
| |
| ## 5. 查看任务 |
| |
| **语法:** |
| |
| ```SQL |
| -- 查看全部任务 |
| SHOW PIPES |
| -- 查看指定任务 |
| SHOW PIPE <PipeId> |
| ``` |
| |
| **示例:** |
| |
| ```SQL |
| SHOW PIPES |
| |
| SHOW PIPE A2B |
| ``` |
| |
| ## 6. 修改任务 |
| |
| **语法:** |
| |
| ```SQL |
| ALTER PIPE [IF EXISTS] <PipeId> |
| MODIFY/REPLACE SOURCE(...) |
| MODIFY/REPLACE PROCESSOR(...) |
| MODIFY/REPLACE SINK(...) |
| ``` |
| |
| **示例:** |
| |
| ```SQL |
| ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668'); |
| ``` |