import {siteVariables} from '../../version';
The SQLServer Extract Node reads snapshot data and incremental data from a SQLServer database. The following sections describe how to set up the SQLServer Extract Node.
| Extract Node | Version |
|---|---|
| SQLServer-cdc | SQLServer: 2014, 2016, 2017, 2019, 2022 |
Add the SQLServer Extract Node dependency through Maven. Alternatively, you can use the jar package provided by InLong (sort-connector-sqlserver-cdc).
The SQLServer Extract Node requires CDC to be enabled on the relevant databases and tables. The steps are as follows:
-- Run in the context of the target database: sys.sp_cdc_enable_db applies to the current database
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end
select is_cdc_enabled from sys.databases where name='dbName'
Note: A value of 1 means CDC is enabled for the database.
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'tableName', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL, -- role_name
        @index_name = NULL, -- index_name
        @captured_column_list = NULL, -- captured_column_list
        @filegroup_name = 'PRIMARY' -- filegroup_name
END
Note: Because @supports_net_changes is set to 1, the table must have a primary key or unique index.
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'
Note: A value of 1 means CDC is enabled for the table.
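Once a table is tracked, SQL Server records its changes in a change table, which can be inspected to confirm the setup. The following is a minimal sketch, not part of the InLong setup itself. It assumes the placeholder names `dbName` and `tableName` from the steps above, the `dbo` schema, and the default capture instance name (`dbo_tableName`, which SQL Server uses when `@capture_instance` is NULL).

```sql
USE dbName;  -- placeholder database name from the steps above

-- List the CDC configuration of every tracked table in the current database.
EXEC sys.sp_cdc_help_change_data_capture;

-- Inspect the most recent captured changes. The change table name assumes the
-- default capture instance <schema>_<table>, here dbo_tableName.
-- __$operation: 1 = delete, 2 = insert, 3 = update (before), 4 = update (after)
SELECT TOP (10) *
FROM cdc.dbo_tableName_CT
ORDER BY __$start_lsn DESC;
```

The change table is only populated while the SQL Server Agent is running, because the capture job runs as an Agent job. An empty result after modifying rows usually points there first.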
The example below shows how to create a SQLServer Extract Node with the Flink SQL CLI:
-- Set the checkpoint interval to 3 seconds (3000 milliseconds)
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- Create a SQLServer table 'sqlserver_extract_node' in the Flink SQL CLI
Flink SQL> CREATE TABLE sqlserver_extract_node (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'sqlserver-cdc-inlong',
     'hostname' = 'YourHostname',
     'port' = 'port', -- default: 1433
     'username' = 'YourUsername',
     'password' = 'YourPassword',
     'database-name' = 'YourDatabaseName',
     'schema-name' = 'YourSchemaName', -- default: dbo
     'table-name' = 'YourTableName');

-- Read snapshot and change data from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;
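Because the extract node produces a changelog stream, downstream queries are updated continuously as rows change in SQLServer. As an illustrative sketch, assuming the `sqlserver_extract_node` table defined above, the following query maintains per-status order totals. The column aliases `order_count` and `total_price` are hypothetical names chosen for this example.

```sql
-- Continuously updated aggregate over the CDC source table defined above.
SELECT
  order_status,
  COUNT(*)   AS order_count,   -- hypothetical alias
  SUM(price) AS total_price    -- hypothetical alias
FROM sqlserver_extract_node
GROUP BY order_status;
```

In the Flink SQL CLI the result refreshes as inserts, updates, and deletes arrive from the source table.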
TODO
TODO
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE sqlserver_extract_node (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL
) WITH (
    'connector' = 'sqlserver-cdc-inlong',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'password',
    'database-name' = 'test',
    'schema-name' = 'dbo',
    'table-name' = 'worker'
);
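Once declared, the metadata columns can be selected like any other column. A short sketch, assuming the table definition above (the filter value `'worker'` simply repeats the table name configured there):

```sql
-- Select the metadata columns alongside the physical column `id`.
SELECT
  db_name,
  schema_name,
  table_name,
  operation_ts,
  id
FROM sqlserver_extract_node
WHERE table_name = 'worker';
```

Since the columns are declared VIRTUAL, they are read-only and are excluded when the table is written to.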