import {siteVariables} from '../../version';
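The SQLServer Extract Node reads snapshot (full) data and incremental change data from a SQL Server database. The following describes how to configure the SQLServer Extract Node.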
| Extract Node | Version |
|---|---|
| SQLServer-cdc | SQLServer: 2014, 2016, 2017, 2019, 2022 |
To build your own project, add sort-connector-sqlserver-cdc as a Maven dependency. Alternatively, you can use the jar provided by InLong directly (sort-connector-sqlserver-cdc).
The SQLServer Extract Node requires CDC to be enabled on both the database and the table. The steps are as follows.
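Enable CDC on the database. This requires membership in the sysadmin fixed server role:
use dbName; -- sp_cdc_enable_db acts on the current database
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end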
Check whether CDC is enabled on the database:
select is_cdc_enabled from sys.databases where name='dbName'
Note: a value of 1 means CDC is enabled on the database.
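SQL Server runs CDC's capture and cleanup jobs through SQL Server Agent, so enabling CDC is not enough if the Agent service is stopped. Below is a minimal check, assuming you have the VIEW SERVER STATE permission. The `LIKE` pattern is an assumption about how the service is named on your installation; the default instance usually reports `SQL Server Agent (MSSQLSERVER)`.

```sql
-- Check that the SQL Server Agent service is running (CDC relies on it).
-- Requires the VIEW SERVER STATE permission; the name pattern is an assumption.
SELECT servicename, status_desc
FROM sys.dm_server_services
WHERE servicename LIKE N'SQL Server Agent%';
```

If `status_desc` is not `Running`, start the Agent before expecting changes to appear in the CDC change tables.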
Enable CDC on the table. Run this in the database that contains the table:
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo',          -- schema of the source table
        @source_name = 'tableName',      -- name of the source table
        @capture_instance = NULL,        -- NULL: name derived as schema_table
        @supports_net_changes = 1,       -- also generate the net-changes query function
        @role_name = NULL,               -- NULL: access to change data is not gated by a role
        @index_name = NULL,              -- NULL: the primary key identifies rows
        @captured_column_list = NULL,    -- NULL: capture all columns
        @filegroup_name = 'PRIMARY'      -- filegroup for the change table
END
Note: the table must have a primary key or a unique index.
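Before enabling CDC on the table, you can check that it has a primary key or a unique index. Below is a sketch that reuses the `dbName` and `tableName` placeholders from the steps above and assumes the table is in the `dbo` schema. With `@supports_net_changes = 1` and `@index_name = NULL`, SQL Server uses the primary key. If the table has only a unique index, pass that index's name through `@index_name` instead.

```sql
-- Run in the database that contains the table (placeholder names from above).
USE dbName;
-- List the primary-key and unique indexes on dbo.tableName.
SELECT i.name AS index_name, i.is_primary_key, i.is_unique
FROM sys.indexes AS i
WHERE i.object_id = OBJECT_ID(N'dbo.tableName')
  AND (i.is_primary_key = 1 OR i.is_unique = 1);
```

An empty result means the table has neither a primary key nor a unique index, and one needs to be added first.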
Check whether CDC is enabled on the table:
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'
Note: a value of 1 means CDC is enabled on the table.
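For a closer look at the table's CDC setup, you can query SQL Server's CDC help procedures. Below is a sketch, again using the `dbName`/`tableName` placeholders and assuming the `dbo` schema. `sys.sp_cdc_help_change_data_capture` reports the capture instance, the captured columns and the index used to identify rows. `sys.sp_cdc_help_jobs` lists the capture and cleanup jobs that SQL Server Agent runs for the database.

```sql
-- Run in the CDC-enabled database (placeholder names from above).
USE dbName;
-- Show the capture instance(s) configured for dbo.tableName.
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'dbo',
    @source_name = N'tableName';
-- List the CDC capture and cleanup jobs for this database.
EXEC sys.sp_cdc_help_jobs;
```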
Using the Flink SQL CLI:
-- Set checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- Create a SQLServer table 'sqlserver_extract_node' in the Flink SQL CLI
Flink SQL> CREATE TABLE sqlserver_extract_node (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
    'connector' = 'sqlserver-cdc-inlong',
    'hostname' = 'YourHostname',
    'port' = 'port', -- default: 1433
    'username' = 'YourUsername',
    'password' = 'YourPassword',
    'database-name' = 'YourDatabaseName',
    'schema-name' = 'YourSchemaName', -- default: dbo
    'table-name' = 'YourTableName');
-- Read snapshot and incremental change data from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;
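You can also send the captured rows to a sink instead of only viewing them in the CLI result view. Below is a minimal sketch using Flink's built-in `print` connector, which writes each row, prefixed with its change kind (for example `+I`, `-U`, `+U`, `-D`), to the TaskManager's standard output. The table name `print_sink` and its column subset are illustrative assumptions, not part of the SQLServer connector.

```sql
-- Hypothetical sink table using Flink's built-in print connector.
Flink SQL> CREATE TABLE print_sink (
    order_id INT,
    customer_name STRING,
    price DECIMAL(10, 5),
    order_status BOOLEAN
    ) WITH (
    'connector' = 'print'
    );
-- Continuously copy snapshot and change rows from the extract node into the sink.
Flink SQL> INSERT INTO print_sink
    SELECT order_id, customer_name, price, order_status
    FROM sqlserver_extract_node;
```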
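Metadata such as the database name, schema name, table name and operation timestamp can be exposed as read-only (VIRTUAL) columns in the table definition.
An example that uses metadata fields: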
CREATE TABLE sqlserver_extract_node (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'password',
    'database-name' = 'test',
    'schema-name' = 'dbo',
    'table-name' = 'worker'
);
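Once declared, the metadata columns are read like ordinary columns. Below is a sketch that queries the table defined in the metadata example and returns each row's origin and operation time next to the physical `id` column. The column names are exactly those declared in that example.

```sql
-- Read metadata columns alongside the physical column 'id'.
SELECT db_name, schema_name, table_name, operation_ts, id
FROM sqlserver_extract_node;
```

Because the metadata columns are declared `VIRTUAL`, they can only be read. They are not written back when the table is used as a sink.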