| import ChangeLog from '../changelog/connector-file-oss.md'; |
| |
| # OssFile |
| |
| > Oss file source connector |
| |
| ## Support Those Engines |
| |
| > Spark<br/> |
| > Flink<br/> |
| > SeaTunnel Zeta<br/> |
| |
| ## Usage Dependency |
| |
| ### For Spark/Flink Engine |
| |
1. You must ensure your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.
2. You must ensure `hadoop-aliyun-xx.jar`, `aliyun-sdk-oss-xx.jar` and `jdom-xx.jar` are in the `${SEATUNNEL_HOME}/plugins/` directory. The version of the `hadoop-aliyun` jar must match the Hadoop version used by your Spark/Flink, and the versions of `aliyun-sdk-oss-xx.jar` and `jdom-xx.jar` must correspond to that `hadoop-aliyun` version. E.g. `hadoop-aliyun-3.1.4.jar` depends on `aliyun-sdk-oss-3.4.1.jar` and `jdom-1.1.jar`.
| |
| ### For SeaTunnel Zeta Engine |
| |
1. You must ensure `seatunnel-hadoop3-3.1.4-uber.jar`, `aliyun-sdk-oss-3.4.1.jar`, `hadoop-aliyun-3.1.4.jar` and `jdom-1.1.jar` are in the `${SEATUNNEL_HOME}/lib/` directory.
| |
| ## Key features |
| |
| - [x] [batch](../../concept/connector-v2-features.md) |
| - [ ] [stream](../../concept/connector-v2-features.md) |
| - [x] [multimodal](../../concept/connector-v2-features.md#multimodal) |
| |
Use the binary file format to read and write files of any format, such as videos, pictures and so on. In short, any file can be synchronized to the target place.
| |
| - [x] [exactly-once](../../concept/connector-v2-features.md) |
| |
All the data in a split is read in a single pollNext call. The splits that have been read are saved in the snapshot.
| |
| - [x] [column projection](../../concept/connector-v2-features.md) |
| - [x] [parallelism](../../concept/connector-v2-features.md) |
| - [ ] [support user-defined split](../../concept/connector-v2-features.md) |
| - [x] file format type |
| - [x] text |
| - [x] csv |
| - [x] parquet |
| - [x] orc |
| - [x] json |
| - [x] excel |
| - [x] xml |
| - [x] binary |
| - [x] markdown |
| |
| ## Data Type Mapping |
| |
Data type mapping is related to the type of file being read. The following file types are supported:
| |
| `text` `csv` `parquet` `orc` `json` `excel` `xml` `markdown` |
| |
| ### JSON File Type |
| |
If you assign the file type to `json`, you must also assign the `schema` option to tell the connector how to parse the data into the rows you want.
| |
| For example: |
| |
The upstream data is as follows:
| |
| ```json |
| |
| {"code": 200, "data": "get success", "success": true} |
| |
| ``` |
| |
| You can also save multiple pieces of data in one file and split them by newline: |
| |
| ```json lines |
| |
| {"code": 200, "data": "get success", "success": true} |
| {"code": 300, "data": "get failed", "success": false} |
| |
| ``` |
| |
You should assign the schema as follows:
| |
| ```hocon |
| |
| schema { |
| fields { |
| code = int |
| data = string |
| success = boolean |
| } |
| } |
| |
| ``` |
| |
The connector will generate data as follows:
| |
| | code | data | success | |
| |------|-------------|---------| |
| | 200 | get success | true | |
| |
| ### Text Or CSV File Type |
| |
If you set the `file_format_type` to `text`, `excel`, `csv` or `xml`, then you need to set the `schema` field to tell the connector how to parse the data into rows.

If you set the `schema` field, you should also set the `field_delimiter` option, unless the `file_format_type` is `csv`, `xml` or `excel`.
| |
You can set the schema and delimiter as follows:
| |
| ```hocon |
| |
| field_delimiter = "#" |
| schema { |
| fields { |
| name = string |
| age = int |
| gender = string |
| } |
| } |
| |
| ``` |
| |
The connector will generate data as follows:
| |
| | name | age | gender | |
| |---------------|-----|--------| |
| | tyrantlucifer | 26 | male | |
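
Putting this together, a minimal source sketch for reading a delimited text file might look like the following (the bucket, endpoint, path and credential values are placeholders, not real resources):

```hocon
source {
  OssFile {
    # Placeholder connection values -- replace with your own bucket/endpoint/credentials
    path = "/seatunnel/text"
    bucket = "oss://your-bucket"
    endpoint = "oss-cn-beijing.aliyuncs.com"
    access_key = "xxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
    file_format_type = "text"
    # Fields in each line are separated by '#'
    field_delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }
}
```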
| |
| ### Orc File Type |
| |
If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.
| |
| | Orc Data type | SeaTunnel Data type | |
| |----------------------------------|----------------------------------------------------------------| |
| | BOOLEAN | BOOLEAN | |
| | INT | INT | |
| | BYTE | BYTE | |
| | SHORT | SHORT | |
| | LONG | LONG | |
| | FLOAT | FLOAT | |
| | DOUBLE | DOUBLE | |
| | BINARY | BINARY | |
| | STRING<br/>VARCHAR<br/>CHAR<br/> | STRING | |
| | DATE | LOCAL_DATE_TYPE | |
| | TIMESTAMP | LOCAL_DATE_TIME_TYPE | |
| | DECIMAL | DECIMAL | |
| | LIST(STRING) | STRING_ARRAY_TYPE | |
| | LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | |
| | LIST(TINYINT) | BYTE_ARRAY_TYPE | |
| | LIST(SMALLINT) | SHORT_ARRAY_TYPE | |
| | LIST(INT) | INT_ARRAY_TYPE | |
| | LIST(BIGINT) | LONG_ARRAY_TYPE | |
| | LIST(FLOAT) | FLOAT_ARRAY_TYPE | |
| | LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | |
| | Map<K,V> | MapType, This type of K and V will transform to SeaTunnel type | |
| | STRUCT | SeaTunnelRowType | |
| |
| ### Parquet File Type |
| |
If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.
| |
| | Parquet Data type | SeaTunnel Data type | |
| |----------------------|----------------------------------------------------------------| |
| | INT_8 | BYTE | |
| | INT_16 | SHORT | |
| | DATE | DATE | |
| | TIMESTAMP_MILLIS | TIMESTAMP | |
| | INT64 | LONG | |
| | INT96 | TIMESTAMP | |
| | BINARY | BYTES | |
| | FLOAT | FLOAT | |
| | DOUBLE | DOUBLE | |
| | BOOLEAN | BOOLEAN | |
| | FIXED_LEN_BYTE_ARRAY | TIMESTAMP<br/> DECIMAL | |
| | DECIMAL | DECIMAL | |
| | LIST(STRING) | STRING_ARRAY_TYPE | |
| | LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | |
| | LIST(TINYINT) | BYTE_ARRAY_TYPE | |
| | LIST(SMALLINT) | SHORT_ARRAY_TYPE | |
| | LIST(INT) | INT_ARRAY_TYPE | |
| | LIST(BIGINT) | LONG_ARRAY_TYPE | |
| | LIST(FLOAT) | FLOAT_ARRAY_TYPE | |
| | LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | |
| | Map<K,V> | MapType, This type of K and V will transform to SeaTunnel type | |
| | STRUCT | SeaTunnelRowType | |
| |
| ## Options |
| |
| | name | type | required | default value | Description | |
| |----------------------------|---------|----------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| path                       | string  | yes      | -                   | The Oss path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. For the specific requirements, refer to the "parse_partition_from_path" option. |
| | file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown` | |
| | bucket | string | yes | - | The bucket address of oss file system, for example: `oss://seatunnel-test`. | |
| | endpoint | string | yes | - | fs oss endpoint | |
| | read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` `xml` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | |
| | access_key | string | no | - | | |
| | access_secret | string | no | - | | |
| | delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. | |
| | row_delimiter | string | no | \n | Row delimiter, used to tell connector how to slice and dice rows when reading text files. Default `\n`. | |
| parse_partition_from_path  | boolean | no       | true                | Controls whether to parse the partition keys and values from the file path. For example, if you read a file from the path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`, every record read from the file will have these two fields added: name="tyrantlucifer", age=26 |
| | date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` | |
| | datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | |
| | time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` | |
| filename_extension         | string  | no       | -                   | Filter by filename extension, which is used for filtering files with a specific extension. Example: `csv` `.txt` `json` `.xml`. |
| skip_header_row_number     | long    | no       | 0                   | Skip the first few lines, but only for txt and csv files. For example, set `skip_header_row_number = 2`, then SeaTunnel will skip the first 2 lines of the source files |
| csv_use_header_line        | boolean | no       | false               | Whether to use the header line to parse the file, only used when the file_format is `csv` and the file contains a header line that matches RFC 4180 |
| schema                     | config  | no       | -                   | The schema of upstream data. |
| sheet_name                 | string  | no       | -                   | Reads the sheet of the workbook. Only used when file_format is excel. |
| | xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. | |
| | xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. | |
| compress_codec             | string  | no       | none                | Which compress codec the files use. |
| encoding                   | string  | no       | UTF-8               | The encoding of the file to read, only used when file_format_type is `json` `text` `csv` `xml`. This param will be parsed by `Charset.forName(encoding)`. |
| | null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | |
| | binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. | |
| | binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. | |
| file_filter_pattern        | string  | no       |                     | Filter pattern, which is used for filtering files. |
| | common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | |
| file_filter_modified_start | string  | no       | -                   | File modification time filter. The connector will filter some files based on the last modification start time (start time inclusive). The default date format is `yyyy-MM-dd HH:mm:ss`. |
| file_filter_modified_end   | string  | no       | -                   | File modification time filter. The connector will filter some files based on the last modification end time (end time exclusive). The default date format is `yyyy-MM-dd HH:mm:ss`. |
| |
| ### file_format_type [string] |
| |
File type. The following file types are supported:
| |
| `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` `markdown` |
| |
| If you assign file type to `markdown`, SeaTunnel can parse markdown files and extract structured data. |
| The markdown parser extracts various elements including headings, paragraphs, lists, code blocks, tables, and more. |
| Each element is converted to a row with the following schema: |
| - `element_id`: Unique identifier for the element |
| - `element_type`: Type of the element (Heading, Paragraph, ListItem, etc.) |
| - `heading_level`: Level of heading (1-6, null for non-heading elements) |
| - `text`: Text content of the element |
| - `page_number`: Page number (default: 1) |
| - `position_index`: Position index within the document |
| - `parent_id`: ID of the parent element |
| - `child_ids`: Comma-separated list of child element IDs |
| |
| Note: Markdown format only supports reading, not writing. |
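
A minimal source sketch for reading markdown files might look like the following (the bucket, endpoint, path and credential values are placeholders); each markdown element is emitted as a row with the fixed schema described above, so no `schema` option is needed:

```hocon
source {
  OssFile {
    # Placeholder connection values -- replace with your own bucket/endpoint/credentials
    path = "/seatunnel/docs"
    bucket = "oss://your-bucket"
    endpoint = "oss-cn-beijing.aliyuncs.com"
    access_key = "xxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
    # Each markdown element (heading, paragraph, list item, ...) becomes one row
    file_format_type = "markdown"
  }
}
```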
| |
| ### compress_codec [string] |
| |
The compress codec of the files. The supported codecs are as follows:
| |
| - txt: `lzo` `none` |
| - json: `lzo` `none` |
| - csv: `lzo` `none` |
| - orc/parquet: |
| automatically recognizes the compression type, no additional settings required. |
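
For example, a sketch for reading lzo-compressed text files (connection options omitted; the schema and codec values are illustrative):

```hocon
source {
  OssFile {
    # ... bucket, endpoint, path, access_key, access_secret go here ...
    file_format_type = "text"
    # Tell the connector the text files are lzo-compressed
    compress_codec = "lzo"
    schema {
      fields {
        name = string
        age = int
      }
    }
  }
}
```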
| |
| ### encoding [string] |
| |
Only used when file_format_type is json, text, csv or xml.
| The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. |
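
For example, a sketch for reading GBK-encoded json files (connection options omitted; any charset name accepted by `Charset.forName` can be used):

```hocon
source {
  OssFile {
    # ... bucket, endpoint, path, access_key, access_secret go here ...
    file_format_type = "json"
    # Decode the files as GBK instead of the default UTF-8
    encoding = "gbk"
    schema {
      fields {
        code = int
        data = string
      }
    }
  }
}
```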
| |
| ### binary_chunk_size [int] |
| |
| Only used when file_format_type is binary. |
| |
| The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. |
| |
| ### binary_complete_file_mode [boolean] |
| |
| Only used when file_format_type is binary. |
| |
| Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. |
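
A sketch for synchronizing arbitrary files with the binary format (connection options omitted; the chunk size is illustrative):

```hocon
source {
  OssFile {
    # ... bucket, endpoint, path, access_key, access_secret go here ...
    # Copy files byte-for-byte, regardless of their content
    file_format_type = "binary"
    # Read in 4 KB chunks instead of the default 1024 bytes
    binary_chunk_size = 4096
    # Set to true to read each file as one single chunk (loads the whole file into memory)
    binary_complete_file_mode = false
  }
}
```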
| |
| ### file_filter_pattern [string] |
| |
Filter pattern, which is used for filtering files.

The pattern follows standard regular expressions. For details, please refer to https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
| |
| File Structure Example: |
| ``` |
| /data/seatunnel/20241001/report.txt |
| /data/seatunnel/20241007/abch202410.csv |
| /data/seatunnel/20241002/abcg202410.csv |
| /data/seatunnel/20241005/old_data.csv |
| /data/seatunnel/20241012/logo.png |
| ``` |
| Matching Rules Example: |
| |
**Example 1**: *Match all .txt files*, Regular Expression:
| ``` |
| /data/seatunnel/20241001/.*\.txt |
| ``` |
| The result of this example matching is: |
| ``` |
| /data/seatunnel/20241001/report.txt |
| ``` |
**Example 2**: *Match all files starting with abc*, Regular Expression:
| ``` |
/data/seatunnel/.*/abc.*
| ``` |
| The result of this example matching is: |
| ``` |
| /data/seatunnel/20241007/abch202410.csv |
| /data/seatunnel/20241002/abcg202410.csv |
| ``` |
**Example 3**: *Match all files starting with abc, and the fourth character is either h or g*, the Regular Expression:
| ``` |
| /data/seatunnel/20241007/abc[h,g].* |
| ``` |
| The result of this example matching is: |
| ``` |
| /data/seatunnel/20241007/abch202410.csv |
| ``` |
| **Example 4**: *Match third level folders starting with 202410 and files ending with .csv*, the Regular Expression: |
| ``` |
| /data/seatunnel/202410\d*/.*\.csv |
| ``` |
| The result of this example matching is: |
| ``` |
| /data/seatunnel/20241007/abch202410.csv |
| /data/seatunnel/20241002/abcg202410.csv |
| /data/seatunnel/20241005/old_data.csv |
| ``` |
| |
| ### schema [config] |
| |
Only needs to be configured when the file_format_type is text, json, excel, xml or csv (or any other format from which the schema cannot be read from the metadata).
| |
| #### fields [Config] |
| |
| The schema of upstream data. |
| |
## How to Create an Oss Data Synchronization Job
| |
| The following example demonstrates how to create a data synchronization job that reads data from Oss and prints it on the local client: |
| |
```hocon
| # Set the basic configuration of the task to be performed |
| env { |
| parallelism = 1 |
| job.mode = "BATCH" |
| } |
| |
| # Create a source to connect to Oss |
| source { |
| OssFile { |
| path = "/seatunnel/orc" |
| bucket = "oss://tyrantlucifer-image-bed" |
| access_key = "xxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxxxxx" |
| endpoint = "oss-cn-beijing.aliyuncs.com" |
| file_format_type = "orc" |
| } |
| } |
| |
| # Console printing of the read Oss data |
| sink { |
| Console { |
| } |
| } |
| ``` |
| |
```hocon
| # Set the basic configuration of the task to be performed |
| env { |
| parallelism = 1 |
| job.mode = "BATCH" |
| } |
| |
| # Create a source to connect to Oss |
| source { |
| OssFile { |
| path = "/seatunnel/json" |
| bucket = "oss://tyrantlucifer-image-bed" |
| access_key = "xxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxxxxx" |
| endpoint = "oss-cn-beijing.aliyuncs.com" |
| file_format_type = "json" |
| schema { |
| fields { |
| id = int |
| name = string |
| } |
| } |
| } |
| } |
| |
| # Console printing of the read Oss data |
| sink { |
| Console { |
| } |
| } |
| ``` |
| |
| ### Multiple Table |
| |
There is no need to configure the schema for file types such as `orc`:
| |
| ``` |
| env { |
| parallelism = 1 |
| spark.app.name = "SeaTunnel" |
| spark.executor.instances = 2 |
| spark.executor.cores = 1 |
| spark.executor.memory = "1g" |
| spark.master = local |
| job.mode = "BATCH" |
| } |
| |
| source { |
| OssFile { |
| tables_configs = [ |
| { |
| schema = { |
| table = "fake01" |
| } |
| bucket = "oss://whale-ops" |
| access_key = "xxxxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxx" |
| endpoint = "https://oss-accelerate.aliyuncs.com" |
| path = "/test/seatunnel/read/orc" |
| file_format_type = "orc" |
| }, |
| { |
| schema = { |
| table = "fake02" |
| } |
| bucket = "oss://whale-ops" |
| access_key = "xxxxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxx" |
| endpoint = "https://oss-accelerate.aliyuncs.com" |
| path = "/test/seatunnel/read/orc" |
| file_format_type = "orc" |
| } |
| ] |
| plugin_output = "fake" |
| } |
| } |
| |
| sink { |
| Assert { |
| rules { |
| table-names = ["fake01", "fake02"] |
| } |
| } |
| } |
| ``` |
| |
You need to configure the schema for file types such as `json`:
| |
| ``` |
| |
| env { |
  parallelism = 1
| spark.app.name = "SeaTunnel" |
| spark.executor.instances = 2 |
| spark.executor.cores = 1 |
| spark.executor.memory = "1g" |
| spark.master = local |
| job.mode = "BATCH" |
| } |
| |
| source { |
| OssFile { |
| tables_configs = [ |
| { |
| bucket = "oss://whale-ops" |
| access_key = "xxxxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxx" |
| endpoint = "https://oss-accelerate.aliyuncs.com" |
| path = "/test/seatunnel/read/json" |
| file_format_type = "json" |
| schema = { |
| table = "fake01" |
| fields { |
| c_map = "map<string, string>" |
| c_array = "array<int>" |
| c_string = string |
| c_boolean = boolean |
| c_tinyint = tinyint |
| c_smallint = smallint |
| c_int = int |
| c_bigint = bigint |
| c_float = float |
| c_double = double |
| c_bytes = bytes |
| c_date = date |
| c_decimal = "decimal(38, 18)" |
| c_timestamp = timestamp |
| c_row = { |
| C_MAP = "map<string, string>" |
| C_ARRAY = "array<int>" |
| C_STRING = string |
| C_BOOLEAN = boolean |
| C_TINYINT = tinyint |
| C_SMALLINT = smallint |
| C_INT = int |
| C_BIGINT = bigint |
| C_FLOAT = float |
| C_DOUBLE = double |
| C_BYTES = bytes |
| C_DATE = date |
| C_DECIMAL = "decimal(38, 18)" |
| C_TIMESTAMP = timestamp |
| } |
| } |
| } |
| }, |
| { |
| bucket = "oss://whale-ops" |
| access_key = "xxxxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxx" |
| endpoint = "https://oss-accelerate.aliyuncs.com" |
| path = "/test/seatunnel/read/json" |
| file_format_type = "json" |
| schema = { |
| table = "fake02" |
| fields { |
| c_map = "map<string, string>" |
| c_array = "array<int>" |
| c_string = string |
| c_boolean = boolean |
| c_tinyint = tinyint |
| c_smallint = smallint |
| c_int = int |
| c_bigint = bigint |
| c_float = float |
| c_double = double |
| c_bytes = bytes |
| c_date = date |
| c_decimal = "decimal(38, 18)" |
| c_timestamp = timestamp |
| c_row = { |
| C_MAP = "map<string, string>" |
| C_ARRAY = "array<int>" |
| C_STRING = string |
| C_BOOLEAN = boolean |
| C_TINYINT = tinyint |
| C_SMALLINT = smallint |
| C_INT = int |
| C_BIGINT = bigint |
| C_FLOAT = float |
| C_DOUBLE = double |
| C_BYTES = bytes |
| C_DATE = date |
| C_DECIMAL = "decimal(38, 18)" |
| C_TIMESTAMP = timestamp |
| } |
| } |
| } |
| } |
| ] |
| plugin_output = "fake" |
| } |
| } |
| |
| sink { |
| Assert { |
| rules { |
| table-names = ["fake01", "fake02"] |
| } |
| } |
| } |
| ``` |
| |
| ### Filter File |
| |
| ```hocon |
| env { |
| parallelism = 1 |
| job.mode = "BATCH" |
| } |
| |
| source { |
| OssFile { |
| path = "/seatunnel/orc" |
| bucket = "oss://tyrantlucifer-image-bed" |
| access_key = "xxxxxxxxxxxxxxxxx" |
| access_secret = "xxxxxxxxxxxxxxxxxxxxxx" |
| endpoint = "oss-cn-beijing.aliyuncs.com" |
| file_format_type = "orc" |
| // file example abcD2024.csv |
| file_filter_pattern = "abc[DX]*.*" |
    // filter files by last modified time between 2024-01-01 00:00:00 (inclusive) and 2024-01-05 00:00:00 (exclusive), so the effective end date is 2024-01-04
| file_filter_modified_start = "2024-01-01 00:00:00" |
| file_filter_modified_end = "2024-01-05 00:00:00" |
| } |
| } |
| |
| sink { |
| Console { |
| } |
| } |
| ``` |
| |
| ## Changelog |
| |
| <ChangeLog /> |