title: 总览 sidebar_position: 1

简介

Extract 节点列表是一组基于 Apache Flink® 的 Source Connectors 用于从不同的源系统抽取数据。

支持的 Extract 节点列表

Extract 节点版本驱动包
KafkaKafka: 0.10+None
PulsarPulsar: 2.8.x+None
MongoDB-CDCMongoDB: 3.6, 4.x, 5.0None
MySQL-CDCMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.21
Oracle-CDCOracle: 11, 12, 19Oracle Driver: 19.3.0.0
PostgreSQL-CDCPostgreSQL: 9.6, 10, 11, 12JDBC Driver: 42.2.12
SQLServer-CDCSQLServer: 2012, 2014, 2016, 2017, 2019None

支持的 Flink 版本列表

下表展示了 InLong® Extract 节点 和 Flink® 版本之间的对应关系。

InLong® Extract 节点版本Flink® 版本
1.2.01.13.5

SQL API 用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

  • 设置一个安装了 1.13.5 版本和 Java 8+ 的 Flink 集群。
  • 下载 页面下载并解压 Sort Connectors jars (或者参考 如何编译 编译需要的版本)。
  • 将下载并解压后的 Sort Connectors jars 放到 FLINK_HOME/lib/
  • 重启 Flink 集群。

下面例子展示了如何在 Flink SQL Client 创建 MySQL Extract 节点,并从中查询数据:

-- 创建一个 MySQL Extract 节点
CREATE TABLE mysql_extract_node (
 id INT NOT NULL,
 name STRING,
 age INT,
 weight DECIMAL(10,3),
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc-inlong',
 'hostname' = 'YourHostname',
 'port' = '3306',
 'username' = 'YourUsername',
 'password' = 'YourPassword',
 'database-name' = 'YourDatabaseName',
 'table-name' = 'YourTableName'
);

SELECT id, name, age, weight FROM mysql_extract_node;