import ChangeLog from ‘../changelog/connector-cdc-tidb.md’;

TiDB CDC

TiDB CDC模式的连接器

支持的引擎

SeaTunnel Zeta
Flink

主要功能

描述

TiDB-CDC连接器允许从 TiDB 数据库读取快照数据和增量数据。本文将介绍如何设置 TiDB-CDC 连接器,在 TiDB 数据库中对数据进行快照和捕获流事件。

支持的数据源信息

数据源支持的版本驱动Maven
MySQL MySQL: 5.5, 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x com.mysql.cj.jdbc.Driverhttps://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28
tikv-client-java3.2.0-https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0

Using Dependency

安装驱动

在 Flink 引擎下

  1. 你需要确保 jdbc 驱动 jar 包tikv-client-java jar 包 已经放在目录 ${SEATUNNEL_HOME}/plugins/

在 SeaTunnel Zeta 引擎下

  1. 你需要确保 jdbc 驱动 jar 包tikv-client-java jar 包 已经放在目录 ${SEATUNNEL_HOME}/lib/

请下载 Mysql 驱动和 tikv-java-client 并将其放在 ${SEATUNNEL_HOME}/lib/ 目录中。例如:

cp mysql-connector-java-xxx.jar ${SEATUNNEL_HOME}/lib/

数据类型映射

Mysql 数据类型SeaTunnel 数据类型
BIT(1)
TINYINT(1)
BOOLEAN
TINYINTTINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR
INT
INT UNSIGNED
INTEGER UNSIGNED
BIGINT
BIGINT
BIGINT UNSIGNEDDECIMAL(20,0)
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
DECIMAL(p,s)
FLOAT
FLOAT UNSIGNED
FLOAT
DOUBLE
DOUBLE UNSIGNED
REAL
REAL UNSIGNED
DOUBLE
CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
ENUM
JSON
ENUM
STRING
DATEDATE
TIME(s)TIME(s)
DATETIME
TIMESTAMP(s)
TIMESTAMP(s)
BINARY
VARBINAR
BIT(p)
TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
GEOMETRY
BYTES

源选项

名称类型必需默认描述
urlString-JDBC 连接的 URL,例如:jdbc:mysql://tidb0:4000/inventory
usernameString-连接数据库服务器时使用的用户名。
passwordString-连接数据库服务器时使用的密码。
pd-addressesString-TiKV 集群的 PD 地址。
database-nameString-要监控的数据库名称。
table-nameString-要监控的表名称。表名称需要包含数据库名称。
startup.modeEnumINITIALTiDB CDC 消费器的可选启动模式,可选值有 initialearliestlatestspecific
initial:启动时同步历史数据,然后同步增量数据。
earliest:从最早的可用偏移量开始启动。
latest:从最新的偏移量开始启动。
specific:从用户提供的特定偏移量开始启动。
batch-size-per-scanInt1000每次扫描的大小。
tikv.grpc.timeout_in_msLong-TiKV GRPC 超时时间(毫秒)。
tikv.grpc.scan_timeout_in_msLong-TiKV GRPC 扫描超时时间(毫秒)。
tikv.batch_get_concurrencyInteger-TiKV GRPC 批量获取并发度。
tikv.batch_scan_concurrencyInteger-TiKV GRPC 批量扫描并发度。

任务示例

简单示例

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  TiDB-CDC {
    plugin_output = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

transform {
}

sink {
  jdbc {
    plugin_input = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
  }
}

变更日志