import ChangeLog from ‘../changelog/connector-clickhouse.md’;
Clickhouse 数据连接器
Spark
Flink
SeaTunnel Zeta
Clickhouse sink 插件通过实现幂等写入可以达到精准一次,需要配合 aggregating merge tree 支持重复数据删除的引擎。
用于将数据写入 Clickhouse。
为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。
| 数据源 | 支持的版本 | 依赖 |
|---|---|---|
| Clickhouse | universal | 下载 |
| SeaTunnel 数据类型 | Clickhouse 数据类型 |
|---|---|
| STRING | String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygon |
| INT | Int8 / UInt8 / Int16 / UInt16 / Int32 |
| BIGINT | UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond |
| DOUBLE | Float64 |
| DECIMAL | Decimal |
| FLOAT | Float32 |
| DATE | Date |
| TIME | DateTime |
| ARRAY | Array |
| MAP | Map |
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|---|---|---|---|---|
| host | String | Yes | - | ClickHouse 集群地址, 格式是host:port , 允许多个hosts配置. 例如 "host1:8123,host2:8123". |
| database | String | Yes | - | ClickHouse 数据库名称. |
| table | String | Yes | - | 表名称. |
| username | String | Yes | - | ClickHouse 用户账号. |
| password | String | Yes | - | ClickHouse 用户密码. |
| clickhouse.config | Map | No | 除了上述必须由 clickhouse-jdbc 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 clickhouse-jdbc 提供的所有参数. | |
| bulk_size | String | No | 20000 | 每次通过Clickhouse-jdbc 写入的行数,即默认是20000. |
| split_mode | String | No | false | 此模式仅支持引擎为Distributed的 clickhouse 表。选项 internal_replication 应该是 true 。他们将在 seatunnel 中拆分分布式表数据,并直接对每个分片进行写入。分片权重定义为 clickhouse 将计算在内。 |
| sharding_key | String | No | - | 使用 split_mode 时,将数据发送到哪个节点是个问题,默认为随机选择,但可以使用sharding_key参数来指定分片算法的字段。此选项仅在split_mode为 true 时有效. |
| primary_key | String | No | - | 标记clickhouse表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到clickhouse表. |
| support_upsert | Boolean | No | false | 支持按查询主键更新插入行. |
| allow_experimental_lightweight_delete | Boolean | No | false | 允许基于MergeTree表引擎实验性轻量级删除. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的schema_save_mode |
| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的data_save_mode。 |
| custom_sql | String | no | - | 当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行 |
| save_mode_create_template | string | no | see below | 见下文。 |
| common-options | No | - | Sink插件查用参数,详见Sink常用选项. |
在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:RECREATE_SCHEMA :表不存在时创建,表保存时删除并重建。CREATE_SCHEMA_WHEN_NOT_EXIST :表不存在时会创建,表存在时跳过。ERROR_WHEN_SCHEMA_NOT_EXIST :表不存在时会报错。IGNORE :忽略对表的处理。
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:DROP_DATA: 保留数据库结构并删除数据。APPEND_DATA:保留数据库结构,保留数据。CUSTOM_PROCESSING:用户自定义处理。ERROR_WHEN_DATA_EXISTS:有数据时报错。
使用模板自动创建 Clickhouse 表, 会根据上游数据类型和schema类型创建相应的建表语句, 默认模板可以根据情况进行修改。
默认模板:
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE = MergeTree() ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS index_granularity = 8192 COMMENT '${comment}';
如果模板中填写了自定义字段,例如添加 id 字段
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( id, ${rowtype_fields} ) ENGINE = MergeTree() ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS index_granularity = 8192 COMMENT '${comment}';
连接器会自动从上游获取对应类型完成填充, 并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
可以使用以下占位符:
以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。
# Set the basic configuration of the task to be performed env { parallelism = 1 job.mode = "BATCH" checkpoint.interval = 1000 } source { FakeSource { row.num = 2 bigint.min = 0 bigint.max = 10000000 split.num = 1 split.read-interval = 300 schema { fields { c_bigint = bigint } } } } sink { Clickhouse { host = "127.0.0.1:9092" database = "default" table = "test" username = "xxxxx" password = "xxxxx" } }
1.SeaTunnel 部署文档.
2.需要在同步前提前创建要写入的表.
3.当写入 ClickHouse 表,无需设置其结构,因为连接器会在写入前向 ClickHouse 查询当前表的结构信息.
sink { Clickhouse { host = "localhost:8123" database = "default" table = "fake_all" username = "xxxxx" password = "xxxxx" clickhouse.config = { max_rows_to_read = "100" read_overflow_mode = "throw" } } }
sink { Clickhouse { host = "localhost:8123" database = "default" table = "fake_all" username = "xxxxx" password = "xxxxx" # split mode options split_mode = true sharding_key = "age" } }
sink { Clickhouse { host = "localhost:8123" database = "default" table = "fake_all" username = "xxxxx" password = "xxxxx" # cdc options primary_key = "id" support_upsert = true } }
sink { Clickhouse { host = "localhost:8123" database = "default" table = "fake_all" username = "xxxxx" password = "xxxxx" # cdc options primary_key = "id" support_upsert = true allow_experimental_lightweight_delete = true } }