title: SQLServer-CDC sidebar_position: 10

import {siteVariables} from ‘../../version’;

概述

SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。下面将介绍如何配置 SQLServer 抽取节点。

支持的版本

Extract NodeVersion
SQLServer-cdcSQLServer: 2014、2016、2017、2019、2022

依赖配置

通过 Maven 引入 sort-connector-sqlserver-cdc 构建自己的项目。 当然,你也可以直接使用 INLONG 提供的 jar 包。(sort-connector-sqlserver-cdc)

Maven依赖配置

配置 SQLServer 加载节点

SQLServer 加载节点需要开启库和表的 CDC 功能,配置步骤如下:

  1. 开启数据库 CDC 能力。
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end
  1. 检查数据库 CDC 是否开启。
select is_cdc_enabled from sys.databases where name='dbName'

备注: “1”表示数据库 CDC 开启

  1. 开启表的 CDC 能力。
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'tableName', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL, -- role_name
        @index_name = NULL, -- index_name
        @captured_column_list = NULL, -- captured_column_list
        @filegroup_name = 'PRIMARY' -- filegroup_name
END

备注: 表必须有主键或者唯一索引。

  1. 检查表 CDC 是否开启。
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'

备注: “1”表示表 CDC 开启

如何创建一个 SQLServer 抽取节点

SQL API 的使用

使用 Flink SQL Cli :

-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli
Flink SQL> CREATE TABLE sqlserver_extract_node (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'sqlserver-cdc-inlong',
     'hostname' = 'YourHostname',
     'port' = 'port', --default:1433
     'username' = 'YourUsername',
     'password' = 'YourPassword',
     'database-name' = 'YourDatabaseName',
     'schema-name' = 'YourSchemaName' -- default:dbo
     'table-name' = 'YourTableName');
  
-- Read snapshot and binlog from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;

InLong Dashboard 方式

TODO

InLong Manager Client 方式

TODO

SQLServer 抽取节点参数信息

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

使用元数据字段的例子:

CREATE TABLE sqlserver_extract_node (
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA  FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'password',
    'database-name' = 'test',
    'schema-name' = 'dbo',
    'table-name' = 'worker'
);

数据类型映射