title: MongoDB-CDC sidebar_position: 5

import {siteVariables} from ‘../../version’;

概述

MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。本文档介绍如何设置 MongoDB CDC 连接器以对 MongoDB 运行 SQL 查询。

支持的版本

Extract 节点版本
MongoDB-CDCMongoDB: >= 3.6

依赖项

I.为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的依赖关系信息

Maven依赖

设置 MongoDB

可用性

  • MongoDB 版本

    MongoDB 版本 >= 3.6 我们使用 更改流功能(3.6 版中的新功能)来捕获更改数据。

  • 集群部署

    需要 副本集分片集群

  • 存储引擎

    需要 WiredTiger存储引擎。

  • 副本集协议版本

    需要副本集协议版本 1 (pv1)。 从版本 4.0 开始,MongoDB 仅支持 pv1。pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

  • 特权

    changeStream MongoDB Kafka 连接器 read 需要权限。

    您可以使用以下示例进行简单授权。 更详细的授权请参考 MongoDB 数据库用户角色

    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, // read role includes changeStream privilege 
        { role: "readAnyDatabase", db: "admin" } // for snapshot reading
      ]
    });
    

如何创建 MongoDB Extract 节点

SQL API 用法

这个例子展示了如何使用 Flink SQL 创建一个 MongoDB Extract 节点:

-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   

-- Create a MySQL table 'mongodb_extract_node' in Flink SQL
Flink SQL> CREATE TABLE mongodb_extract_node (
  _id STRING, // must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc-inlong',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'mongodb_extract_node'
);

-- Read snapshot and binlogs from mongodb_extract_node
Flink SQL> SELECT * FROM mongodb_extract_node;

注意

MongoDB 的更改事件记录在消息之前没有更新。所以,我们只能将其转换为 Flink 的 UPSERT 变更日志流。UPSERT 流需要唯一键,因此我们必须声明 _id 为主键。我们不能将其他列声明为主键,因为删除操作不包含除 _idsharding key 之外的键和值。

InLong Dashboard 用法

TODO: 未来会支持

InLong Manager 用法

TODO: 未来会支持

MongoDB Extract 节点参数

选项是否必须默认类型描述
connector必须(none)String指定要使用的连接器,这里应该是mongodb-cdc-inlong.
hosts必须(none)StringMongoDB 服务器的主机名和端口对的逗号分隔列表。例如。localhost:27017,localhost:27018
username可选(none)String连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。
password可选(none)String连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。
database必须(none)String要监视更改的数据库的名称。
collection必须(none)String数据库中要监视更改的集合的名称。
connection.options可选(none)StringMongoDB的 & 分隔连接选项。例如。replicaSet=test&connectTimeoutMS=300000
copy.existing可选trueBoolean是否从源集合中复制现有数据。
copy.existing.queue.size可选10240Integer执行数据复制时使用的线程数。
batch.size可选1024IntegerCursor 批次大小。
poll.max.batch.size可选1024Integer轮询新数据时,单个批次中包含的最大更改流文档数。
poll.await.time.ms可选1000Integer在更改流上检查新结果之前等待的时间量。
heartbeat.interval.ms可选0Integer发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}
scan.incremental.snapshot.enabled可选falseBoolean是否启用增量快照。增量快照功能仅支持 MongoDB 4.0之后的版本。
scan.incremental.snapshot.chunk.size.mb可选64Integer增量快照的块大小,单位: mb。
chunk-meta.group.size可选1000Integerchunk meta 的组大小,如果 meta 大小超过组大小,则 meta 将被分成多个组。

可用元数据

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

Key数据类型描述
database_nameSTRING NOT NULL包含该行的数据库的名称。
collection_nameSTRING NOT NULL包含该行的集合的名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE `mysql_extract_node` (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
      'connector' = 'mongodb-cdc-inlong', 
      'hostname' = 'YourHostname',
      'username' = 'YourUsername',
      'password' = 'YourPassword',
      'database' = 'YourDatabase',
      'collection' = 'YourTable' 
);

数据类型映射

BSON 类型Flink SQL 类型
TINYINT
SMALLINT
IntINT
LongBIGINT
FLOAT
DoubleDOUBLE
Decimal128DECIMAL(p, s)
BooleanBOOLEAN
Date TimestampDATE
Date TimestampTIME
DateTIMESTAMP(3) TIMESTAMP_LTZ(3)
TimestampTIMESTAMP(0) TIMESTAMP_LTZ(0)
String ObjectId UUID Symbol MD5 JavaScript RegexSTRING
BinDataBYTES
ObjectROW
ArrayARRAY
DBPointerROW<$ref STRING, $id STRING>
GeoJSONPoint : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ...