title: SQLServer-CDC sidebar_position: 10

import {siteVariables} from ‘../../version’;

Overview

The SQLServer Extract Node reads data and incremental data from the SQLServer database. The following will describe how to set up the SQLServer extraction node.

Supported Version

Extract NodeVersion
SQLServer-cdcSQLServer: 2014、2016、2017、2019、2022

Dependencies

Introduce related SQLServer Extract Node dependencies through maven. Of course, you can also use INLONG to provide jar packages.(sort-connector-sqlserver-cdc)

Maven dependency

Setup SQLServer Extract Node

SQLServer Extract Node needs to open related libraries and tables, the steps are as follows:

  1. Enable the CDC function for the database.
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end
  1. Check the database CDC capability status.
select is_cdc_enabled from sys.databases where name='dbName'

note: 1 is running CDC of DB.

  1. Turn on CDC for the table
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'tableName', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL, -- role_name
        @index_name = NULL, -- index_name
        @captured_column_list = NULL, -- captured_column_list
        @filegroup_name = 'PRIMARY' -- filegroup_name
END

note: The table must have a primary key or unique index.

  1. Check the table CDC capability status.
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'

note: 1 is running CDC of table.

How to create a SQLServer Extract Node

Usage for SQL API

The example below shows how to create a SQLServer Extract Node with Flink SQL Cli :

-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- Create a SQLServer table 'sqlserver_extract_node' in Flink SQL Cli
Flink SQL> CREATE TABLE sqlserver_extract_node (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'sqlserver-cdc-inlong',
     'hostname' = 'YourHostname',
     'port' = 'port', --default:1433
     'username' = 'YourUsername',
     'password' = 'YourPassword',
     'database-name' = 'YourDatabaseName',
     'schema-name' = 'YourSchemaName' -- default:dbo
     'table-name' = 'YourTableName');
  
-- Read snapshot and binlog from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;

Usage for InLong Dashboard

TODO

Usage for InLong Manager Client

TODO

SQLServer Extract Node Options

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE sqlserver_extract_node (
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA  FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL
) WITH (
    'connector' = 'sqlserver-cdc-inlong',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'password',
    'database-name' = 'test',
    'schema-name' = 'dbo',
    'table-name' = 'worker'
);

Data Type Mapping