blob: dc69c628958952bc2318a3f5e4ea82065fa54e1f [file] [log] [blame] [view]
import ChangeLog from '../changelog/connector-clickhouse.md';
# Clickhouse
> Clickhouse source 连接器
## 支持引擎
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## 核心特性
- [x] [批处理](../../concept/connector-v2-features.md)
- [ ] [流处理](../../concept/connector-v2-features.md)
- [ ] [精确一次](../../concept/connector-v2-features.md)
- [x] [列映射](../../concept/connector-v2-features.md)
- [ ] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户自定义拆分](../../concept/connector-v2-features.md)
- [x] [支持多表读](../../concept/connector-v2-features.md)
> 支持查询SQL,可以实现投影效果。
## 描述
用于从Clickhouse读取数据。
## 支持的数据源信息
为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。
| 数据源 | 支持的版本 | 依赖 |
|------------|--------------------|------------------------------------------------------------------------------------------|
| Clickhouse | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-clickhouse) |
## 数据类型映射
| Clickhouse 数据类型 | SeaTunnel 数据类型 |
|-----------------------------------------------------------------------------------------------------------------------------------------------|---------------------|
| String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygon | STRING |
| Int8 / UInt8 / Int16 / UInt16 / Int32 | INT |
| UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond | BIGINT |
| Float64 | DOUBLE |
| Decimal | DECIMAL |
| Float32 | FLOAT |
| Date | DATE |
| DateTime | TIME |
| Array | ARRAY |
| Map | MAP |
## Source 选项
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|-------------------|--------|----------|------------------------|-----------------------------------------------------------------------------------|
| host | String | | - | `ClickHouse` 集群地址, 格式是`host:port` , 允许多个`hosts`配置. 例如 `"host1:8123,host2:8123"` . |
| username | String | | - | `ClickHouse` user 用户账号. |
| password | String | | - | `ClickHouse` user 用户密码. |
| table_list | Array | NO | - | 要读取的数据表列表,支持配置多表. |
| clickhouse.config | Map | | - | 除了上述必须由 `clickhouse-jdbc` 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 `clickhouse-jdbc` 提供的所有[参数](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration). |
| server_time_zone | String | | ZoneId.systemDefault() | 数据库服务中的会话时区。如果未设置,则使用ZoneId.systemDefault()设置服务时区. |
| common-options | | | - | 源插件常用参数,详见 [源通用选项](../source-common-options.md). |
多表配置:
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|----------------|--------|------|------|--------------------------------------------------------------------------------------|
| table_path | String | | - | 数据表的完整路径, 例如: `default.table`. |
| sql | String | | - | 用于通过Clickhouse服务搜索数据的查询sql. |
| filter_query | String | | - | 数据过滤条件. 格式为: "field = value", 例如 : filter_query = "id > 2 and type = 1" |
| partition_list | Array | | - | 指定分区列表过滤数据. 如果是分区表,该字段可以配置为过滤指定分区的数据。. 例如: partition_list = ["20250615", "20250616"] |
| batch_size | int | | 1024 | Clickhouse读取一次可以获得的最大数据行数。 |
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
## 并行读取
Clickhouse源连接器支持并行读取数据。
当仅指定`table_path`参数时,连接器根据从`system.parts`系统表中获取的数据表的part文件实现并行读取。
当仅指定`sql`参数时,连接器在集群的每个分片上基于本地表执行查询来实现并发读取。如果`sql`参数指定了一个分布式表,则会根据分布式表引擎的集群名获取分片列表执行并发读取。如果`sql`指定了一个本地表,那么`host`参数配置的节点列表将被视作集群分片列表执行并发读取。
如果同时设置了`table_path``sql`参数,则将在sql模式下执行。推荐在指定`sql`参数时同时配置`table_path`参数以更好地识别表的元数据。
## Tips
当指定`table_path`参数时,如果不想读取整个表,可以指定`partition_list``filter_query`参数过滤指定条件或分区的数据。
* `partition_list`: 过滤指定分区的数据
* `filter_query`: 根据指定条件对数据进行过滤
`batch_size`参数可用于控制每次查询读取的数据量,以避免在读取大量数据时出现OOM异常。适当增加这个值将有助于提高读取过程的性能。
当读取单个表的数据时,建议使用`table_path`参数替代`sql`参数。
## 如何创建Clickhouse数据同步作业
### 单表配置
下面的示例演示了如何创建一个数据同步作业,该作业从Clickhouse读取数据并在本地客户端上打印数据
**案例1:基于part文件读取策略的并行读取**
```hocon
env {
job.mode = "BATCH"
parallelism = 5
}
source {
Clickhouse {
host = "localhost:8123"
username = "xxx"
password = "xxx"
table_path = "default.table"
server_time_zone = "UTC"
partition_list = ["20250615", "20250616"]
filter_query = "id > 2 and type = 1"
batch_size = 1024
clickhouse.config = {
"socket_timeout": "300000"
}
}
}
# Console printing of the read Clickhouse data
sink {
Console {
parallelism = 1
}
}
```
**案例2:基于SQL读取策略的并行读取**
> 注意:SQL模式下的并行读取方式目前仅支持单表和where条件查询
```hocon
env {
job.mode = "BATCH"
parallelism = 5
}
source {
Clickhouse {
host = "localhost:8123"
username = "xxx"
password = "xxx"
table_path = "default.table"
server_time_zone = "UTC"
sql = "select * from default.table where id > 2 and type = 1"
batch_size = 1024
clickhouse.config = {
"socket_timeout": "300000"
}
}
}
# Console printing of the read Clickhouse data
sink {
Console {
parallelism = 1
}
}
```
**案例3:针对复杂SQL场景的单并发读取**
当执行复杂SQL查询场景(例如带有joingroup by、子查询等的查询)时,连接器将自动切换到单并发执行方式,即使配置了更高的并行度值。
```hocon
env {
job.mode = "BATCH"
parallelism = 1
}
source {
Clickhouse {
host = "localhost:8123"
username = "xxx"
password = "xxx"
server_time_zone = "UTC"
sql = "select t1.id, t2.category from default.table1 t1 global join default.table2 t2 on t1.id = t2.id where t1.age > 18"
batch_size = 1024
clickhouse.config = {
"socket_timeout": "300000"
}
}
}
# Console printing of the read Clickhouse data
sink {
Console {
parallelism = 1
}
}
```
### 多表配置
```hocon
env {
job.mode = "BATCH"
parallelism = 5
}
source {
Clickhouse {
host = "localhost:8123"
username = "xxx"
password = "xxx"
table_list = [
{
table_path = "default.table1"
sql = "select * from default.table1 where id > 2 and type = 1"
},
{
table_path = "default.table2"
sql = "select * from default.table2 where age > 18"
}
]
server_time_zone = "UTC"
clickhouse.config = {
"socket_timeout": "300000"
}
}
}
# Console printing of the read Clickhouse data
sink {
Console {
parallelism = 1
}
}
```
## 变更日志
<ChangeLog />