import ChangeLog from '../changelog/connector-file-s3.md';
# S3File
> S3 File Source Connector
## Support Those Engines
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Key Features
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [multimodal](../../concept/connector-v2-features.md#multimodal)
Use the binary file format to read and write files of any format, such as videos, pictures, and so on. In short, any file can be synchronized to the target place.
- [x] [exactly-once](../../concept/connector-v2-features.md)
All the data in a split is read in a single pollNext call. Which splits have been read is saved in the snapshot.
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format type
- [x] text
- [x] csv
- [x] parquet
- [x] orc
- [x] json
- [x] excel
- [x] xml
- [x] binary
- [x] markdown
## Description
Read data from the AWS S3 file system.
## Supported DataSource Info
| Datasource | Supported versions |
|------------|--------------------|
| S3 | current |
## Dependency
> If you use Spark/Flink, you must ensure that your Spark/Flink cluster has already integrated Hadoop before using this connector. The tested Hadoop version is 2.x.<br/>
>
> If you use SeaTunnel Zeta, the Hadoop jar is integrated automatically when you download and install SeaTunnel Zeta. You can check the jar packages under ${SEATUNNEL_HOME}/lib to confirm this.<br/>
> To use this connector you need to put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.12.692.jar in the ${SEATUNNEL_HOME}/lib directory.
## Data Type Mapping
The data type mapping depends on the type of file being read. The following file types are supported:
`text` `csv` `parquet` `orc` `json` `excel` `xml`
### JSON File Type
If you assign the file type to `json`, you should also assign the schema option to tell the connector how to parse the data into the rows you want.
For example:
The upstream data is as follows:
```json
{"code": 200, "data": "get success", "success": true}
```
You can also save multiple pieces of data in one file and split them by newline:
```json lines
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
```
You should assign the schema as follows:
```hocon
schema {
fields {
code = int
data = string
success = boolean
}
}
```
The connector will generate data as follows:
| code | data | success |
|------|-------------|---------|
| 200 | get success | true |
### Text Or CSV File Type
If you set `file_format_type` to `text`, `excel`, `csv`, or `xml`, you must also set the `schema` option to tell the connector how to parse the data into rows.
When you set the `schema` option, you should also set the `field_delimiter` option, unless the `file_format_type` is `csv`, `xml`, or `excel`.
You can set the schema and delimiter as follows:
```hocon
field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
```
The connector will generate data as follows:
| name | age | gender |
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
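Putting the schema and delimiter together, a minimal sketch of a complete text source might look like the following (the path, bucket, and endpoint values are placeholders for illustration, not part of the example above):
```hocon
source {
  S3File {
    # Placeholder connection values for illustration only
    path = "/seatunnel/text"
    bucket = "s3a://seatunnel-test"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    # Required for text files: how to split each line into fields
    field_delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }
}
```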
### Orc File Type
If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector reads the schema of the upstream data automatically.
| Orc Data type | SeaTunnel Data type |
|----------------------------------|----------------------------------------------------------------|
| BOOLEAN | BOOLEAN |
| INT | INT |
| BYTE | BYTE |
| SHORT | SHORT |
| LONG | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BINARY | BINARY |
| STRING<br/>VARCHAR<br/>CHAR<br/> | STRING |
| DATE | LOCAL_DATE_TYPE |
| TIMESTAMP | LOCAL_DATE_TIME_TYPE |
| DECIMAL | DECIMAL |
| LIST(STRING) | STRING_ARRAY_TYPE |
| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE |
| LIST(TINYINT) | BYTE_ARRAY_TYPE |
| LIST(SMALLINT) | SHORT_ARRAY_TYPE |
| LIST(INT) | INT_ARRAY_TYPE |
| LIST(BIGINT) | LONG_ARRAY_TYPE |
| LIST(FLOAT) | FLOAT_ARRAY_TYPE |
| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE |
| Map<K,V>                         | MapType; the key and value types are mapped to the corresponding SeaTunnel types |
| STRUCT | SeaTunnelRowType |
### Parquet File Type
If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector reads the schema of the upstream data automatically.
| Parquet Data type | SeaTunnel Data type |
|----------------------|----------------------------------------------------------------|
| INT_8 | BYTE |
| INT_16 | SHORT |
| DATE | DATE |
| TIMESTAMP_MILLIS | TIMESTAMP |
| INT64 | LONG |
| INT96 | TIMESTAMP |
| BINARY | BYTES |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| FIXED_LEN_BYTE_ARRAY | TIMESTAMP<br/> DECIMAL |
| DECIMAL | DECIMAL |
| LIST(STRING) | STRING_ARRAY_TYPE |
| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE |
| LIST(TINYINT) | BYTE_ARRAY_TYPE |
| LIST(SMALLINT) | SHORT_ARRAY_TYPE |
| LIST(INT) | INT_ARRAY_TYPE |
| LIST(BIGINT) | LONG_ARRAY_TYPE |
| LIST(FLOAT) | FLOAT_ARRAY_TYPE |
| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE |
| Map<K,V>              | MapType; the key and value types are mapped to the corresponding SeaTunnel types |
| STRUCT | SeaTunnelRowType |
## Options
| name | type | required | default value | Description |
|---------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| path                            | string  | yes      | -                                                      | The s3 path that needs to be read. It can have sub paths, but the sub paths need to meet certain format requirements; see the `parse_partition_from_path` option for details. |
| file_format_type                | string  | yes      | -                                                      | File type. The following file types are supported: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown` |
| bucket | string | yes | - | The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. |
| fs.s3a.endpoint                 | string  | yes      | -                                                      | The endpoint of the S3 service, for example `s3.cn-north-1.amazonaws.com.cn` |
| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) |
| read_columns                    | list    | no       | -                                                      | The read column list of the data source; users can use it to implement field projection. The following file types support column projection: `text` `csv` `parquet` `orc` `json` `excel` `xml`. If the user wants to use this feature when reading `text` `json` `csv` files, the `schema` option must be configured. |
| access_key | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` |
| secret_key | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` |
| hadoop_s3_properties            | map     | no       | -                                                      | If you need to add other options, you can add them here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html). See the example after this table. |
| delimiter/field_delimiter | string | no | \001 for text and , for csv | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. |
| row_delimiter | string | no | \n | Row delimiter, used to tell connector how to slice and dice rows when reading text files. Default `\n`. |
| parse_partition_from_path       | boolean | no       | true                                                   | Control whether to parse the partition keys and values from the file path. For example, if you read a file from path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`, every record read from the file will have these two fields added: name="tyrantlucifer", age=26 |
| date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` |
| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` |
| skip_header_row_number          | long    | no       | 0                                                      | Skip the first few lines, but only for txt and csv files. For example, set `skip_header_row_number = 2`; then SeaTunnel will skip the first 2 lines from the source files |
| csv_use_header_line             | boolean | no       | false                                                  | Whether to use the header line to parse the file, only used when the file_format_type is `csv` and the file contains a header line that matches RFC 4180 |
| schema | config | no | - | The schema of upstream data. |
| sheet_name                      | string  | no       | -                                                      | Read the specified sheet of the workbook. Only used when file_format_type is excel. |
| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only valid for XML files. |
| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only valid for XML files. |
| compress_codec                  | string  | no       | none                                                   | The compress codec of files. See the `compress_codec` section below for supported values. |
| archive_compress_codec          | string  | no       | none                                                   | The compress codec of archive files. See the `archive_compress_codec` section below for supported values. |
| encoding                        | string  | no       | UTF-8                                                  | The encoding of the file to read. Only used when file_format_type is json, text, csv, or xml. |
| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. |
| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. |
| file_filter_pattern             | string  | no       |                                                        | Filter pattern, used for filtering files. |
| filename_extension              | string  | no       | -                                                      | Filter by filename extension, used for filtering files with a specific extension. Example: `csv` `.txt` `json` `.xml`. |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
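For example, `hadoop_s3_properties` accepts a map of extra Hadoop S3A settings. The snippet below is only a sketch; the two keys shown are standard Hadoop S3A properties chosen for illustration:
```hocon
hadoop_s3_properties {
  # Standard Hadoop S3A tuning properties, shown here only as examples
  "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  "fs.s3a.fast.upload.buffer" = "disk"
}
```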
### file_format_type [string]
File type. The following file types are supported:
`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown`
If you assign the file type to `markdown`, SeaTunnel can parse markdown files and extract structured data.
The markdown parser extracts various elements including headings, paragraphs, lists, code blocks, tables, and more.
Each element is converted to a row with the following schema:
- `element_id`: Unique identifier for the element
- `element_type`: Type of the element (Heading, Paragraph, ListItem, etc.)
- `heading_level`: Level of heading (1-6, null for non-heading elements)
- `text`: Text content of the element
- `page_number`: Page number (default: 1)
- `position_index`: Position index within the document
- `parent_id`: ID of the parent element
- `child_ids`: Comma-separated list of child element IDs
Note: Markdown format only supports reading, not writing.
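A minimal sketch of a markdown source (the path, bucket, and endpoint are placeholder values); each parsed element arrives as one row with the schema listed above:
```hocon
source {
  S3File {
    # Placeholder connection values for illustration only
    path = "/seatunnel/markdown"
    bucket = "s3a://seatunnel-test"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    # Each heading, paragraph, list item, table, etc. becomes one row
    file_format_type = "markdown"
  }
}
```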
### delimiter/field_delimiter [string]
The **delimiter** parameter is deprecated after version 2.3.5; please use **field_delimiter** instead.
### row_delimiter [string]
Only needs to be configured when file_format_type is text.
Row delimiter, used to tell the connector how to split rows.
Default `\n`.
### file_filter_pattern [string]
Filter pattern, used for filtering files.
The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
File Structure Example:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
```
Matching Rules Example:
**Example 1**: *Match all .txt files*. Regular expression:
```
/data/seatunnel/20241001/.*\.txt
```
The result of this example matching is:
```
/data/seatunnel/20241001/report.txt
```
**Example 2**: *Match all files starting with abc*. Regular expression:
```
/data/seatunnel/20241002/abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
**Example 3**: *Match all files starting with abc where the fourth character is either h or g*. Regular expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
```
**Example 4**: *Match third-level folders starting with 202410 and files ending with .csv*. Regular expression:
```
/data/seatunnel/202410\d*/.*\.csv
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
```
### compress_codec [string]
The compress codec of files. The supported codecs for each format are as follows:
- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc/parquet: the compression type is recognized automatically, no additional settings required.
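For example, to read lzo-compressed text files you could set the codec next to the file format. This is only a sketch with placeholder path and schema values:
```hocon
S3File {
  # Placeholder connection values for illustration only
  path = "/seatunnel/text_lzo"
  bucket = "s3a://seatunnel-test"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  file_format_type = "text"
  # Files under the path are lzo-compressed text
  compress_codec = "lzo"
  field_delimiter = "#"
  schema {
    fields {
      name = string
      age = int
    }
  }
}
```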
### archive_compress_codec [string]
The compress codec of archive files. The supported codecs are as follows:
| archive_compress_codec | file_format | archive_compress_suffix |
|------------------------|------------|-------------------------|
| ZIP | txt,json,excel,xml | .zip |
| TAR | txt,json,excel,xml | .tar |
| TAR_GZ | txt,json,excel,xml | .tar.gz |
| GZ | txt,json,excel,xml | .gz |
| NONE | all | .* |
Note: For a gz-compressed excel file, compress the original file directly or keep the original file suffix in the compressed file name, such as e2e.xls -> e2e_test.xls.gz.
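For example, to read json files packed in `.zip` archives, a sketch could look like the following (path and schema values are placeholders; the codec name is assumed to be accepted in lowercase):
```hocon
S3File {
  # Placeholder connection values for illustration only
  path = "/seatunnel/json_zip"
  bucket = "s3a://seatunnel-test"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  file_format_type = "json"
  # Each .zip archive under the path contains json files
  archive_compress_codec = "zip"
  schema {
    fields {
      id = int
      name = string
    }
  }
}
```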
### encoding [string]
Only used when file_format_type is json, text, csv, or xml.
The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`.
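For example (a fragment showing only the relevant options; `gbk` is just an example charset name accepted by `Charset.forName`):
```hocon
file_format_type = "text"
# The file content is decoded via Charset.forName("gbk")
encoding = "gbk"
```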
### binary_chunk_size [int]
Only used when file_format_type is binary.
The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory.
### binary_complete_file_mode [boolean]
Only used when file_format_type is binary.
Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false.
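A minimal sketch of a binary read (the path, bucket, and endpoint are placeholder values):
```hocon
S3File {
  # Placeholder connection values for illustration only
  path = "/seatunnel/binary"
  bucket = "s3a://seatunnel-test"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  file_format_type = "binary"
  # Read 4096-byte chunks; set binary_complete_file_mode = true to load each file in one chunk instead
  binary_chunk_size = 4096
  binary_complete_file_mode = false
}
```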
## Example
1. In this example, we read data from the s3 path `s3a://seatunnel-test/seatunnel/text`, where the files are in orc format.
We use `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` for authentication, so `access_key` and `secret_key` are required.
All columns in the file will be read and sent to the sink.
```hocon
# Defining the runtime environment
env {
parallelism = 1
job.mode = "BATCH"
}
source {
S3File {
path = "/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
file_format_type = "orc"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2
}
sink {
Console {}
}
```
2. Use `InstanceProfileCredentialsProvider` for authentication.
The file type in S3 is json, so the schema option needs to be configured.
```hocon
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}
```
3. Use `InstanceProfileCredentialsProvider` for authentication.
The file type in S3 is json and has five fields (`id`, `name`, `age`, `sex`, `type`), so the schema option needs to be configured.
In this job, we only need to send the `id` and `name` columns to the sink.
```hocon
# Defining the runtime environment
env {
parallelism = 1
job.mode = "BATCH"
}
source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
read_columns = ["id", "name"]
schema {
fields {
id = int
name = string
age = int
sex = int
type = string
}
}
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2
}
sink {
Console {}
}
```
### Filter File
```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
read_columns = ["id", "name"]
// file example abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
}
}
sink {
Console {
}
}
```
## Changelog
<ChangeLog />