blob: a7fa4055860f022ef006ddf64f70a5c91ddb52dc [file] [log] [blame] [view]
import ChangeLog from '../changelog/connector-file-oss.md';
# OssFile
> Oss文件数据源连接器
## 支持的引擎
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## 使用依赖
### 对于Spark/Flink引擎
1. 您必须确保您的spark/flink集群已经集成了hadoop。测试过的hadoop版本是2.x。
2. 您必须确保`hadoop-aliyun-xx.jar`、`aliyun-sdk-oss-xx.jar`和`jdom-xx.jar`在`${SEATUNNEL_HOME}/plugins/`目录中,并且`hadoop-aliyun` jar的版本需要与您在spark/flink中使用的hadoop版本相等,`aliyun-sdk-oss-xx.jar`和`jdom-xx.jar`版本需要是与`hadoop-aliyun`版本对应的版本。例如:`hadoop-aliyun-3.1.4.jar`依赖`aliyun-sdk-oss-3.4.1.jar`和`jdom-1.1.jar`。
### 对于SeaTunnel Zeta引擎
1. 您必须确保`seatunnel-hadoop3-3.1.4-uber.jar`、`aliyun-sdk-oss-3.4.1.jar`、`hadoop-aliyun-3.1.4.jar`和`jdom-1.1.jar`在`${SEATUNNEL_HOME}/lib/`目录中。
## 主要特性
- [x] [多模态](../../concept/connector-v2-features.md#多模态multimodal)
使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
- [x] [批处理](../../concept/connector-v2-features.md)
- [ ] [流处理](../../concept/connector-v2-features.md)
- [x] [精确一次](../../concept/connector-v2-features.md)
在一次pollNext调用中读取分片中的所有数据。将读取的分片保存在快照中。
- [x] [列投影](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户定义的分片](../../concept/connector-v2-features.md)
- [x] 文件格式类型
- [x] text
- [x] csv
- [x] parquet
- [x] orc
- [x] json
- [x] excel
- [x] xml
- [x] binary
- [x] markdown
## 数据类型映射
数据类型映射与正在读取的文件类型相关,我们支持以下文件类型:
`text` `csv` `parquet` `orc` `json` `excel` `xml` `markdown`
### JSON文件类型
如果您将文件类型指定为`json`,您还应该指定schema选项来告诉连接器如何将数据解析为您想要的行。
例如:
上游数据如下:
```json
{"code": 200, "data": "get success", "success": true}
```
您也可以在一个文件中保存多条数据,并用换行符分隔:
```json lines
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
```
您应该按如下方式指定schema:
```hocon
schema {
fields {
code = int
data = string
success = boolean
}
}
```
连接器将生成如下数据:
| code | data | success |
|------|-------------|---------|
| 200 | get success | true |
### 文本或CSV文件类型
如果您将`file_format_type`设置为`text`、`excel`、`csv`、`xml`。那么需要设置`schema`字段来告诉连接器如何将数据解析为行。
如果您设置了`schema`字段,您还应该设置选项`field_delimiter`,除非`file_format_type`是`csv`、`xml`、`excel`
您可以按如下方式设置schema和分隔符:
```hocon
field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
```
连接器将生成如下数据:
| name | age | gender |
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
### Orc文件类型
如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。
| Orc数据类型 | SeaTunnel数据类型 |
|----------------------------------|----------------------------------------------------------------|
| BOOLEAN | BOOLEAN |
| INT | INT |
| BYTE | BYTE |
| SHORT | SHORT |
| LONG | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BINARY | BINARY |
| STRING<br/>VARCHAR<br/>CHAR<br/> | STRING |
| DATE | LOCAL_DATE_TYPE |
| TIMESTAMP | LOCAL_DATE_TIME_TYPE |
| DECIMAL | DECIMAL |
| LIST(STRING) | STRING_ARRAY_TYPE |
| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE |
| LIST(TINYINT) | BYTE_ARRAY_TYPE |
| LIST(SMALLINT) | SHORT_ARRAY_TYPE |
| LIST(INT) | INT_ARRAY_TYPE |
| LIST(BIGINT) | LONG_ARRAY_TYPE |
| LIST(FLOAT) | FLOAT_ARRAY_TYPE |
| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE |
| Map<K,V> | MapType,K和V的类型将转换为SeaTunnel类型 |
| STRUCT | SeaTunnelRowType |
### Parquet文件类型
如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。
| Parquet数据类型 | SeaTunnel数据类型 |
|----------------------|----------------------------------------------------------------|
| INT_8 | BYTE |
| INT_16 | SHORT |
| DATE | DATE |
| TIMESTAMP_MILLIS | TIMESTAMP |
| INT64 | LONG |
| INT96 | TIMESTAMP |
| BINARY | BYTES |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| FIXED_LEN_BYTE_ARRAY | TIMESTAMP<br/> DECIMAL |
| DECIMAL | DECIMAL |
| LIST(STRING) | STRING_ARRAY_TYPE |
| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE |
| LIST(TINYINT) | BYTE_ARRAY_TYPE |
| LIST(SMALLINT) | SHORT_ARRAY_TYPE |
| LIST(INT) | INT_ARRAY_TYPE |
| LIST(BIGINT) | LONG_ARRAY_TYPE |
| LIST(FLOAT) | FLOAT_ARRAY_TYPE |
| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE |
| Map<K,V> | MapType,K和V的类型将转换为SeaTunnel类型 |
| STRUCT | SeaTunnelRowType |
## 选项
| 名称 | 类型 | 是否必需 | 默认值 | 描述 |
|----------------------------|---------|------|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
| path | string | 是 | - | 需要读取的Oss路径,可以有子路径,但子路径需要满足一定的格式要求。具体要求可以参考"parse_partition_from_path"选项 |
| file_format_type | string | 是 | - | 文件类型,支持以下文件类型:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown` |
| bucket | string | 是 | - | oss文件系统的bucket地址,例如:`oss://seatunnel-test`。 |
| endpoint | string | 是 | - | fs oss端点 |
| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它来实现字段投影。支持列投影的文件类型如下所示:`text` `csv` `parquet` `orc` `json` `excel` `xml`。如果用户想在读取`text` `json` `csv`文件时使用此功能,必须配置"schema"选项。 |
| access_key | string | 否 | - | |
| access_secret | string | 否 | - | |
| delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认`\001`,与hive的默认分隔符相同。 |
| row_delimiter | string | 否 | \n | 行分隔符,用于告诉连接器在读取文本文件时如何切分行。默认`\n`。 |
| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径解析分区键和值。例如,如果您从路径`oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`读取文件。文件中的每条记录数据都将添加这两个字段:name="tyrantlucifer",age=16 |
| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`。默认`yyyy-MM-dd` |
| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` |
| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:`HH:mm:ss` `HH:mm:ss.SSS` |
| filename_extension | string | 否 | - | 过滤文件名扩展名,用于过滤具有特定扩展名的文件。例如:`csv` `.txt` `json` `.xml`。 |
| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于txt和csv。例如,设置如下:`skip_header_row_number = 2`。然后SeaTunnel将跳过源文件的前2行 |
| csv_use_header_line | boolean | 否 | false | 是否使用标题行来解析文件,仅在file_format为`csv`且文件包含符合RFC 4180的标题行时使用 |
| schema | config | 否 | - | 上游数据的schema。 |
| sheet_name | string | 否 | - | 读取工作簿的工作表,仅在file_format为excel时使用。 |
| xml_row_tag | string | 否 | - | 指定XML文件中数据行的标签名称,仅在file_format为xml时使用。 |
| xml_use_attr_format | boolean | 否 | - | 指定是否使用标签属性格式处理数据,仅在file_format为xml时使用。 |
| compress_codec | string | 否 | none | 文件使用的压缩编解码器。 |
| encoding | string | 否 | UTF-8 |
| null_format | string | 否 | - | 仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N` |
| binary_chunk_size | int | 否 | 1024 | 仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 |
| binary_complete_file_mode | boolean | 否 | false | 仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 |
| file_filter_pattern | string | 否 | | 过滤模式,用于过滤文件。 |
| common-options | config | 否 | - | 数据源插件通用参数,请参考[数据源通用选项](../source-common-options.md)了解详情。 |
| file_filter_modified_start | string | 否 | - | 按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
| file_filter_modified_end | string | 否 | - | 按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
### compress_codec [string]
文件的压缩编解码器,支持的详细信息如下所示:
- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc/parquet:
自动识别压缩类型,无需额外设置。
### encoding [string]
仅在file_format_type为json、text、csv、xml时使用。
要读取的文件的编码。此参数将由`Charset.forName(encoding)`解析。
### binary_chunk_size [int]
仅在file_format_type为binary时使用。
读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。
### binary_complete_file_mode [boolean]
仅在file_format_type为binary时使用。
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。
### file_format_type [string]
文件类型,支持以下文件类型:
`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown`
如果您将文件类型指定为 `markdown`,SeaTunnel 可以解析 markdown 文件并提取结构化数据。
markdown 解析器提取各种元素,包括标题、段落、列表、代码块、表格等。
每个元素都转换为具有以下架构的行:
- `element_id`:元素的唯一标识符
- `element_type`:元素类型(Heading、Paragraph、ListItem 等)
- `heading_level`:标题级别(1-6,非标题元素为 null)
- `text`:元素的文本内容
- `page_number`:页码(默认:1)
- `position_index`:文档中的位置索引
- `parent_id`:父元素的 ID
- `child_ids`:子元素 ID 的逗号分隔列表
注意:Markdown 格式仅支持读取,不支持写入。
### file_filter_pattern [string]
过滤模式,用于过滤文件。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
```
匹配规则示例:
**示例1**:*匹配所有.txt文件*,正则表达式:
```
/data/seatunnel/20241001/.*\.txt
```
此示例匹配的结果是:
```
/data/seatunnel/20241001/report.txt
```
**示例2**:*匹配所有以abc开头的文件*,正则表达式:
```
/data/seatunnel/20241002/abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
```
**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
```
/data/seatunnel/202410\d*/.*\.csv
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
```
### schema [config]
仅在file_format_type为text、json、excel、xml或csv时需要配置(或其他我们无法从元数据读取schema的格式)。
#### fields [Config]
上游数据的schema。
## 如何创建Oss数据同步作业
以下示例演示如何创建从Oss读取数据并在本地客户端打印的数据同步作业:
```bash
# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}
# 创建连接到Oss的数据源
source {
OssFile {
path = "/seatunnel/orc"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
}
}
# 控制台打印读取的Oss数据
sink {
Console {
}
}
```
```bash
# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}
# 创建连接到Oss的数据源
source {
OssFile {
path = "/seatunnel/json"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}
}
# 控制台打印读取的Oss数据
sink {
Console {
}
}
```
### 多表
无需配置schema文件类型,例如:`orc`。
```
env {
parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}
source {
OssFile {
tables_configs = [
{
schema = {
table = "fake01"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
},
{
schema = {
table = "fake02"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
}
]
plugin_output = "fake"
}
}
sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}
```
需要配置schema文件类型,例如:`json`
```
env {
execution.parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}
source {
OssFile {
tables_configs = [
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake01"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
},
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake02"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
]
plugin_output = "fake"
}
}
sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}
```
### 过滤文件
```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
OssFile {
path = "/seatunnel/orc"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
// 文件示例 abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
// 筛选最后修改日期在 20240101 和 20240105 (不包括该日期) 之间的文件
file_filter_modified_start = "2024-01-01 00:00:00"
file_filter_modified_end = "2024-01-05 00:00:00"
}
}
sink {
Console {
}
}
```
## 变更日志
<ChangeLog />