import {siteVariables} from ‘../../version’;
MySQL Extract 节点允许从 MySQL 数据库中读取快照数据和增量数据。本文档介绍如何设置 MySQL Extract 节点以对 MySQL 数据库运行 SQL 查询。
Extract 节点 | 版本 | Driver |
---|---|---|
MySQL-CDC | MySQL: 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x PolarDB MySQL: 5.6, 5.7, 8.0.x Aurora MySQL: 5.6, 5.7, 8.0.x MariaDB: 10.x PolarDB X: 2.0.1 | JDBC Driver: 8.0.21 |
为了设置 MySQL Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。
连接 MySQL 数据库还需要 MySQL 驱动程序依赖项。请下载mysql-connector-java-8.0.21.jar 并将其放入 FLINK_HOME/lib/
。
你必须定义一个对 Debezium MySQL 连接器监控的所有数据库具有适当权限的 MySQL 用户。
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
注意: 启用 scan.incremental.snapshot.enabled
时不再需要 RELOAD 权限(默认启用)。
mysql> FLUSH PRIVILEGES;
查看更多关于权限说明。
每一个读取 Binlog 的 MySQL 数据库客户端都应该有一个唯一的 Id,称为 Server Id。 MySQL 服务器将使用此 Id 来维护网络连接和 Binlog 位置。因此,如果不同的作业共享相同的服务器 Id,可能会导致从错误的 Binlog 位置读取。 因此,建议通过 [SQL Hints](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints. html), 例如假设源并行度为 4,那么我们可以使用 SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
为 4 个 Source Reader 中的每一个分配唯一的服务器 Id。
当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置 interactive_timeout
和 wait_timeout
来防止这种行为。
interactive_timeout
:服务器在关闭交互式连接之前等待其活动的秒数。请参阅 MySQL 文档。wait_timeout
:服务器在关闭非交互式连接之前等待其活动的秒数。请参阅 MySQL 文档。下面这个例子展示了如何用 Flink SQL
创建一个 MySQL Extract 节点:
-- 设置 Checkpoint 为 3000 毫秒 Flink SQL> SET 'execution.checkpointing.interval' = '3s'; Flink SQL> CREATE TABLE mysql_extract_node ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc-inlong', 'hostname' = 'YourHostname', 'port' = '3306', 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabaseName', 'table-name' = 'YourTableName'); Flink SQL> SELECT * FROM mysql_extract_node;
Choose the BINLOG
Data Source
Configure the MySQL Source
TODO: 将在未来支持此功能。
以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
扩展的 CREATE TABLE 示例演示了使用这些元数据字段的语法:
CREATE TABLE `mysql_extract_node` ( `id` INT, `name` STRING, `database_name` string METADATA FROM 'meta.database_name', `table_name` string METADATA FROM 'meta.table_name', `op_ts` timestamp(3) METADATA FROM 'meta.op_ts', `op_type` string METADATA FROM 'meta.op_type', `batch_id` bigint METADATA FROM 'meta.batch_id', `is_ddl` boolean METADATA FROM 'meta.is_ddl', `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before', `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type', `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names', `data` STRING METADATA FROM 'meta.data', `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type', `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts', PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc-inlong', 'hostname' = 'YourHostname', 'migrate-all' = 'true', 'port' = '3306', 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabase', 'table-name' = 'YourTable' );