import ChangeLog from '../changelog/connector-clickhouse.md';
# Clickhouse

> Clickhouse source connector

## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

Supports query SQL and can achieve projection effect.

## Description

Used to read data from Clickhouse.
## Dependency

In order to use the Clickhouse connector, the following dependencies are required. They can be downloaded via `install-plugin.sh` or from the Maven central repository.
| Datasource | Supported Versions | Dependency |
|------------|--------------------|------------|
| Clickhouse | universal          | Download   |
## Data Type Mapping

| Clickhouse Data Type | SeaTunnel Data Type |
|----------------------|---------------------|
| String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon / MultiPolygon | STRING |
| Int8 / UInt8 / Int16 / UInt16 / Int32 | INT |
| UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond | BIGINT |
| Float64 | DOUBLE |
| Decimal | DECIMAL |
| Float32 | FLOAT |
| Date | DATE |
| DateTime | TIME |
| Array | ARRAY |
| Map | MAP |
## Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| host | String | Yes | - | ClickHouse cluster address, in `host:port` format; multiple hosts can be specified, such as `"host1:8123,host2:8123"`. |
| username | String | Yes | - | ClickHouse user username. |
| password | String | Yes | - | ClickHouse user password. |
| table_list | Array | No | - | The list of tables to be read. |
| clickhouse.config | Map | No | - | In addition to the mandatory parameters above, users can also specify multiple optional parameters, which cover all the parameters provided by clickhouse-jdbc. |
| server_time_zone | String | No | ZoneId.systemDefault() | The session time zone of the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| common-options | | No | - | Source plugin common parameters, please refer to Source Common Options for details. |
Table list configuration:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| table_path | String | No | - | The full path of the table, example: `default.table`. |
| sql | String | No | - | The query SQL used to read data from the Clickhouse server. |
| filter_query | String | No | - | Data filtering in Clickhouse, in the format `field = value`, example: `filter_query = "id > 2 and type = 1"`. |
| partition_list | Array | No | - | Table partition list used to filter the specified partitions. If it is a partitioned table, this field can be configured to read only the data of the specified partitions, example: `partition_list = ["20250615", "20250616"]`. |
| batch_size | int | No | 1024 | The maximum number of rows fetched from Clickhouse in a single read. |
> Note: When this configuration corresponds to a single table, the configuration items in `table_list` can be flattened to the outer layer.
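For instance, for a single table the `table_list` items can be written directly at the connector level. A minimal sketch (the table name `default.my_table` is a hypothetical placeholder):

```hocon
source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    # Single table: table_list items flattened to the outer layer
    table_path = "default.my_table"
    filter_query = "type = 1"
  }
}
```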
## Parallel Reading

The Clickhouse source connector supports reading data in parallel.

In query table mode, the `table_path` parameter is set and parallel reading is implemented based on the part files of the table, which are obtained from the `system.parts` table.

In sql mode, parallel reading is implemented by executing local-table-based queries in parallel on each shard of the cluster. If the `sql` parameter specifies a distributed table, the query is automatically rewritten against the corresponding local table. If `sql` specifies a local table, the nodes configured by the `host` parameter are used as the shards for parallel reading.
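For example, a source sketched as below would treat both configured nodes as shards and read a local table from each in parallel (the hostnames `ch1`/`ch2` and the table name are hypothetical):

```hocon
source {
  Clickhouse {
    # Each listed node is used as a shard for parallel local-table reads
    host = "ch1:8123,ch2:8123"
    username = "xxx"
    password = "xxx"
    sql = "select * from default.local_table where type = 1"
  }
}
```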
If both the `table_path` and `sql` parameters are set, the job is executed in sql mode, and the `table_path` parameter can be used to better identify the metadata of the table.
In query table mode, if you don't want to read the entire table, you can specify the `partition_list` or `filter_query` parameter:

- `partition_list`: filter the data of the specified partitions
- `filter_query`: filter the data based on the specified conditions

The `batch_size` parameter controls the amount of data read in each fetch, which helps avoid OOM exceptions when reading a large amount of data. Increasing this value appropriately can improve read performance.
Use `table_path` instead of `sql` for single-table reading.
## Task Example

The following examples demonstrate how to create a data synchronization job that reads data from Clickhouse and prints it on the local client.
### Case 1: Parallel reading based on the part read strategy

```hocon
env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    partition_list = ["20250615", "20250616"]
    filter_query = "id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}
```
### Case 2: Parallel reading based on the SQL read strategy

> Note: Parallel execution in SQL mode currently only supports single-table and WHERE-condition queries.

```hocon
env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    sql = "select * from default.table where id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}
```
### Case 3: Complex SQL with single parallelism execution

When using complex SQL queries (such as queries with join, group by, subqueries, etc.), the connector will automatically switch to single-parallelism execution mode, even if a higher parallelism value is configured.

```hocon
env {
  job.mode = "BATCH"
  parallelism = 1
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    server_time_zone = "UTC"
    sql = "select t1.id, t2.category from default.table1 t1 global join default.table2 t2 on t1.id = t2.id where t1.age > 18"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}
```
### Case 4: Multiple table read

```hocon
env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "default.table1"
        sql = "select * from default.table1 where id > 2 and type = 1"
      },
      {
        table_path = "default.table2"
        sql = "select * from default.table2 where age > 18"
      }
    ]
    server_time_zone = "UTC"
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}
```