import ChangeLog from ‘../changelog/connector-jdbc.md’;
JDBC 数据接收器
通过jdbc写入数据。支持批处理模式和流处理模式,支持并发写入,支持精确一次语义(使用XA事务保证)
- 需要确保jdbc驱动jar包已经放在目录
${SEATUNNEL_HOME}/plugins/
下。
- 需要确保jdbc驱动jar包已经放到
${SEATUNNEL_HOME}/lib/
目录下。
使用 Xa transactions
来确保 exactly-once
。所以仅对于支持 Xa transactions
的数据库支持 exactly-once
。你可以设置 is_exactly_once=true
来启用它。
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
url | String | 是 | - |
driver | String | 是 | - |
user | String | 否 | - |
password | String | 否 | - |
query | String | 否 | - |
compatible_mode | String | 否 | - |
dialect | String | 否 | - |
database | String | 否 | - |
table | String | 否 | - |
primary_keys | Array | 否 | - |
connection_check_timeout_sec | Int | 否 | 30 |
max_retries | Int | 否 | 0 |
batch_size | Int | 否 | 1000 |
is_exactly_once | Boolean | 否 | false |
generate_sink_sql | Boolean | 否 | false |
xa_data_source_class_name | String | 否 | - |
max_commit_attempts | Int | 否 | 3 |
transaction_timeout_sec | Int | 否 | -1 |
auto_commit | Boolean | 否 | true |
field_ide | String | 否 | - |
properties | Map | 否 | - |
common-options | 否 | - | |
schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | Enum | 否 | APPEND_DATA |
custom_sql | String | 否 | - |
enable_upsert | Boolean | 否 | true |
use_copy_statement | Boolean | 否 | false |
用于连接远程数据源的 jdbc 类名,如果使用MySQL,则值为com.mysql.cj.jdbc.Driver
用户名
密码
JDBC 连接的 URL。参考案例:jdbc:postgresql://localhost/test
使用 sql 语句将上游输入数据写入到数据库。如 INSERT ...
数据库的兼容模式,当数据库支持多种兼容模式时需要。
例如,使用 OceanBase 数据库时,需要将其设置为 ‘mysql’ 或 ‘oracle’ 。使用StarRocks时,需要将其设置为starrocks
。
Postgres 9.5及以下版本,请设置为 postgresLow
来支持 CDC
指定的方言,如果不存在,仍然按照url获取,优先级高于url。例如,当使用 starrocks 时,你需要将其值设置为 starrocks,同理,当使用mysql时,你需要将其值设置为mysql。
如果 SeaTunnel 不支持某种方言,它将使用默认方言 GenericDialect
。请确保您提供的驱动程序支持您想要连接的数据库。
方言名称 | ||
---|---|---|
Greenplum | DB2 | Dameng |
Gbase8a | HIVE | KingBase |
MySQL | StarRocks | Oracle |
Phoenix | Postgres | Redshift |
SapHana | Snowflake | Sqlite |
SqlServer | Tablestore | Teradata |
Vertica | OceanBase | XUGU |
IRIS | Inceptor | Highgo |
使用此 database
和 table-name
自动生成 SQL,并接收上游输入的数据写入数据库。
此选项与 query
选项是互斥的,此选项具有更高的优先级。
使用 database
和此 table-name
自动生成 SQL,并接收上游输入的数据写入数据库。
此选项与 query
选项是互斥的,此选项具有更高的优先级。
table参数可以填入一个任意的表名,这个名字最终会被用作创建表的表名,并且支持变量(${table_name}
,${schema_name}
)。 替换规则如下:${schema_name}
将替换传递给目标端的 SCHEMA 名称,${table_name}
将替换传递给目标端的表名。
mysql 接收器示例:
pgsql (Oracle Sqlserver ...) 接收器示例:
Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 xxx.xxx
该选项用于辅助生成 insert、delete、update 等 sql 语句。设置了该选项,将会根据该选项生成对应的 sql 语句
用于验证数据库连接的有效性时等待数据库操作完成所需的时间,单位是秒
重试提交失败的最大次数(executeBatch)
对于批量写入,当缓冲的记录数达到 batch_size
数量或者时间达到 checkpoint.interval
时,数据将被刷新到数据库中
是否启用通过XA事务实现的精确一次语义。开启,你还需要设置 xa_data_source_class_name
根据要写入的数据库表结构生成 sql 语句
指数据库驱动的 XA 数据源的类名。以 MySQL 为例,其类名为 com.mysql.cj.jdbc.MysqlXADataSource。了解其他数据库的数据源类名,可以参考文档的附录部分
事务提交失败的最大重试次数
在事务开启后的超时时间,默认值为-1(即永不超时)。请注意,设置超时时间可能会影响到精确一次(exactly-once)的语义
默认启用自动事务提交
字段 field_ide
用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。‘ORIGINAL’ 表示不需要转换,‘UPPERCASE’ 表示转换为大写,‘LOWERCASE’ 表示转换为小写
附加连接配置参数,当属性和URL具有相同参数时,优先级由驱动程序的具体实现确定。例如,在 MySQL 中,属性配置优先于 URL。
Sink插件常用参数,请参考 Sink常用选项 了解详情
在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA
:当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST
:当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST
:当表不存在时将抛出错误
IGNORE
:忽略对表的处理
在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案
选项介绍:
DROP_DATA
:保留数据库结构,删除数据
APPEND_DATA
:保留数据库结构,保留数据
CUSTOM_PROCESSING
:允许用户自定义数据处理方式
ERROR_WHEN_DATA_EXISTS
:当有数据时抛出错误
当data_save_mode
选择CUSTOM_PROCESSING
时,需要填写CUSTOM_SQL
参数。该参数通常填写一条可以执行的SQL。SQL将在同步任务之前执行
启用通过主键更新插入,如果任务没有key重复数据,设置该参数为 false 可以加快数据导入速度
使用 COPY ${table} FROM STDIN
语句导入数据。仅支持具有 getCopyAPI()
方法连接的驱动程序。例如:Postgresql 驱动程序 org.postgresql.Driver
注意:不支持 MAP
、ARRAY
、ROW
类型
在 is_exactly_once = “true” 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:
1 postgres 需要设置 max_prepared_transactions > 1
例如 ALTER SYSTEM set max_prepared_transactions to 10
2 mysql 版本需要 >= 8.0.29
并且非 root 用户需要授予 XA_RECOVER_ADMIN
权限。例如:将 test_db.* 上的 XA_RECOVER_ADMIN 授予 'user1'@'%'
3 mysql可以尝试在url中添加 rewriteBatchedStatements=true
参数以获得更好的性能
附录参数仅提供参考
简单示例
jdbc { url = "jdbc:mysql://localhost:3306/test" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" }
精确一次 (Exactly-once)
通过设置 is_exactly_once
开启精确一次语义
jdbc { url = "jdbc:mysql://localhost:3306/test" driver = "com.mysql.cj.jdbc.Driver" max_retries = 0 user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" is_exactly_once = "true" xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" }
变更数据捕获 (Change data capture) 事件
jdbc 接收 CDC 示例
sink { jdbc { url = "jdbc:mysql://localhost:3306" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" database = "sink_database" table = "sink_table" primary_keys = ["key1", "key2", ...] } }
配置表生成策略
通过设置 schema_save_mode
配置为 CREATE_SCHEMA_WHEN_NOT_EXIST
来支持不存在表时创建表
sink { jdbc { url = "jdbc:mysql://localhost:3306" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" database = "sink_database" table = "sink_table" primary_keys = ["key1", "key2", ...] schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" } }
支持Postgres 9.5及以下版本的 CDC 示例
Postgres 9.5及以下版本,通过设置 compatible_mode
配置为 postgresLow
来支持 Postgres CDC 操作
sink { jdbc { url = "jdbc:postgresql://localhost:5432" driver = "org.postgresql.Driver" user = "root" password = "123456" compatible_mode="postgresLow" database = "sink_database" table = "sink_table" generate_sink_sql = true primary_keys = ["key1", "key2", ...] } }