import ChangeLog from ‘../changelog/connector-databend.md’;

Databend

Databend sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

用于向 Databend 写入数据的 sink 连接器。支持批处理和流处理模式。 Databend sink 内部通过 stage attachment 实现数据的批量导入。

依赖

对于 Spark/Flink

  1. 你需要下载 Databend JDBC driver jar package 并添加到目录 ${SEATUNNEL_HOME}/plugins/.

对于 SeaTunnel Zeta

  1. 你需要下载 Databend JDBC driver jar package 并添加到目录 ${SEATUNNEL_HOME}/lib/.

Sink 选项

名称类型是否必须默认值描述
urlString-Databend JDBC 连接 URL
usernameString-Databend 数据库用户名
passwordString-Databend 数据库密码
databaseString-Databend 数据库名称,默认使用连接 URL 中指定的数据库名
tableString-Databend 表名称
batch_sizeInteger1000批量写入的记录数
auto_commitBooleantrue是否自动提交事务
max_retriesInteger3写入失败时的最大重试次数
schema_save_modeEnumCREATE_SCHEMA_WHEN_NOT_EXIST保存 Schema 的模式
data_save_modeEnumAPPEND_DATA保存数据的模式
custom_sqlString-自定义写入 SQL,通常用于复杂的写入场景
execute_timeout_secInteger300执行SQL的超时时间(秒)
jdbc_configMap-额外的 JDBC 连接配置,如连接超时参数等
conflict_keyString-cdc 模式下的冲突键,用于确定冲突解决的主键
enable_deleteBooleanfalsecdc 模式下是否允许删除操作

schema_save_mode [Enum]

在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:
RECREATE_SCHEMA :表不存在时创建,表存在时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST :表不存在时会创建,表存在时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :表不存在时会报错。
IGNORE :忽略对表的处理。

data_save_mode [Enum]

在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:
DROP_DATA: 保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户自定义处理。
ERROR_WHEN_DATA_EXISTS:有数据时报错。

数据类型映射

SeaTunnel 数据类型Databend 数据类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
STRINGSTRING
BYTESVARBINARY
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP

任务示例

简单示例

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        name = string
        age = int
        score = double
      }
    }
  }
}

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    batch_size = 1000
  }
}

使用自定义 SQL 写入

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
  }
}

使用 Schema 保存模式

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    schema_save_mode = "RECREATE_SCHEMA"
    data_save_mode = "APPEND_DATA"
  }
}

CDC mode

sink {
  Databend {
    url = "jdbc:databend://databend:8000/default?ssl=false"
    username = "root"
    password = ""
    database = "default"
    table = "sink_table"
    
    # Enable CDC mode
    batch_size = 1
    interval = 3
    conflict_key = "id"
    enable_delete = true
  }
}

相关链接

Changelog