import {siteVariables} from ‘../../version’;
The MySQL Extract Node allows for reading snapshot data and incremental data from MySQL database. This document describes how to set up the MySQL Extract Node to run SQL queries against MySQL databases.
Extract Node | Version | Driver |
---|---|---|
MySQL-CDC | MySQL: 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x PolarDB MySQL: 5.6, 5.7, 8.0.x Aurora MySQL: 5.6, 5.7, 8.0.x MariaDB: 10.x PolarDB X: 2.0.1 | JDBC Driver: 8.0.21 |
In order to set up the MySQL Extract Node, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
The MySQL driver dependency is also required to connect to MySQL database. Please download mysql-connector-java-8.0.21.jar and put it into FLINK_HOME/lib/
.
You have to define a MySQL user with appropriate permissions on all databases that the Debezium MySQL connector monitors.
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
Note: The RELOAD permissions is not required any more when scan.incremental.snapshot.enabled
is enabled (enabled by default).
mysql> FLUSH PRIVILEGES;
See more about the permission explanation.
Every MySQL database client for reading binlog should have an unique id, called server id. MySQL server will use this id to maintain network connection and the binlog position. Therefore, if different jobs share a same server id, it may result to read from wrong binlog position. Thus, it is recommended to set different server id for each reader via the SQL Hints, e.g. assuming the source parallelism is 4, then we can use SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
to assign unique server id for each of the 4 source readers.
When an initial consistent snapshot is made for large databases, your established connection could timeout while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file.
interactive_timeout
: The number of seconds the server waits for activity on an interactive connection before closing it. See MySQL documentations.wait_timeout
: The number of seconds the server waits for activity on a noninteractive connection before closing it. See MySQL documentations.The example below shows how to create an MySQL Extract Node with Flink SQL
:
-- Set checkpoint every 3000 milliseconds Flink SQL> SET 'execution.checkpointing.interval' = '3s'; -- Create a MySQL table 'mysql_extract_node' in Flink SQL Flink SQL> CREATE TABLE mysql_extract_node ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc-inlong', 'hostname' = 'YourHostname', 'port' = '3306', 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabaseName', 'table-name' = 'YourTableName'); -- Read snapshot and binlogs from mysql_extract_node Flink SQL> SELECT * FROM mysql_extract_node;
Choose the BINLOG
Data Source
Configure the MySQL Source
TODO: It will be supported in the future.
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE `mysql_extract_node` ( `id` INT, `name` STRING, `database_name` string METADATA FROM 'meta.database_name', `table_name` string METADATA FROM 'meta.table_name', `op_ts` timestamp(3) METADATA FROM 'meta.op_ts', `op_type` string METADATA FROM 'meta.op_type', `batch_id` bigint METADATA FROM 'meta.batch_id', `is_ddl` boolean METADATA FROM 'meta.is_ddl', `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before', `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type', `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names', `data` STRING METADATA FROM 'meta.data_canal', `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type', `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts', PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc-inlong', 'hostname' = 'YourHostname', 'migrate-all' = 'true', 'port' = '3306', 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabase', 'table-name' = 'YourTable', 'row-kinds-filtered' = '+I' );
Mysql Extract node supports whole database and multi-table synchronization. After this function is enabled, the Mysql Extract node will compress the physical fields of the table into a special meta field ‘data_canal’ in the ‘canal-json’ format, and can also be configured as a metadata field ‘data_debezium’ in the ‘debezium-json’ format.
Configuration parameters:
| Parameter | Required | Default Value | Data Type | Description | |---------------| ---| ---| ---|--------------------- ----------------------------------------| | migrate-all |optional| false|String| Enable the entire database migration mode, all physical fields are obtained through the data_canal field | | table-name |optional| false|String| The regular expression of the table to be read, use “.” to separate between database and table, and use “,” to separate multiple regular expressions | | database-name |optional| false|String| The expression of the library to be read, multiple regular expressions are separated by “,” |
The CREATE TABLE example demonstrates the function syntax:
CREATE TABLE `table_1`( `data` STRING METADATA FROM 'meta.data_canal' VIRTUAL) WITH ( 'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1', 'migrate-all' = 'true', 'connector' = 'mysql-cdc-inlong', 'hostname' = 'localhost', 'database-name' = 'test,test01', 'username' = 'root', 'password' = 'inlong', 'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*' )