import ChangeLog from ‘../changelog/connector-jdbc.md’;
JDBC SQL Server Sink Connector
Spark
Flink
Seatunnel Zeta
Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics (using XA transaction guarantee).
- You need to ensure that the jdbc driver jar package has been placed in directory
${SEATUNNEL_HOME}/plugins/
.
- You need to ensure that the jdbc driver jar package has been placed in directory
${SEATUNNEL_HOME}/lib/
.
Use
Xa transactions
to ensureexactly-once
. So only supportexactly-once
for the database which is supportXa transactions
. You can setis_exactly_once=true
to enable it.
Datasource | Supported Versions | Driver | Url | Maven |
---|---|---|---|---|
SQL Server | support version >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | Download |
Please download the support list corresponding to ‘Maven’ and copy it to the ‘$SEATUNNEL_HOME/plugins/jdbc/lib/’ working directory
For example SQL Server datasource: cp mssql-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
SQLserver Data Type | SeaTunnel Data Type |
---|---|
BIT | BOOLEAN |
TINYINT SMALLINT | SHORT |
INTEGER | INT |
BIGINT | LONG |
DECIMAL NUMERIC MONEY SMALLMONEY | DECIMAL((Get the designated column‘s specified column size)+1, (Gets the designated column’s number of digits to right of the decimal point.))) |
REAL | FLOAT |
FLOAT | DOUBLE |
CHAR NCHAR VARCHAR NTEXT NVARCHAR TEXT | STRING |
DATE | LOCAL_DATE |
TIME | LOCAL_TIME |
DATETIME DATETIME2 SMALLDATETIME DATETIMEOFFSET | LOCAL_DATE_TIME |
TIMESTAMP BINARY VARBINARY IMAGE UNKNOWN | Not supported yet |
Name | Type | Required | Default | Description |
---|---|---|---|---|
url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:sqlserver://localhost:1433;databaseName=mydatabase |
driver | String | Yes | - | The jdbc class name used to connect to the remote data source, if you use sqlServer the value is com.microsoft.sqlserver.jdbc.SQLServerDriver . |
username | String | No | - | Connection instance user name |
password | String | No | - | Connection instance password |
query | String | No | - | Use this sql write upstream input datas to database. e.g INSERT ... ,query have the higher priority |
database | String | No | - | Use this database and table-name auto-generate sql and receive upstream input datas write to database.This option is mutually exclusive with query and has a higher priority. |
table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database. This option is mutually exclusive with query and has a higher priority. |
primary_keys | Array | No | - | This option is used to support operations such as insert , delete , and update when automatically generate sql. |
connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. |
max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) |
batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of batch_size or the time reaches checkpoint.interval , the data will be flushed into the database |
is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set xa_data_source_class_name . |
generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to |
xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, SqlServer is com.microsoft.sqlserver.jdbc.SQLServerXADataSource , andplease refer to appendix for other data sources |
max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect exactly-once semantics |
auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
common-options | no | - | Sink plugin common parameters, please refer to Sink Common Options for details | |
enable_upsert | Boolean | No | true | Enable upsert by primary_keys exist, If the task has no key duplicate data, setting this parameter to false can speed up data import |
If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
This is one that reads Sqlserver data and inserts it directly into another table
env { # You can set engine configuration here parallelism = 10 } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" username = SA password = "Y.sa123456" query = "select * from column_type_test.dbo.full_types_jdbc" # Parallel sharding reads fields partition_column = "id" # Number of fragments partition_num = 10 } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc } transform { # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform-v2/sql } sink { Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" username = SA password = "Y.sa123456" query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc }
CDC change data is also supported by us In this case, you need config database, table and primary_keys.
Jdbc { plugin_input = "customers" driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" username = SA password = "Y.sa123456" generate_sink_sql = true database = "column_type_test" table = "dbo.full_types_sink" batch_size = 100 primary_keys = ["id"] }
Transactional writes may be slower but more accurate to the data
Jdbc { driver = com.microsoft.sqlserver.jdbc.SQLServerDriver url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" username = SA password = "Y.sa123456" query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" is_exactly_once = "true" xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource" } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc