import ChangeLog from ‘../changelog/connector-clickhouse.md’;

Clickhouse

Clickhouse source connector

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Key Features

supports query SQL and can achieve projection effect.

Description

Used to read data from Clickhouse.

Supported DataSource Info

In order to use the Clickhouse connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

DatasourceSupported VersionsDependency
ClickhouseuniversalDownload

Data Type Mapping

Clickhouse Data TypeSeaTunnel Data Type
String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygonSTRING
Int8 / UInt8 / Int16 / UInt16 / Int32INT
UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecondBIGINT
Float64DOUBLE
DecimalDECIMAL
Float32FLOAT
DateDATE
DateTimeTIME
ArrayARRAY
MapMAP

Source Options

NameTypeRequiredDefaultDescription
hostStringYes-ClickHouse cluster address, the format is host:port , allowing multiple hosts to be specified. Such as "host1:8123,host2:8123" .
usernameStringYes-ClickHouse user username.
passwordStringYes-ClickHouse user password.
table_listArrayNO-The list of tables to be read.
clickhouse.configMapNo-In addition to the above mandatory parameters that must be specified by clickhouse-jdbc , users can also specify multiple optional parameters, which cover all the parameters provided by clickhouse-jdbc.
server_time_zoneStringNoZoneId.systemDefault()The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone.
common-optionsNo-Source plugin common parameters, please refer to Source Common Options for details.

Table list configuration:

NameTypeRequiredDefaultDescription
table_pathStringNO-The path to the full path of table, example: default.table
sqlStringNO-The query sql used to search data though Clickhouse server.
filter_queryStringNO-Data filtering in Clickhouse. the format is “field = value”, example : filter_query = “id > 2 and type = 1”
partition_listArrayNO-Table partition list to filter the specified partition. If it is a partitioned table, this field can be configured to filter the data of the specified partition. example: partition_list = [“20250615”, “20250616”]
batch_sizeintNO1024The maximum rows of data that can be obtained by reading from Clickhouse once.

Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.

Parallel Reader

The Clickhouse source connector supports parallel reading of data.

For query table mode, the table_path parameter is set and the parallel reading is implemented based on the part file of table, which is obtained from the system.parts table.

For sql mode, the parallel reading is implemented based on the parallelism execution of local table-based queries on each shard of the cluster. If the sql parameter specifies a distributed table, the corresponding local table will be automatically converted to execute the query. If the sql specifies a local table, the node configured by the host parameter will be used as the shard to perform parallelism reading.

If both the table_path and sql parameters are set, it will be executed in sql mode, and the table_path parameter can be used to better identify the metadata of the table.

Tips

In query table mode, if you don't want to read the entire table, you can specify the partition_list or filter_query parameter.

  • partition_list: filter the data of the specified partition
  • filter_query: filter the data based on the specified conditions

The batch_size parameter can be used to control the amount of data read each time to avoid OOM exception when reading a large amount of data. Appropriately increasing this value will help to improve the performance of the reading process.

Use table_path to replace sql for single table reading.

How to Create a Clickhouse Data Synchronization Jobs

Single Table

The following example demonstrates how to create a data synchronization job that reads data from Clickhouse and prints it on the local client:

Case 1: Parallel reading based on the part read strategy

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    partition_list = ["20250615", "20250616"]
    filter_query = "id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

Case 2: Parallel reading based on the SQL read strategy

Parallel execution in SQL mode currently only supports single-table and WHERE-condition queries

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    sql = "select * from default.table where id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

Case 3: Complex SQL with single parallelism execution

When using complex SQL queries (such as queries with join, group by, subqueries, etc.), the connector will automatically switch to single parallel execution mode, even if a higher parallelism value is configured.

env {
  job.mode = "BATCH"
  parallelism = 1
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    server_time_zone = "UTC"
    sql = "select t1.id, t2.category from default.table1 t1 global join default.table2 t2 on t1.id = t2.id where t1.age > 18"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

Multiple table

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "default.table1"
        sql = "select * from default.table1 where id > 2 and type = 1"
      },
      {
        table_path = "default.table2"
        sql = "select * from default.table2 where age > 18"
      }
    ]
    server_time_zone = "UTC"
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

Changelog