import ChangeLog from ‘../changelog/connector-file-local.md’;
本地文件接收器
将数据输出到本地文件。
:::tip 提示
如果你使用的是 spark/flink,为了使用此连接器,你必须确保你的 spark/flink 集群已集成 hadoop。已测试的 hadoop 版本是 2.x。
如果你使用 SeaTunnel Engine,它会在下载和安装 SeaTunnel Engine 时自动集成 hadoop jar。你可以在 ${SEATUNNEL_HOME}/lib 下检查 jar 包以确认这一点。
:::
[x] 多模态
使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
[x] 精确一次
默认情况下,我们使用 2PC 提交以确保精确一次
。
[x] 文件格式类型
名称 | 类型 | 是否必需 | 默认值 | 描述 |
---|---|---|---|---|
path | string | 是 | - | 目标目录路径 |
tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 mv 将临时目录提交到目标目录。 |
custom_filename | boolean | 否 | false | 是否需要自定义文件名 |
file_name_expression | string | 否 | “${transactionId}” | 仅在 custom_filename 为 true 时使用 |
filename_time_format | string | 否 | “yyyy.MM.dd” | 仅在 custom_filename 为 true 时使用 |
file_format_type | string | 否 | “csv” | 文件格式类型 |
filename_extension | string | 否 | - | 使用自定义的文件扩展名覆盖默认的文件扩展名。 例如:.xml , .json , dat , .customtype |
field_delimiter | string | 否 | ‘\001’ | 仅在 file_format_type 为 text 时使用 |
row_delimiter | string | 否 | “\n” | 仅在 file_format_type 为 text 、csv 、json 时使用 |
have_partition | boolean | 否 | false | 是否需要处理分区 |
partition_by | array | 否 | - | 仅在 have_partition 为 true 时使用 |
partition_dir_expression | string | 否 | “${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/” | 仅在 have_partition 为 true 时使用 |
is_partition_field_write_in_file | boolean | 否 | false | 仅在 have_partition 为 true 时使用 |
sink_columns | array | 否 | 当此参数为空时,所有字段都是 sink 列 | |
is_enable_transaction | boolean | 否 | true | 是否启用事务 |
batch_size | int | 否 | 1000000 | 批量大小 |
single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 |
create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,依然生成对应的数据文件。 |
compress_codec | string | 否 | none | 压缩编码 |
common-options | object | 否 | - | 常见选项 |
max_rows_in_memory | int | 否 | - | 仅在 file_format_type 为 excel 时使用 |
sheet_name | string | 否 | Sheet${随机数} | 仅在 file_format_type 为 excel 时使用 |
csv_string_quote_mode | enum | 否 | MINIMAL | 仅在文件格式为 CSV 时使用。 |
xml_root_tag | string | 否 | RECORDS | 仅在 file_format 为 xml 时使用 |
xml_row_tag | string | 否 | RECORD | 仅在 file_format 为 xml 时使用 |
xml_use_attr_format | boolean | 否 | - | 仅在 file_format 为 xml 时使用 |
parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅在 file_format 为 parquet 时使用 |
parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅在 file_format 为 parquet 时使用 |
enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 text,csv 时使用。 false:不写入表头,true:写入表头。 |
encoding | string | 否 | “UTF-8” | 仅在 file_format_type 为 json,text,csv,xml 时使用 |
merge_update_event | boolean | 否 | false | 仅当file_format_type为canal_json、debezium_json、maxwell_json. |
目标目录路径是必需的,你可以通过使用 ${database_name}
、${table_name}
和 ${schema_name}
将上游的 CatalogTable 注入到路径中。
是否自定义文件名
仅在 custom_filename
为 true
时使用
file_name_expression
描述将创建到 path
中的文件表达式。我们可以在 file_name_expression
中添加变量 ${now}
或 ${uuid}
,例如 test_${uuid}_${now}
,${now}
表示当前时间,其格式可以通过指定 filename_time_format
选项来定义。
请注意,如果 is_enable_transaction
为 true
,我们将自动在文件名的头部添加 ${transactionId}_
。
仅在 custom_filename
为 true
时使用
当 file_name_expression
参数中的格式为 xxxx-${now}
时,filename_time_format
可以指定路径的时间格式,默认值为 yyyy.MM.dd
。常用的时间格式如下所示:
符号 | 描述 |
---|---|
y | 年 |
M | 月 |
d | 日 |
H | 小时 (0-23) |
m | 分钟 |
s | 秒 |
我们支持以下文件类型:
text
csv
parquet
orc
json
excel
xml
binary
canal_json
debezium_json
maxwell_json
请注意,最终的文件名将以 file_format_type 的后缀结尾,文本文件的后缀是 txt
。
数据行中列之间的分隔符。仅在 text
文件格式下需要。
文件中行之间的分隔符。仅在 text
、csv
、json
文件格式下需要。
是否需要处理分区。
仅在 have_partition
为 true
时使用。
基于选定字段进行数据分区。
仅在 have_partition
为 true
时使用。
如果指定了 partition_by
,我们将基于分区信息生成相应的分区目录,最终文件将放置在分区目录中。
默认的 partition_dir_expression
是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/
。k0
是第一个分区字段,v0
是第一个分区字段的值。
仅在 have_partition
为 true
时使用。
如果 is_partition_field_write_in_file
为 true
,分区字段及其值将写入数据文件。
例如,如果你想写入一个 Hive 数据文件,其值应该为 false
。
需要写入文件的列,默认值为从 Transform
或 Source
获取的所有列。字段的顺序决定了实际写入文件的顺序。
如果 is_enable_transaction
为 true,我们将确保数据在写入目标目录时不会丢失或重复。
请注意,如果 is_enable_transaction
为 true,我们将自动在文件名前添加 ${transactionId}_
。
目前仅支持 true
。
文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size
和 checkpoint.interval
共同决定。如果 checkpoint.interval
的值足够大,sink writer 将在文件中的行数超过 batch_size
时写入文件。如果 checkpoint.interval
很小,当触发新检查点时,sink writer 将创建一个新文件。
文件的压缩编码,支持的压缩编码如下所示:
lzo
none
lzo
none
lzo
none
lzo
snappy
lz4
zlib
none
lzo
snappy
lz4
gzip
brotli
zstd
none
提示:excel 类型不支持任何压缩格式
Sink 插件的常见参数,请参阅 Sink 常见选项 获取详细信息。
当文件格式为 Excel 时,内存中可以缓存的数据项最大数量。
工作簿的表名。
当文件格式为 CSV 时,CSV 的字符串引号模式。
_root_tag [string]
指定 XML 文件中根元素的标签名。
指定 XML 文件中数据行的标签名。
指定是否使用标签属性格式处理数据。
支持从时间戳写入 Parquet INT96,仅对 parquet 文件有效。
支持从 12 字节字段写入 Parquet INT96,仅对 parquet 文件有效。
仅在 file_format_type 为 text,csv 时使用。false:不写入表头,true:写入表头。
仅在 file_format_type 为 json,text,csv,xml 时使用。文件写入的编码。该参数将通过 Charset.forName(encoding)
解析。
仅当file_format_type为canal_json、debezium_json、maxwell_json时使用. 设置成true,序列化数据时,UPDATE_AFTER 和 UPDATE_BEFORE 会合并成 UPDATE; 设置成false,序列化数据时,UPDATE_AFTER 和 UPDATE_BEFORE 不会合并;
对于 orc 文件格式的简单配置
LocalFile { path = "/tmp/hive/warehouse/test2" file_format_type = "orc" }
对于带有 encoding
的 json、text、csv 或 xml 文件格式
LocalFile { path = "/tmp/hive/warehouse/test2" file_format_type = "text" encoding = "gbk" }
对于带有 sink_columns
的 parquet 文件格式
LocalFile { path = "/tmp/hive/warehouse/test2" file_format_type = "parquet" sink_columns = ["name","age"] }
对于带有 have_partition
、custom_filename
和 sink_columns
的 text 文件格式
LocalFile { path = "/tmp/hive/warehouse/test2" file_format_type = "text" field_delimiter = "\t" row_delimiter = "\n" have_partition = true partition_by = ["age"] partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = true custom_filename = true file_name_expression = "${transactionId}_${now}" filename_time_format = "yyyy.MM.dd" sink_columns = ["name","age"] is_enable_transaction = true }
对于带有 sheet_name
和 max_rows_in_memory
的 excel 文件格式
LocalFile { path="/tmp/seatunnel/excel" sheet_name = "Sheet1" max_rows_in_memory = 1024 partition_dir_expression="${k0}=${v0}" is_partition_field_write_in_file=true file_name_expression="${transactionId}_${now}" file_format_type="excel" filename_time_format="yyyy.MM.dd" is_enable_transaction=true }
对于从上游提取源元数据,可以在路径中使用 ${database_name}
、${table_name}
和 ${schema_name}
。
LocalFile { path = "/tmp/hive/warehouse/${table_name}" file_format_type = "parquet" sink_columns = ["name","age"] }