import ChangeLog from ‘../changelog/connector-doris.md’;
Doris sink 连接器
Doris version is >= 1.1.x
Doris version is >= 1.2.x
Doris version is 2.x
Spark
Flink
SeaTunnel Zeta
用于发送数据到doris. 同时支持流模式和批模式处理. Doris Sink连接器的内部实现是通过stream load批量缓存和导入的。
- 你需要下载 jdbc driver jar package 并添加到目录
${SEATUNNEL_HOME}/plugins/
.
- 你需要下载 jdbc driver jar package 并添加到目录
${SEATUNNEL_HOME}/lib/
.
Name | Type | Required | Default | Description |
---|---|---|---|---|
fenodes | String | Yes | - | Doris 集群 fenodes 地址, 格式是 "fe_ip:fe_http_port, ..." |
query-port | int | No | 9030 | Doris Fenodes mysql协议查询端口 |
username | String | Yes | - | Doris 用户名 |
password | String | Yes | - | Doris 密码 |
database | String | Yes | - | Doris 数据库名称 , 使用 ${database_name} 表示上游数据库名称。 |
table | String | Yes | - | Doris 表名, 使用 ${table_name} 表示上游表名。 |
table.identifier | String | Yes | - | Doris 表的名称,2.3.5 版本后将弃用,请使用 database 和 table 代替。 |
sink.label-prefix | String | Yes | - | stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。 |
sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考此处。 |
sink.enable-delete | bool | No | - | 是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此link获得更多详细信息 |
sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 |
sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 |
sink.buffer-size | int | No | 256 * 1024 | 用于缓存stream load数据的缓冲区大小。 |
sink.buffer-count | int | No | 3 | 用于缓存stream load数据的缓冲区计数。 |
doris.batch.size | int | No | 1024 | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。 |
needs_unsupported_type_casting | boolean | No | false | 是否启用不支持的类型转换,例如 Decimal64 到 Double。 |
case_sensitive | boolean | No | true | 是否保留表名和字段名的原始大小写。当设置为 false 时,表名和字段名将被转换为小写。 |
schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的schema_save_mode |
data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的data_save_mode 。 |
save_mode_create_template | string | no | see below | 见下文。 |
custom_sql | String | no | - | 当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。 |
doris.config | map | yes | - | 该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。 |
在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:RECREATE_SCHEMA
:表不存在时创建,表保存时删除并重建。 CREATE_SCHEMA_WHEN_NOT_EXIST
:表不存在时会创建,表存在时跳过。ERROR_WHEN_SCHEMA_NOT_EXIST
:表不存在时会报错。IGNORE
:忽略对表的处理。
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:DROP_DATA
: 保留数据库结构并删除数据。APPEND_DATA
:保留数据库结构,保留数据。CUSTOM_PROCESSING
:用户自定义处理。ERROR_WHEN_DATA_EXISTS
:有数据时报错。
使用模板自动创建Doris表, 会根据上游数据类型和schema类型创建相应的建表语句, 默认模板可以根据情况进行修改。
默认模板:
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "in_memory" = "false", "storage_format" = "V2", "disable_auto_compaction" = "false" )
如果模板中填写了自定义字段,例如添加 id 字段
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( id, ${rowtype_fields} ) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key}) COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_num" = "1" );
连接器会自动从上游获取对应类型完成填充, 并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
可以使用以下占位符:
Doris 数据类型 | SeaTunnel 数据类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT TINYINT |
INT | INT SMALLINT TINYINT |
BIGINT | BIGINT INT SMALLINT TINYINT |
LARGEINT | BIGINT INT SMALLINT TINYINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE FLOAT |
DECIMAL | DECIMAL DOUBLE FLOAT |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
HLL | 尚不支持 |
BITMAP | 尚不支持 |
QUANTILE_STATE | 尚不支持 |
STRUCT | 尚不支持 |
支持的格式包括 CSV 和 JSON。
适当增加sink.buffer-size
和doris.batch.size
的值可以提高写性能。
在流模式下,如果doris.batch.size
和checkpoint.interval
都配置为较大的值,最后到达的数据可能会有较大的延迟(延迟的时间就是检查点间隔的时间)。
这是因为最后到达的数据总量可能不会超过doris.batch.size指定的阈值。因此,在接收到数据的数据量没有超过该阈值之前只有检查点才会触发提交操作。因此,需要选择一个合适的检查点间隔。
此外,如果你通过sink.enable-2pc=true
属性启用2pc。sink.buffer-size
将会失去作用,只有检查点才能触发提交。
下面的例子描述了向Doris写入多种数据类型,用户需要在下游创建对应的表。
env { parallelism = 1 job.mode = "BATCH" checkpoint.interval = 10000 } source { FakeSource { row.num = 10 map.size = 10 array.size = 10 bytes.length = 10 string.length = 10 schema = { fields { c_map = "map<string, array<int>>" c_array = "array<int>" c_string = string c_boolean = boolean c_tinyint = tinyint c_smallint = smallint c_int = int c_bigint = bigint c_float = float c_double = double c_decimal = "decimal(16, 1)" c_null = "null" c_bytes = bytes c_date = date c_timestamp = timestamp } } } } sink { Doris { fenodes = "doris_cdc_e2e:8030" username = root password = "" database = "test" table = "e2e_table_sink" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" sink.enable-delete = "true" doris.config { format = "json" read_json_by_line = "true" } } }
本示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送给Doris Sink,FakeSource使用schema、score(int类型)模拟CDC数据,Doris需要创建一个名为test.e2e_table_sink的sink任务及其对应的表 。
env { parallelism = 1 job.mode = "BATCH" checkpoint.interval = 10000 } source { FakeSource { schema = { fields { pk_id = bigint name = string score = int sex = boolean number = tinyint height = float sight = double create_time = date update_time = timestamp } } rows = [ { kind = INSERT fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] }, { kind = INSERT fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] }, { kind = INSERT fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] }, { kind = UPDATE_BEFORE fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] }, { kind = UPDATE_AFTER fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] }, { kind = DELETE fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"] } ] } } sink { Doris { fenodes = "doris_cdc_e2e:8030" username = root password = "" database = "test" table = "e2e_table_sink" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" sink.enable-delete = "true" doris.config { format = "json" read_json_by_line = "true" } } }
sink { Doris { fenodes = "e2e_dorisdb:8030" username = root password = "" database = "test" table = "e2e_table_sink" sink.enable-2pc = "true" sink.label-prefix = "test_json" doris.config = { format="json" read_json_by_line="true" } } }
sink { Doris { fenodes = "e2e_dorisdb:8030" username = root password = "" database = "test" table = "e2e_table_sink" sink.enable-2pc = "true" sink.label-prefix = "test_csv" doris.config = { format = "csv" column_separator = "," } } }
sink { Doris { fenodes = "e2e_dorisdb:8030" username = root password = "" database = "Test_DB" # 保留原始大小写 table = "Test_Table" # 保留原始大小写 case_sensitive = true # 默认值,保留原始大小写 sink.enable-2pc = "true" sink.label-prefix = "test_case_sensitive" doris.config = { format = "json" read_json_by_line = "true" } } }