import ChangeLog from ‘../changelog/connector-clickhouse.md’;

Clickhouse

Clickhouse source 连接器

支持引擎

Spark
Flink
SeaTunnel Zeta

核心特性

支持查询SQL,可以实现投影效果。

描述

用于从Clickhouse读取数据。

支持的数据源信息

为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。

数据源支持的版本依赖
ClickhouseuniversalDownload

数据类型映射

Clickhouse 数据类型SeaTunnel 数据类型
String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygonSTRING
Int8 / UInt8 / Int16 / UInt16 / Int32INT
UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecondBIGINT
Float64DOUBLE
DecimalDECIMAL
Float32FLOAT
DateDATE
DateTimeTIME
ArrayARRAY
MapMAP

Source 选项

名称类型是否必须默认值描述
hostString-ClickHouse 集群地址, 格式是host:port , 允许多个hosts配置. 例如 "host1:8123,host2:8123" .
usernameString-ClickHouse user 用户账号.
passwordString-ClickHouse user 用户密码.
table_listArrayNO-要读取的数据表列表,支持配置多表.
clickhouse.configMap-除了上述必须由 clickhouse-jdbc 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 clickhouse-jdbc 提供的所有参数.
server_time_zoneStringZoneId.systemDefault()数据库服务中的会话时区。如果未设置,则使用ZoneId.systemDefault()设置服务时区.
common-options-源插件常用参数,详见 源通用选项.

多表配置:

名称类型是否必须默认值描述
table_pathString-数据表的完整路径, 例如: default.table.
sqlString-用于通过Clickhouse服务搜索数据的查询sql.
filter_queryString-数据过滤条件. 格式为: “field = value”, 例如 : filter_query = “id > 2 and type = 1”
partition_listArray-指定分区列表过滤数据. 如果是分区表,该字段可以配置为过滤指定分区的数据。. 例如: partition_list = [“20250615”, “20250616”]
batch_sizeint1024从Clickhouse读取一次可以获得的最大数据行数。

注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。

并行读取

Clickhouse源连接器支持并行读取数据。

当仅指定table_path参数时,连接器根据从system.parts系统表中获取的数据表的part文件实现并行读取。

当仅指定sql参数时,连接器在集群的每个分片上基于本地表执行查询来实现并发读取。如果sql参数指定了一个分布式表,则会根据分布式表引擎的集群名获取分片列表执行并发读取。如果sql指定了一个本地表,那么host参数配置的节点列表将被视作集群分片列表执行并发读取。

如果同时设置了table_pathsql参数,则将在sql模式下执行。推荐在指定sql参数时同时配置table_path参数以更好地识别表的元数据。

Tips

当指定table_path参数时,如果不想读取整个表,可以指定partition_listfilter_query参数过滤指定条件或分区的数据。

  • partition_list: 过滤指定分区的数据
  • filter_query: 根据指定条件对数据进行过滤

batch_size参数可用于控制每次查询读取的数据量,以避免在读取大量数据时出现OOM异常。适当增加这个值将有助于提高读取过程的性能。

当读取单个表的数据时,建议使用table_path参数替代sql参数。

如何创建Clickhouse数据同步作业

单表配置

下面的示例演示了如何创建一个数据同步作业,该作业从Clickhouse读取数据并在本地客户端上打印数据

案例1:基于part文件读取策略的并行读取

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    partition_list = ["20250615", "20250616"]
    filter_query = "id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

案例2:基于SQL读取策略的并行读取

注意:SQL模式下的并行读取方式目前仅支持单表和where条件查询

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_path = "default.table"
    server_time_zone = "UTC"
    sql = "select * from default.table where id > 2 and type = 1"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

案例3:针对复杂SQL场景的单并发读取

当执行复杂SQL查询场景(例如带有join、group by、子查询等的查询)时,连接器将自动切换到单并发执行方式,即使配置了更高的并行度值。

env {
  job.mode = "BATCH"
  parallelism = 1
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    server_time_zone = "UTC"
    sql = "select t1.id, t2.category from default.table1 t1 global join default.table2 t2 on t1.id = t2.id where t1.age > 18"
    batch_size = 1024
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

多表配置

env {
  job.mode = "BATCH"
  parallelism = 5
}

source {
  Clickhouse {
    host = "localhost:8123"
    username = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "default.table1"
        sql = "select * from default.table1 where id > 2 and type = 1"
      },
      {
        table_path = "default.table2"
        sql = "select * from default.table2 where age > 18"
      }
    ]
    server_time_zone = "UTC"
    clickhouse.config = {
      "socket_timeout": "300000"
    }
  }
}

# Console printing of the read Clickhouse data
sink {
  Console {
    parallelism = 1
  }
}

变更日志