import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Jdbc
> JDBC sink connector
## Description
Write data through JDBC.
:::tip
Supported engines and plugin names
* [x] Spark: Jdbc
* [x] Flink: Jdbc
:::
## Options
<Tabs
groupId="engine-type"
defaultValue="spark"
values={[
{label: 'Spark', value: 'spark'},
{label: 'Flink', value: 'flink'},
]}>
<TabItem value="spark">
| name             | type   | required | default value    |
|------------------|--------|----------|------------------|
| driver           | string | yes      | -                |
| url              | string | yes      | -                |
| user             | string | yes      | -                |
| password         | string | yes      | -                |
| dbTable          | string | yes      | -                |
| saveMode         | string | no       | update           |
| useSsl           | string | no       | false            |
| isolationLevel   | string | no       | READ_UNCOMMITTED |
| customUpdateStmt | string | no       | -                |
| duplicateIncs    | string | no       | -                |
| showSql          | string | no       | true             |
### driver [string]
Driver class name, such as `com.mysql.cj.jdbc.Driver` for MySQL.
### url [string]
The URL of the JDBC connection. Refer to a case: `jdbc:mysql://localhost/dbName`
### user [string]
The username for the database connection.
### password [string]
The password for the database connection.
### dbTable [string]
Sink table name; if the table does not exist, it will be created.
### saveMode [string]
Storage mode. The extended mode `update` performs an upsert: when an inserted row conflicts on a key, the existing row is overwritten in the configured way.
The basic modes `overwrite`, `append`, `ignore` and `error` are also supported; for the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes).
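A minimal sketch of a basic-mode configuration (the connection values and table name below are placeholders, not defaults):
```bash
jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/test"
    user = "userName"
    password = "***********"
    dbTable = "tableName"
    # append: add rows to the existing table instead of upserting
    saveMode = "append"
}
```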
### useSsl [string]
Only effective when `saveMode` is `update`: whether to enable SSL for the connection. The default value is `false`.
### isolationLevel [string]
The transaction isolation level applied to the current connection. The default value is `READ_UNCOMMITTED`.
### customUpdateStmt [string]
Only effective when `saveMode` is `update`: the statement template used when an inserted key conflicts.
If `customUpdateStmt` is empty, an upsert statement covering all columns is generated automatically; otherwise the given SQL is used. It follows MySQL's `INSERT INTO table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...` syntax, and the `VALUES` clause may contain placeholders (`?`) or fixed values.
Tip: the table name in the SQL must be consistent with `dbTable`.
### duplicateIncs [string]
Only effective when `saveMode` is `update`: on key conflict, each column listed here is updated to the existing value plus the incoming value.
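A minimal sketch of an increment-on-conflict configuration, assuming `duplicateIncs` takes a comma-separated list of column names (`pv`, `uv`, and the table `page_stats` are hypothetical):
```bash
jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    saveMode = "update"
    url = "jdbc:mysql://localhost:3306/test"
    user = "userName"
    password = "***********"
    dbTable = "page_stats"
    # hypothetical counter columns: on key conflict their stored value
    # becomes the existing value plus the incoming value
    duplicateIncs = "pv,uv"
}
```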
### showSql [string]
Only effective when `saveMode` is `update`: whether to log the generated SQL. The default value is `true`.
</TabItem>
<TabItem value="flink">
| name | type | required | default value |
| -------------------------- | ------- | -------- | ------------- |
| driver | string | yes | - |
| url | string | yes | - |
| username | string | yes | - |
| password | string | no | - |
| query | string | yes | - |
| batch_size | int | no | - |
| source_table_name | string | yes | - |
| common-options | string | no | - |
| parallelism | int | no | - |
| pre_sql | string | no | - |
| post_sql | string | no | - |
| ignore_post_sql_exceptions | boolean | no | - |
### driver [string]
Driver name, such as `com.mysql.cj.jdbc.Driver` for MySQL.
Warning: for license compliance you must provide the MySQL JDBC driver yourself, e.g. copy `mysql-connector-java-xxx.jar` to `$FLINK_HOME/lib` for Standalone deployments.
### url [string]
The URL of the JDBC connection. Such as: `jdbc:mysql://localhost:3306/test`
### username [string]
username
### password [string]
password
### query [string]
The insert statement template; placeholders (`?`) are filled from the fields of each input row, e.g. `insert into test(name, age) values(?, ?)`.
### batch_size [int]
The number of rows written per batch.
### parallelism [int]
The parallelism of the JdbcSink operator.
### pre_sql [string]
SQL to execute before the output starts.
### post_sql [string]
SQL to execute after the output finishes; only supported for batch jobs.
### ignore_post_sql_exceptions [boolean]
Whether to ignore exceptions thrown by `post_sql`.
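A minimal sketch combining `pre_sql` and `post_sql`; the statements, table name, and credentials are placeholders:
```conf
JdbcSink {
    source_table_name = fake
    driver = com.mysql.cj.jdbc.Driver
    url = "jdbc:mysql://localhost:3306/test"
    username = root
    query = "insert into test(name, age) values(?, ?)"
    # executed before writing starts
    pre_sql = "truncate table test"
    # executed after the batch job finishes writing
    post_sql = "analyze table test"
    # do not fail the job if post_sql throws
    ignore_post_sql_exceptions = true
}
```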
### common-options [string]
Sink plugin common parameters, please refer to [Sink Plugin](common-options.md) for details
</TabItem>
</Tabs>
## Examples
<Tabs
groupId="engine-type"
defaultValue="spark"
values={[
{label: 'Spark', value: 'spark'},
{label: 'Flink', value: 'flink'},
]}>
<TabItem value="spark">
```bash
jdbc {
driver = "com.mysql.cj.jdbc.Driver",
saveMode = "update",
url = "jdbc:mysql://ip:3306/database",
user = "userName",
password = "***********",
dbTable = "tableName",
customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
}
```
> Insert data through JDBC
```bash
jdbc {
driver = "com.mysql.cj.jdbc.Driver",
saveMode = "update",
truncate = "true",
url = "jdbc:mysql://ip:3306/database",
user = "userName",
password = "***********",
dbTable = "tableName",
customUpdateStmt = "INSERT INTO tableName (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
jdbc.connect_timeout = 10000
jdbc.socket_timeout = 10000
}
```
> Timeout config
</TabItem>
<TabItem value="flink">
```conf
JdbcSink {
source_table_name = fake
driver = com.mysql.cj.jdbc.Driver
url = "jdbc:mysql://localhost/test"
username = root
query = "insert into test(name,age) values(?,?)"
batch_size = 2
}
```
</TabItem>
</Tabs>