| import Tabs from '@theme/Tabs'; |
| import TabItem from '@theme/TabItem'; |
| |
| # Jdbc |
| |
| > JDBC sink connector |
| |
| ## Description |
| |
| Write data through jdbc |
| |
| :::tip |
| |
| Engine Supported and plugin name |
| |
| * [x] Spark: Jdbc |
| * [x] Flink: Jdbc |
| |
| ::: |
| |
| ## Options |
| |
| <Tabs |
| groupId="engine-type" |
| defaultValue="spark" |
| values={[ |
| {label: 'Spark', value: 'spark'}, |
| {label: 'Flink', value: 'flink'}, |
| ]}> |
| <TabItem value="spark"> |
| |
| | name | type | required | default value | |
| |------------------| ------ |----------|---------------| |
| | driver | string | yes | - | |
| | url | string | yes | - | |
| | user | string | yes | - | |
| | password | string | yes | - | |
| | dbTable | string | yes | - | |
| | saveMode | string | no | update | |
| | useSsl | string | no | false | |
| | customUpdateStmt | string | no | - | |
| | duplicateIncs | string | no | - | |
| | showSql | string | no | true | |
| |
| ### url [string] |
| |
| The URL of the JDBC connection. Refer to a case: `jdbc:mysql://localhost/dbName` |
| |
| ### user [string] |
| |
| username |
| |
| ##### password [string] |
| |
| user password |
| |
| ### dbTable [string] |
| |
| Sink table name, if the table does not exist, it will be created. |
| |
| ### saveMode [string] |
| |
| Storage mode, add mode `update` , perform data overwrite in a specified way when inserting data key conflicts |
| |
| Basic mode, currently supports `overwrite` , `append` , `ignore` and `error` . For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes) |
| |
| ### useSsl [string] |
| |
| Configure when `saveMode` is specified as `update` , whether to enable ssl, the default value is `false` |
| |
| ### isolationLevel [string] |
| |
| The transaction isolation level, which applies to current connection. The default value is `READ_UNCOMMITTED` |
| |
| ### customUpdateStmt [string] |
| |
| Configure when `saveMode` is specified as `update` , which is used to specify the update statement template for key conflicts. |
| If `customUpdateStmt` is empty, the sql will auto-generate for all columns, else use the sql which refer to the usage of |
| `INSERT INTO table (...) values (...) ON DUPLICATE KEY UPDATE... ` of `mysql` , use placeholders or fixed values in `values` |
| tips: the tableName of sql should be consistent with the `dbTable`. |
| |
| ### duplicateIncs [string] |
| |
| Configure when `saveMode` is specified as `update` , and when the specified key conflicts, the value is updated to the existing value plus the original value |
| |
| ### showSql |
| |
| Configure when `saveMode` is specified as `update` , whether to show sql |
| |
| </TabItem> |
| <TabItem value="flink"> |
| |
| | name | type | required | default value | |
| | -------------------------- | ------- | -------- | ------------- | |
| | driver | string | yes | - | |
| | url | string | yes | - | |
| | username | string | yes | - | |
| | password | string | no | - | |
| | query | string | yes | - | |
| | batch_size | int | no | - | |
| | source_table_name | string | yes | - | |
| | common-options | string | no | - | |
| | parallelism | int | no | - | |
| | pre_sql | string | no | - | |
| | post_sql | string | no | - | |
| | ignore_post_sql_exceptions | boolean | no | - | |
| |
| ### driver [string] |
| |
| Driver name, such as `com.mysql.cj.jdbc.Driver` for MySQL. |
| |
| Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy `mysql-connector-java-xxx.jar` to `$FLINK_HOME/lib` for Standalone. |
| |
| ### url [string] |
| |
| The URL of the JDBC connection. Such as: `jdbc:mysql://localhost:3306/test` |
| |
| ### username [string] |
| |
| username |
| |
| ### password [string] |
| |
| password |
| |
| ### query [string] |
| |
| Insert statement |
| |
| ### batch_size [int] |
| |
| Number of writes per batch |
| |
| ### parallelism [int] |
| |
| The parallelism of an individual operator, for JdbcSink. |
| |
| ### pre_sql [string] |
| |
| This sql can be executed before output. |
| |
| ### post_sql [string] |
| |
| This sql can be executed after output, and just supports for batch job. |
| |
| ### ignore_post_sql_exceptions [boolean] |
| |
| Whether to ignore post_sql exceptions. |
| |
| ### common options [string] |
| |
| Sink plugin common parameters, please refer to [Sink Plugin](common-options.md) for details |
| |
| </TabItem> |
| </Tabs> |
| |
| ## Examples |
| |
| <Tabs |
| groupId="engine-type" |
| defaultValue="spark" |
| values={[ |
| {label: 'Spark', value: 'spark'}, |
| {label: 'Flink', value: 'flink'}, |
| ]}> |
| <TabItem value="spark"> |
| |
| ```bash |
| jdbc { |
| driver = "com.mysql.cj.jdbc.Driver", |
| saveMode = "update", |
| url = "jdbc:mysql://ip:3306/database", |
| user = "userName", |
| password = "***********", |
| dbTable = "tableName", |
| customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)" |
| } |
| ``` |
| |
| > Insert data through JDBC |
| |
| ```bash |
| jdbc { |
| driver = "com.mysql.cj.jdbc.Driver", |
| saveMode = "update", |
| truncate = "true", |
| url = "jdbc:mysql://ip:3306/database", |
| user = "userName", |
| password = "***********", |
| dbTable = "tableName", |
| customUpdateStmt = "INSERT INTO tableName (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)" |
| jdbc.connect_timeout = 10000 |
| jdbc.socket_timeout = 10000 |
| } |
| ``` |
| > Timeout config |
| |
| </TabItem> |
| <TabItem value="flink"> |
| |
| ```conf |
| JdbcSink { |
| source_table_name = fake |
| driver = com.mysql.cj.jdbc.Driver |
| url = "jdbc:mysql://localhost/test" |
| username = root |
| query = "insert into test(name,age) values(?,?)" |
| batch_size = 2 |
| } |
| ``` |
| |
| </TabItem> |
| </Tabs> |