blob: 7bc92a564c50f3b45ceacb87a075419ad77a56da [file] [log] [blame] [view]
import ChangeLog from '../changelog/connector-file-local.md';
# LocalFile
> 本地文件接收器
## 描述
将数据输出到本地文件。
:::tip 提示
如果你使用的是 spark/flink,为了使用此连接器,你必须确保你的 spark/flink 集群已集成 hadoop。已测试的 hadoop 版本是 2.x
如果你使用 SeaTunnel Engine,它会在下载和安装 SeaTunnel Engine 时自动集成 hadoop jar。你可以在 ${SEATUNNEL_HOME}/lib 下检查 jar 包以确认这一点。
:::
## 主要特性
- [x] [多模态](../../concept/connector-v2-features.md#多模态multimodal)
使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
- [x] [精确一次](../../concept/connector-v2-features.md)
默认情况下,我们使用 2PC 提交以确保`精确一次`
- [x] 文件格式类型
- [x] 文本
- [x] csv
- [x] parquet
- [x] orc
- [x] json
- [x] excel
- [x] xml
- [x] 二进制
- [x] canal_json
- [x] debezium_json
- [x] maxwell_json
## 选项
| 名称 | 类型 | 是否必需 | 默认值 | 描述 |
|---------------------------------------|---------|------|--------------------------------------------|-----------------------------------------------------------------|
| path | string | | - | 目标目录路径 |
| tmp_path | string | | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 `mv` 将临时目录提交到目标目录。 |
| custom_filename | boolean | | false | 是否需要自定义文件名 |
| file_name_expression | string | | "${transactionId}" | 仅在 custom_filename true 时使用 |
| filename_time_format | string | | "yyyy.MM.dd" | 仅在 custom_filename true 时使用 |
| file_format_type | string | | "csv" | 文件格式类型 |
| filename_extension | string | | - | 使用自定义的文件扩展名覆盖默认的文件扩展名。 例如:`.xml`, `.json`, `dat`, `.customtype` |
| field_delimiter | string | | '\001' | 仅在 file_format_type text 时使用 |
| row_delimiter | string | | "\n" | 仅在 file_format_type `text``csv``json` 时使用 |
| have_partition | boolean | | false | 是否需要处理分区 |
| partition_by | array | | - | 仅在 have_partition true 时使用 |
| partition_dir_expression | string | | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 仅在 have_partition true 时使用 |
| is_partition_field_write_in_file | boolean | | false | 仅在 have_partition true 时使用 |
| sink_columns | array | | | 当此参数为空时,所有字段都是 sink |
| is_enable_transaction | boolean | | true | 是否启用事务 |
| batch_size | int | | 1000000 | 批量大小 |
| single_file_mode | boolean | | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 |
| create_empty_file_when_no_data | boolean | | false | 当上游没有数据同步时,依然生成对应的数据文件。 |
| compress_codec | string | | none | 压缩编码 |
| common-options | object | | - | 常见选项 |
| max_rows_in_memory | int | | - | 仅在 file_format_type excel 时使用 |
| sheet_name | string | | Sheet${随机数} | 仅在 file_format_type excel 时使用 |
| csv_string_quote_mode | enum | | MINIMAL | 仅在文件格式为 CSV 时使用。 |
| xml_root_tag | string | | RECORDS | 仅在 file_format xml 时使用 |
| xml_row_tag | string | | RECORD | 仅在 file_format xml 时使用 |
| xml_use_attr_format | boolean | | - | 仅在 file_format xml 时使用 |
| parquet_avro_write_timestamp_as_int96 | boolean | | false | 仅在 file_format parquet 时使用 |
| parquet_avro_write_fixed_as_int96 | array | | - | 仅在 file_format parquet 时使用 |
| enable_header_write | boolean | | false | 仅在 file_format_type text,csv 时使用。<br/> false:不写入表头,true:写入表头。 |
| encoding | string | | "UTF-8" | 仅在 file_format_type json,text,csv,xml 时使用 |
| merge_update_event | boolean | | false | 仅当file_format_typecanal_jsondebezium_jsonmaxwell_json. |
### path [string]
目标目录路径是必需的,你可以通过使用 `${database_name}``${table_name}` `${schema_name}` 将上游的 CatalogTable 注入到路径中。
### custom_filename [boolean]
是否自定义文件名
### file_name_expression [string]
仅在 `custom_filename` `true` 时使用
`file_name_expression` 描述将创建到 `path` 中的文件表达式。我们可以在 `file_name_expression` 中添加变量 `${now}` `${uuid}`,例如 `test_${uuid}_${now}``${now}` 表示当前时间,其格式可以通过指定 `filename_time_format` 选项来定义。
请注意,如果 `is_enable_transaction` `true`,我们将自动在文件名的头部添加 `${transactionId}_`
### filename_time_format [string]
仅在 `custom_filename` `true` 时使用
`file_name_expression` 参数中的格式为 `xxxx-${now}` 时,`filename_time_format` 可以指定路径的时间格式,默认值为 `yyyy.MM.dd`。常用的时间格式如下所示:
| 符号 | 描述 |
|----|-----------|
| y | |
| M | |
| d | |
| H | 小时 (0-23) |
| m | 分钟 |
| s | |
### file_format_type [string]
我们支持以下文件类型:
`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `canal_json` `debezium_json` `maxwell_json`
请注意,最终的文件名将以 file_format_type 的后缀结尾,文本文件的后缀是 `txt`
### field_delimiter [string]
数据行中列之间的分隔符。仅在 `text` 文件格式下需要。
### row_delimiter [string]
文件中行之间的分隔符。仅在 `text``csv``json` 文件格式下需要。
### have_partition [boolean]
是否需要处理分区。
### partition_by [array]
仅在 `have_partition` `true` 时使用。
基于选定字段进行数据分区。
### partition_dir_expression [string]
仅在 `have_partition` `true` 时使用。
如果指定了 `partition_by`,我们将基于分区信息生成相应的分区目录,最终文件将放置在分区目录中。
默认的 `partition_dir_expression` `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/``k0` 是第一个分区字段,`v0` 是第一个分区字段的值。
### is_partition_field_write_in_file [boolean]
仅在 `have_partition` `true` 时使用。
如果 `is_partition_field_write_in_file` `true`,分区字段及其值将写入数据文件。
例如,如果你想写入一个 Hive 数据文件,其值应该为 `false`
### sink_columns [array]
需要写入文件的列,默认值为从 `Transform` `Source` 获取的所有列。字段的顺序决定了实际写入文件的顺序。
### is_enable_transaction [boolean]
如果 `is_enable_transaction` true,我们将确保数据在写入目标目录时不会丢失或重复。
请注意,如果 `is_enable_transaction` true,我们将自动在文件名前添加 `${transactionId}_`
目前仅支持 `true`
### batch_size [int]
文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 `batch_size` `checkpoint.interval` 共同决定。如果 `checkpoint.interval` 的值足够大,sink writer 将在文件中的行数超过 `batch_size` 时写入文件。如果 `checkpoint.interval` 很小,当触发新检查点时,sink writer 将创建一个新文件。
### compress_codec [string]
文件的压缩编码,支持的压缩编码如下所示:
- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc: `lzo` `snappy` `lz4` `zlib` `none`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
提示:excel 类型不支持任何压缩格式
### 常见选项
Sink 插件的常见参数,请参阅 [Sink 常见选项](../sink-common-options.md) 获取详细信息。
### max_rows_in_memory [int]
当文件格式为 Excel 时,内存中可以缓存的数据项最大数量。
### sheet_name [string]
工作簿的表名。
### csv_string_quote_mode [string]
当文件格式为 CSV 时,CSV 的字符串引号模式。
- ALL:所有字符串字段都会加引号。
- MINIMAL:仅为包含特殊字符(如字段分隔符、引号字符或行分隔符字符串中的任何字符)的字段加引号。
- NONE:从不为字段加引号。当数据中包含分隔符时,输出会在前面加上转义字符。如果未设置转义字符,则格式验证会抛出异常。
### xml
_root_tag [string]
指定 XML 文件中根元素的标签名。
### xml_row_tag [string]
指定 XML 文件中数据行的标签名。
### xml_use_attr_format [boolean]
指定是否使用标签属性格式处理数据。
### parquet_avro_write_timestamp_as_int96 [boolean]
支持从时间戳写入 Parquet INT96,仅对 parquet 文件有效。
### parquet_avro_write_fixed_as_int96 [array]
支持从 12 字节字段写入 Parquet INT96,仅对 parquet 文件有效。
### enable_header_write [boolean]
仅在 file_format_type text,csv 时使用。false:不写入表头,true:写入表头。
### encoding [string]
仅在 file_format_type json,text,csv,xml 时使用。文件写入的编码。该参数将通过 `Charset.forName(encoding)` 解析。
### merge_update_event [boolean]
仅当file_format_typecanal_jsondebezium_jsonmaxwell_json时使用.
设置成true,序列化数据时,UPDATE_AFTER UPDATE_BEFORE 会合并成 UPDATE;
设置成false,序列化数据时,UPDATE_AFTER UPDATE_BEFORE 不会合并;
## 示例
对于 orc 文件格式的简单配置
```bash
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "orc"
}
```
对于带有 `encoding` jsontextcsv xml 文件格式
```hocon
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "text"
encoding = "gbk"
}
```
对于带有 `sink_columns` parquet 文件格式
```bash
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "parquet"
sink_columns = ["name","age"]
}
```
对于带有 `have_partition``custom_filename` `sink_columns` text 文件格式
```bash
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
}
```
对于带有 `sheet_name` `max_rows_in_memory` excel 文件格式
```bash
LocalFile {
path="/tmp/seatunnel/excel"
sheet_name = "Sheet1"
max_rows_in_memory = 1024
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type="excel"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
}
```
对于从上游提取源元数据,可以在路径中使用 `${database_name}``${table_name}` `${schema_name}`
```bash
LocalFile {
path = "/tmp/hive/warehouse/${table_name}"
file_format_type = "parquet"
sink_columns = ["name","age"]
}
```
## 变更日志
<ChangeLog />