title: MySQL-CDC sidebar_position: 6

import {siteVariables} from ‘../../version’;

概述

MySQL Extract 节点允许从 MySQL 数据库中读取快照数据和增量数据。本文档介绍如何设置 MySQL Extract 节点以对 MySQL 数据库运行 SQL 查询。

支持的版本

Extract 节点版本Driver
MySQL-CDCMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.21

依赖

为了设置 MySQL Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

连接 MySQL 数据库还需要 MySQL 驱动程序依赖项。请下载mysql-connector-java-8.0.21.jar 并将其放入 FLINK_HOME/lib/

设置 MySQL 服务器

你必须定义一个对 Debezium MySQL 连接器监控的所有数据库具有适当权限的 MySQL 用户。

  • 创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  • 向用户授予所需的权限:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意: 启用 scan.incremental.snapshot.enabled 时不再需要 RELOAD 权限(默认启用)。

  • 刷新用户的权限:
mysql> FLUSH PRIVILEGES;

查看更多关于权限说明

注意事项

为每个 Reader 设置一个不同的 SERVER ID

每一个读取 Binlog 的 MySQL 数据库客户端都应该有一个唯一的 Id,称为 Server Id。 MySQL 服务器将使用此 Id 来维护网络连接和 Binlog 位置。因此,如果不同的作业共享相同的服务器 Id,可能会导致从错误的 Binlog 位置读取。 因此,建议通过 [SQL Hints](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints. html), 例如假设源并行度为 4,那么我们可以使用 SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; 为 4 个 Source Reader 中的每一个分配唯一的服务器 Id。

设置 MySQL 会话超时

当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置 interactive_timeoutwait_timeout 来防止这种行为。

  • interactive_timeout:服务器在关闭交互式连接之前等待其活动的秒数。请参阅 MySQL 文档
  • wait_timeout:服务器在关闭非交互式连接之前等待其活动的秒数。请参阅 MySQL 文档

如何创建一个 MySQL Extract 节点

SQL API 用法

下面这个例子展示了如何用 Flink SQL 创建一个 MySQL Extract 节点:

-- 设置 Checkpoint 为 3000 毫秒                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

Flink SQL> CREATE TABLE mysql_extract_node (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'mysql-cdc-inlong',
     'hostname' = 'YourHostname',
     'port' = '3306',
     'username' = 'YourUsername',
     'password' = 'YourPassword',
     'database-name' = 'YourDatabaseName',
     'table-name' = 'YourTableName');
  
Flink SQL> SELECT * FROM mysql_extract_node;

InLong Dashboard 用法

  • Choose the BINLOG Data Source MySQL BINLOG

  • Configure the MySQL Source MySQL SOURCE

InLong Manager Client 用法

TODO: 将在未来支持此功能。

MySQL Extract 节点参数

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

扩展的 CREATE TABLE 示例演示了使用这些元数据字段的语法:

CREATE TABLE `mysql_extract_node` (
      `id` INT,
      `name` STRING,
      `database_name` string METADATA FROM 'meta.database_name',
      `table_name`    string METADATA FROM 'meta.table_name',
      `op_ts`         timestamp(3) METADATA FROM 'meta.op_ts',
      `op_type` string METADATA FROM 'meta.op_type',
      `batch_id` bigint METADATA FROM 'meta.batch_id',
      `is_ddl` boolean METADATA FROM 'meta.is_ddl',
      `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
      `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
      `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
      `data` STRING METADATA FROM 'meta.data',
      `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
      `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
      PRIMARY KEY (`id`) NOT ENFORCED 
) WITH (
      'connector' = 'mysql-cdc-inlong', 
      'hostname' = 'YourHostname',
      'migrate-all' = 'true',
      'port' = '3306',                
      'username' = 'YourUsername',
      'password' = 'YourPassword',
      'database-name' = 'YourDatabase',
      'table-name' = 'YourTable' 
      );

数据类型映射