import {siteVariables} from ‘../../version’;
MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。本文档介绍如何设置 MongoDB CDC 连接器以对 MongoDB 运行 SQL 查询。
Extract 节点 | 版本 |
---|---|
MongoDB-CDC | MongoDB: >= 3.6 |
I.为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的依赖关系信息
MongoDB 版本
MongoDB 版本 >= 3.6 我们使用 更改流功能(3.6 版中的新功能)来捕获更改数据。
集群部署
存储引擎
需要 WiredTiger存储引擎。
需要副本集协议版本 1 (pv1)。 从版本 4.0 开始,MongoDB 仅支持 pv1。pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。
特权
changeStream
MongoDB Kafka 连接器 read
需要权限。
您可以使用以下示例进行简单授权。 更详细的授权请参考 MongoDB 数据库用户角色。
use admin; db.createUser({ user: "flinkuser", pwd: "flinkpw", roles: [ { role: "read", db: "admin" }, // read role includes changeStream privilege { role: "readAnyDatabase", db: "admin" } // for snapshot reading ] });
这个例子展示了如何使用 Flink SQL
创建一个 MongoDB Extract 节点:
-- Set checkpoint every 3000 milliseconds Flink SQL> SET 'execution.checkpointing.interval' = '3s'; -- Create a MySQL table 'mongodb_extract_node' in Flink SQL Flink SQL> CREATE TABLE mongodb_extract_node ( _id STRING, // must be declared name STRING, weight DECIMAL(10,3), tags ARRAY<STRING>, -- array price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb-cdc-inlong', 'hosts' = 'localhost:27017,localhost:27018,localhost:27019', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database' = 'inventory', 'collection' = 'mongodb_extract_node' ); -- Read snapshot and binlogs from mongodb_extract_node Flink SQL> SELECT * FROM mongodb_extract_node;
注意
MongoDB 的更改事件记录在消息之前没有更新。所以,我们只能将其转换为 Flink 的 UPSERT 变更日志流。UPSERT 流需要唯一键,因此我们必须声明 _id
为主键。我们不能将其他列声明为主键,因为删除操作不包含除 _id
和 sharding key
之外的键和值。
TODO: 未来会支持
TODO: 未来会支持
选项 | 是否必须 | 默认 | 类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定要使用的连接器,这里应该是mongodb-cdc-inlong . |
hosts | 必须 | (none) | String | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如。localhost:27017,localhost:27018 |
username | 可选 | (none) | String | 连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。 |
password | 可选 | (none) | String | 连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。 |
database | 必须 | (none) | String | 要监视更改的数据库的名称。 |
collection | 必须 | (none) | String | 数据库中要监视更改的集合的名称。 |
connection.options | 可选 | (none) | String | MongoDB的 & 分隔连接选项。例如。replicaSet=test&connectTimeoutMS=300000 |
copy.existing | 可选 | true | Boolean | 是否从源集合中复制现有数据。 |
copy.existing.queue.size | 可选 | 10240 | Integer | 执行数据复制时使用的线程数。 |
batch.size | 可选 | 1024 | Integer | Cursor 批次大小。 |
poll.max.batch.size | 可选 | 1024 | Integer | 轮询新数据时,单个批次中包含的最大更改流文档数。 |
poll.await.time.ms | 可选 | 1000 | Integer | 在更改流上检查新结果之前等待的时间量。 |
heartbeat.interval.ms | 可选 | 0 | Integer | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId={groupId} &streamId={streamId} &nodeId={nodeId} 。 |
scan.incremental.snapshot.enabled | 可选 | false | Boolean | 是否启用增量快照。增量快照功能仅支持 MongoDB 4.0之后的版本。 |
scan.incremental.snapshot.chunk.size.mb | 可选 | 64 | Integer | 增量快照的块大小,单位: mb。 |
chunk-meta.group.size | 可选 | 1000 | Integer | chunk meta 的组大小,如果 meta 大小超过组大小,则 meta 将被分成多个组。 |
以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
Key | 数据类型 | 描述 |
---|---|---|
database_name | STRING NOT NULL | 包含该行的数据库的名称。 |
collection_name | STRING NOT NULL | 包含该行的集合的名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。 |
扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:
CREATE TABLE `mysql_extract_node` ( db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA FROM 'collection_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, _id STRING, // must be declared name STRING, weight DECIMAL(10,3), tags ARRAY<STRING>, -- array price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb-cdc-inlong', 'hostname' = 'YourHostname', 'username' = 'YourUsername', 'password' = 'YourPassword', 'database' = 'YourDatabase', 'collection' = 'YourTable' );
BSON 类型 | Flink SQL 类型 |
---|---|
TINYINT | |
SMALLINT | |
Int | INT |
Long | BIGINT |
FLOAT | |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
Date | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ... |