title: MySQL-CDC
sidebar_position: 6

import {siteVariables} from '../../version';

Overview

The MySQL Extract Node reads snapshot data and incremental data from a MySQL database. This document describes how to set up the MySQL Extract Node to run SQL queries against MySQL databases.

Supported Version

| Extract Node | Version | Driver |
|--------------|---------|--------|
| MySQL-CDC | MySQL: 5.6, 5.7, 8.0.x<br/>RDS MySQL: 5.6, 5.7, 8.0.x<br/>PolarDB MySQL: 5.6, 5.7, 8.0.x<br/>Aurora MySQL: 5.6, 5.7, 8.0.x<br/>MariaDB: 10.x<br/>PolarDB X: 2.0.1 | JDBC Driver: 8.0.21 |

Dependencies

To set up the MySQL Extract Node, the following provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with Sort Connectors JAR bundles.

Maven dependency

The MySQL driver dependency is also required to connect to a MySQL database. Please download mysql-connector-java-8.0.21.jar and put it into FLINK_HOME/lib/.

Set up MySQL server

You have to define a MySQL user with appropriate permissions on all databases that the Debezium MySQL connector monitors.

  1. Create the MySQL user:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  2. Grant the required permissions to the user:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';

Note: The RELOAD permission is no longer required when scan.incremental.snapshot.enabled is enabled (it is enabled by default).

  3. Finalize the user’s permissions:
mysql> FLUSH PRIVILEGES;

See the permission explanation for more details.
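
To confirm that the permissions are in place, the account's privileges can be listed on the MySQL server. This is a minimal check, assuming the account is 'user'@'localhost' as created in step 1:

```sql
-- List the privileges held by the account created in step 1.
-- If the grant in step 2 applied to this account, the output includes
-- SELECT, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT on *.*
SHOW GRANTS FOR 'user'@'localhost';
```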

Notes

Set a different SERVER ID for each reader

Every MySQL database client that reads the binlog should have a unique ID, called the server ID. The MySQL server uses this ID to maintain the network connection and the binlog position. Therefore, if different jobs share the same server ID, a reader may read from the wrong binlog position. It is recommended to set a different server ID for each reader via SQL hints. For example, assuming the source parallelism is 4, SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; assigns a unique server ID to each of the 4 source readers.
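
The hint from the paragraph above can be laid out as a short Flink SQL session. This is only a sketch: the table name mysql_extract_node is borrowed from the examples in this document, and the parallelism of 4 is the assumption stated above. Depending on the Flink version, dynamic table option hints may first need to be enabled through table.dynamic-table-options.enabled.

```sql
-- Assumption: the source runs with parallelism 4, so the server ID range
-- must contain at least 4 IDs (5401, 5402, 5403, 5404).
SET 'parallelism.default' = '4';

-- Override the server ID range for this query only, using a table option hint.
SELECT * FROM mysql_extract_node /*+ OPTIONS('server-id'='5401-5404') */;
```

A second job reading from the same MySQL server would need a range that does not overlap with this one.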

Setting up MySQL session timeouts

When an initial consistent snapshot is made for large databases, your established connection could time out while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file.

  • interactive_timeout: The number of seconds the server waits for activity on an interactive connection before closing it. See the MySQL documentation.
  • wait_timeout: The number of seconds the server waits for activity on a noninteractive connection before closing it. See the MySQL documentation.
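
Besides the configuration file, both variables can be inspected and changed at runtime from a MySQL session. The statements below are a sketch; the value 86400 (24 hours) is purely illustrative and should be sized to how long the snapshot is expected to take.

```sql
-- Inspect the current values (in seconds).
SHOW VARIABLES WHERE Variable_name IN ('interactive_timeout', 'wait_timeout');

-- Raise both timeouts for connections opened from now on.
-- 86400 is an illustrative value, not a recommendation.
SET GLOBAL interactive_timeout = 86400;
SET GLOBAL wait_timeout = 86400;
```

SET GLOBAL requires administrative privileges and does not survive a server restart, so the configuration file remains the place for a permanent setting.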

How to create a MySQL Extract Node

Usage for SQL API

The example below shows how to create a MySQL Extract Node with Flink SQL:

-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- Create a MySQL table 'mysql_extract_node' in Flink SQL
Flink SQL> CREATE TABLE mysql_extract_node (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'mysql-cdc-inlong',
     'hostname' = 'YourHostname',
     'port' = '3306',
     'username' = 'YourUsername',
     'password' = 'YourPassword',
     'database-name' = 'YourDatabaseName',
     'table-name' = 'YourTableName');
  
-- Read snapshot and binlogs from mysql_extract_node
Flink SQL> SELECT * FROM mysql_extract_node;

Usage for InLong Dashboard

  • Choose the BINLOG Data Source

  • Configure the MySQL Source

Usage for InLong Manager Client

TODO: It will be supported in the future.

MySQL Extract Node Options

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE `mysql_extract_node` (
     `id` INT,
     `name` STRING,
     `database_name` STRING METADATA FROM 'meta.database_name',
     `table_name` STRING METADATA FROM 'meta.table_name',
     `op_ts` TIMESTAMP(3) METADATA FROM 'meta.op_ts',
     `op_type` STRING METADATA FROM 'meta.op_type',
     `batch_id` BIGINT METADATA FROM 'meta.batch_id',
     `is_ddl` BOOLEAN METADATA FROM 'meta.is_ddl',
     `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
     `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
     `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
     `data` STRING METADATA FROM 'meta.data_canal',
     `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
     `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
      'connector' = 'mysql-cdc-inlong',
      'hostname' = 'YourHostname',
      'migrate-all' = 'true',
      'port' = '3306',
      'username' = 'YourUsername',
      'password' = 'YourPassword',
      'database-name' = 'YourDatabase',
      'table-name' = 'YourTable',
      'row-kinds-filtered' = '+I'
      );
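
Once declared, the metadata columns can be queried like ordinary columns. The query below is a small sketch against the table defined above; the choice of columns and the filter are illustrative only.

```sql
-- Read the physical columns together with a few of the metadata columns
-- declared above, keeping only rows that are not flagged as DDL.
SELECT
    `id`,
    `name`,
    `database_name`,
    `table_name`,
    `op_type`,
    `op_ts`
FROM mysql_extract_node
WHERE `is_ddl` = FALSE;
```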

Data Type Mapping

Features

Multi-database multi-table synchronization

The MySQL Extract Node supports whole-database and multi-table synchronization. When this feature is enabled, the MySQL Extract Node packs the physical fields of each table into a special metadata field 'data_canal' in 'canal-json' format. It can alternatively be configured to use the metadata field 'data_debezium' in 'debezium-json' format.

Configuration parameters:

| Parameter | Required | Default Value | Data Type | Description |
|---------------|----------|---------------|-----------|-------------|
| migrate-all | optional | false | String | Enable whole-database migration mode; all physical fields are obtained through the data_canal field |
| table-name | optional | false | String | Regular expressions of the tables to be read; use "." to separate the database and the table, and "," to separate multiple regular expressions |
| database-name | optional | false | String | Regular expressions of the databases to be read; use "," to separate multiple regular expressions |

The CREATE TABLE example demonstrates the function syntax:

CREATE TABLE `table_1`(
`data` STRING METADATA FROM 'meta.data_canal' VIRTUAL)
WITH (
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'migrate-all' = 'true',
'connector' = 'mysql-cdc-inlong',
'hostname' = 'localhost',
'database-name' = 'test,test01',
'username' = 'root',
'password' = 'inlong',
'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*'
)
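
With this table in place, every matched source table is delivered through the single `data` column. The query below sketches one way to look inside the payload. It assumes that the string follows the usual canal-json layout with top-level "database", "table" and "type" fields, and that the Flink version in use provides the JSON_VALUE function; the aliases db_name, tbl_name and op_type are hypothetical names chosen for this example.

```sql
-- Each row of table_1 carries one change record serialized as a canal-json string.
-- Assumption: the payload exposes top-level "database", "table" and "type" fields.
SELECT
    JSON_VALUE(`data`, '$.database') AS db_name,
    JSON_VALUE(`data`, '$.table')    AS tbl_name,
    JSON_VALUE(`data`, '$.type')     AS op_type,
    `data`
FROM table_1;
```

Extracting the database and table names this way makes it possible to route records from the shared stream to different sinks downstream.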