blob: 34f836300237c02b4c798e6b944583e83924c9e9 [file] [log] [blame] [view]
# 模式演进
模式演进是指数据表的Schema可以改变,数据同步任务可以自动适应新的表结构的变化而无需其他操作。
## 已支持的引擎
- Zeta
## 已支持的模式变更事件类型
- `ADD COLUMN`
- `DROP COLUMN`
- `RENAME COLUMN`
- `MODIFY COLUMN`
## 已支持的连接器
### 源
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)
[Oracle-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Oracle-CDC.md)
### 目标
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-SqlServer](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
[Elasticsearch](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Elasticsearch.md#模式演变)
注意:
* 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
* 当你使用Oracle-CDC时,你不能使用用户名`SYS``SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。
* 早期版本的`达梦`数据库不支持将`Varchar`类型字段更改为`Text`类型字段。
## 启用Schema evolution功能
CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。
## 示例
### Mysql-CDC -> Jdbc-Mysql
```
env {
# You can set engine configuration here
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"
generate_sink_sql = true
database = shop
table = mysql_cdc_e2e_sink_table_with_schema_change_exactly_once
primary_keys = ["id"]
is_exactly_once = true
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
}
```
### Oracle-cdc -> Jdbc-Oracle
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
plugin_output = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
schema-changes.enabled = true
}
}
sink {
Jdbc {
plugin_input = "customers"
driver = "oracle.jdbc.driver.OracleDriver"
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
user = "dbzuser"
password = "dbz"
generate_sink_sql = true
database = "ORCLCDB"
table = "DEBEZIUM.FULL_TYPES_SINK"
batch_size = 1
primary_keys = ["ID"]
connection.pool.size = 1
}
}
```
### Oracle-cdc -> Jdbc-Mysql
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
plugin_output = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
schema-changes.enabled = true
}
}
sink {
jdbc {
plugin_input = "customers"
url = "jdbc:mysql://oracle-host:3306/oracle_sink"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
database = oracle_sink
table = oracle_cdc_2_mysql_sink_table
primary_keys = ["ID"]
}
}
```
### Mysql-cdc -> StarRocks
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MySQL-CDC {
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
StarRocks {
nodeUrls = ["starrocks_cdc_e2e:8030"]
username = "root"
password = ""
database = "shop"
table = "${table_name}"
url = "jdbc:mysql://starrocks_cdc_e2e:9030/shop"
max_retries = 3
enable_upsert_delete = true
schema_save_mode="RECREATE_SCHEMA"
data_save_mode="DROP_DATA"
save_mode_create_template = """
CREATE TABLE IF NOT EXISTS shop.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "LZ4"
)
"""
}
}
```
### Mysql-CDC -> Doris
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
Doris {
fenodes = "doris_e2e:8030"
username = "root"
password = ""
database = "shop"
table = "products"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
```
### Mysql-CDC -> Jdbc-Postgres
```hocon
env {
# You can set engine configuration here
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
jdbc {
url = "jdbc:postgresql://postgresql:5432/shop"
driver = "org.postgresql.Driver"
user = "postgres"
password = "postgres"
generate_sink_sql = true
database = shop
table = "public.sink_table_with_schema_change"
primary_keys = ["id"]
# Validate ddl update for sink writer multi replica
multi_table_sink_replica = 2
}
}
```
### Mysql-CDC -> Jdbc-Dameng
```hocon
env {
# You can set engine configuration here
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
jdbc {
url = "jdbc:dm://e2e_dmdb:5236"
driver = "dm.jdbc.driver.DmDriver"
connection_check_timeout_sec = 1000
user = "SYSDBA"
password = "SYSDBA"
generate_sink_sql = true
database = "DAMENG"
table = "SYSDBA.sink_table_with_schema_change"
primary_keys = ["id"]
# Validate ddl update for sink writer multi replica
multi_table_sink_replica = 2
}
}
```
### Mysql-CDC -> Jdbc-SqlServer
```hocon
env {
# You can set engine configuration here
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true
}
}
sink {
jdbc {
url = "jdbc:sqlserver://e2e_sqlserver:1433"
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
user = "sa"
password = "paanssy1234$"
generate_sink_sql = true
database = master
table = "dbo.sink_table_with_schema_change"
primary_keys = ["id"]
# Validate ddl update for sink writer multi replica
multi_table_sink_replica = 2
}
}
```