import ChangeLog from ‘../changelog/connector-cdc-tidb.md’;
TiDB CDC模式的连接器
SeaTunnel Zeta
Flink
TiDB-CDC连接器允许从 TiDB 数据库读取快照数据和增量数据。本文将介绍如何设置 TiDB-CDC 连接器,在 TiDB 数据库中对数据进行快照和捕获流事件。
数据源 | 支持的版本 | 驱动 | Maven |
---|---|---|---|
MySQL | MySQL: 5.5, 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x | com.mysql.cj.jdbc.Driver | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
tikv-client-java | 3.2.0 | - | https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0 |
- 你需要确保 jdbc 驱动 jar 包 和 tikv-client-java jar 包 已经放在目录
${SEATUNNEL_HOME}/plugins/
。
- 你需要确保 jdbc 驱动 jar 包 和 tikv-client-java jar 包 已经放在目录
${SEATUNNEL_HOME}/lib/
。
请下载 Mysql 驱动和 tikv-java-client 并将其放在 ${SEATUNNEL_HOME}/lib/
目录中。例如:
cp mysql-connector-java-xxx.jar ${SEATUNNEL_HOME}/lib/
Mysql 数据类型 | SeaTunnel 数据类型 |
---|---|
BIT(1) TINYINT(1) | BOOLEAN |
TINYINT | TINYINT |
TINYINT UNSIGNED SMALLINT | SMALLINT |
SMALLINT UNSIGNED MEDIUMINT MEDIUMINT UNSIGNED INT INTEGER YEAR | INT |
INT UNSIGNED INTEGER UNSIGNED BIGINT | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
DECIMAL(p, s) DECIMAL(p, s) UNSIGNED NUMERIC(p, s) NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) |
FLOAT FLOAT UNSIGNED | FLOAT |
DOUBLE DOUBLE UNSIGNED REAL REAL UNSIGNED | DOUBLE |
CHAR VARCHAR TINYTEXT MEDIUMTEXT TEXT LONGTEXT ENUM JSON ENUM | STRING |
DATE | DATE |
TIME(s) | TIME(s) |
DATETIME TIMESTAMP(s) | TIMESTAMP(s) |
BINARY VARBINAR BIT(p) TINYBLOB MEDIUMBLOB BLOB LONGBLOB GEOMETRY | BYTES |
名称 | 类型 | 必需 | 默认 | 描述 |
---|---|---|---|---|
url | String | 是 | - | JDBC 连接的 URL,例如:jdbc:mysql://tidb0:4000/inventory 。 |
username | String | 是 | - | 连接数据库服务器时使用的用户名。 |
password | String | 是 | - | 连接数据库服务器时使用的密码。 |
pd-addresses | String | 是 | - | TiKV 集群的 PD 地址。 |
database-name | String | 是 | - | 要监控的数据库名称。 |
table-name | String | 是 | - | 要监控的表名称。表名称需要包含数据库名称。 |
startup.mode | Enum | 否 | INITIAL | TiDB CDC 消费器的可选启动模式,可选值有 initial 、earliest 、latest 和 specific 。initial :启动时同步历史数据,然后同步增量数据。earliest :从最早的可用偏移量开始启动。latest :从最新的偏移量开始启动。specific :从用户提供的特定偏移量开始启动。 |
batch-size-per-scan | Int | 否 | 1000 | 每次扫描的大小。 |
tikv.grpc.timeout_in_ms | Long | 否 | - | TiKV GRPC 超时时间(毫秒)。 |
tikv.grpc.scan_timeout_in_ms | Long | 否 | - | TiKV GRPC 扫描超时时间(毫秒)。 |
tikv.batch_get_concurrency | Integer | 否 | - | TiKV GRPC 批量获取并发度。 |
tikv.batch_scan_concurrency | Integer | 否 | - | TiKV GRPC 批量扫描并发度。 |
env { parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 5000 } source { TiDB-CDC { plugin_output = "products_tidb_cdc" url = "jdbc:mysql://tidb0:4000/inventory" driver = "com.mysql.cj.jdbc.Driver" tikv.grpc.timeout_in_ms = 20000 pd-addresses = "pd0:2379" username = "root" password = "" database-name = "inventory" table-name = "products" } } transform { } sink { jdbc { plugin_input = "products_tidb_cdc" url = "jdbc:mysql://tidb0:4000/inventory" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "" database = "inventory" table = "products_sink" generate_sink_sql = true primary_keys = ["id"] } }