import ChangeLog from ‘../changelog/connector-clickhouse.md’;
Clickhouse source 连接器
Spark
Flink
SeaTunnel Zeta
支持查询SQL,可以实现投影效果。
用于从Clickhouse读取数据。
为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。
数据源 | 支持的版本 | 依赖 |
---|---|---|
Clickhouse | universal | Download |
Clickhouse 数据类型 | SeaTunnel 数据类型 |
---|---|
String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygon | STRING |
Int8 / UInt8 / Int16 / UInt16 / Int32 | INT |
UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond | BIGINT |
Float64 | DOUBLE |
Decimal | DECIMAL |
Float32 | FLOAT |
Date | DATE |
DateTime | TIME |
Array | ARRAY |
Map | MAP |
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
host | String | 是 | - | ClickHouse 集群地址, 格式是host:port , 允许多个hosts 配置. 例如 "host1:8123,host2:8123" . |
username | String | 是 | - | ClickHouse user 用户账号. |
password | String | 是 | - | ClickHouse user 用户密码. |
table_list | Array | NO | - | 要读取的数据表列表,支持配置多表. |
clickhouse.config | Map | 否 | - | 除了上述必须由 clickhouse-jdbc 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 clickhouse-jdbc 提供的所有参数. |
server_time_zone | String | 否 | ZoneId.systemDefault() | 数据库服务中的会话时区。如果未设置,则使用ZoneId.systemDefault()设置服务时区. |
common-options | 否 | - | 源插件常用参数,详见 源通用选项. |
多表配置:
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
table_path | String | 否 | - | 数据表的完整路径, 例如: default.table . |
sql | String | 否 | - | 用于通过Clickhouse服务搜索数据的查询sql. |
filter_query | String | 否 | - | 数据过滤条件. 格式为: “field = value”, 例如 : filter_query = “id > 2 and type = 1” |
partition_list | Array | 否 | - | 指定分区列表过滤数据. 如果是分区表,该字段可以配置为过滤指定分区的数据。. 例如: partition_list = [“20250615”, “20250616”] |
batch_size | int | 否 | 1024 | 从Clickhouse读取一次可以获得的最大数据行数。 |
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
Clickhouse源连接器支持并行读取数据。
当仅指定table_path
参数时,连接器根据从system.parts
系统表中获取的数据表的part文件实现并行读取。
当仅指定sql
参数时,连接器在集群的每个分片上基于本地表执行查询来实现并发读取。如果sql
参数指定了一个分布式表,则会根据分布式表引擎的集群名获取分片列表执行并发读取。如果sql
指定了一个本地表,那么host
参数配置的节点列表将被视作集群分片列表执行并发读取。
如果同时设置了table_path
和sql
参数,则将在sql模式下执行。推荐在指定sql
参数时同时配置table_path
参数以更好地识别表的元数据。
当指定table_path
参数时,如果不想读取整个表,可以指定partition_list
或filter_query
参数过滤指定条件或分区的数据。
partition_list
: 过滤指定分区的数据filter_query
: 根据指定条件对数据进行过滤batch_size
参数可用于控制每次查询读取的数据量,以避免在读取大量数据时出现OOM异常。适当增加这个值将有助于提高读取过程的性能。
当读取单个表的数据时,建议使用table_path
参数替代sql
参数。
下面的示例演示了如何创建一个数据同步作业,该作业从Clickhouse读取数据并在本地客户端上打印数据
案例1:基于part文件读取策略的并行读取
env { job.mode = "BATCH" parallelism = 5 } source { Clickhouse { host = "localhost:8123" username = "xxx" password = "xxx" table_path = "default.table" server_time_zone = "UTC" partition_list = ["20250615", "20250616"] filter_query = "id > 2 and type = 1" batch_size = 1024 clickhouse.config = { "socket_timeout": "300000" } } } # Console printing of the read Clickhouse data sink { Console { parallelism = 1 } }
案例2:基于SQL读取策略的并行读取
注意:SQL模式下的并行读取方式目前仅支持单表和where条件查询
env { job.mode = "BATCH" parallelism = 5 } source { Clickhouse { host = "localhost:8123" username = "xxx" password = "xxx" table_path = "default.table" server_time_zone = "UTC" sql = "select * from default.table where id > 2 and type = 1" batch_size = 1024 clickhouse.config = { "socket_timeout": "300000" } } } # Console printing of the read Clickhouse data sink { Console { parallelism = 1 } }
案例3:针对复杂SQL场景的单并发读取
当执行复杂SQL查询场景(例如带有join、group by、子查询等的查询)时,连接器将自动切换到单并发执行方式,即使配置了更高的并行度值。
env { job.mode = "BATCH" parallelism = 1 } source { Clickhouse { host = "localhost:8123" username = "xxx" password = "xxx" server_time_zone = "UTC" sql = "select t1.id, t2.category from default.table1 t1 global join default.table2 t2 on t1.id = t2.id where t1.age > 18" batch_size = 1024 clickhouse.config = { "socket_timeout": "300000" } } } # Console printing of the read Clickhouse data sink { Console { parallelism = 1 } }
env { job.mode = "BATCH" parallelism = 5 } source { Clickhouse { host = "localhost:8123" username = "xxx" password = "xxx" table_list = [ { table_path = "default.table1" sql = "select * from default.table1 where id > 2 and type = 1" }, { table_path = "default.table2" sql = "select * from default.table2 where age > 18" } ] server_time_zone = "UTC" clickhouse.config = { "socket_timeout": "300000" } } } # Console printing of the read Clickhouse data sink { Console { parallelism = 1 } }