import ChangeLog from ‘../changelog/connector-file-s3.md’;
S3 文件 Sink 连接器
Spark
Flink
SeaTunnel Zeta
[x] 多模态
使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
[x] 精确一次
默认情况下,我们使用 2PC 提交来确保 精确一次
。
[ ] cdc
[x] 支持多表写入
[x] 文件格式类型
将数据输出到 AWS S3 文件系统。
数据源 | 支持的版本 |
---|---|
S3 | 当前版本 |
如果您使用 Spark/Flink,为了使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hadoop。测试的 Hadoop 版本为 2.x。
如果您使用 SeaTunnel引擎,当您下载并安装 SeaTunnel引擎时,它会自动集成 Hadoop jar 包。您可以在
${SEATUNNEL_HOME}/lib
下检查 jar 包以确认这一点。 要使用此连接器,您需要将hadoop-aws-3.1.4.jar
和aws-java-sdk-bundle-1.12.692.jar
放在${SEATUNNEL_HOME}/lib
目录下。
如果写入 csv
、text
文件类型,所有列都将为字符串类型。
SeaTunnel 数据类型 | Orc 数据类型 |
---|---|
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | BYTE |
SMALLINT | SHORT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BYTES | BINARY |
DATE | DATE |
TIME TIMESTAMP | TIMESTAMP |
ROW | STRUCT |
NULL | 不支持的数据类型 |
ARRAY | LIST |
Map | Map |
SeaTunnel 数据类型 | Parquet 数据类型 |
---|---|
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | INT_8 |
SMALLINT | INT_16 |
INT | INT32 |
BIGINT | INT64 |
FLOAT | FLOAT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BYTES | BINARY |
DATE | DATE |
TIME TIMESTAMP | TIMESTAMP_MILLIS |
ROW | GroupType |
NULL | 不支持的数据类型 |
ARRAY | LIST |
Map | Map |
名称 | 类型 | 是否必填 | 默认值 | 描述 |
---|---|---|---|---|
path | string | 是 | - | |
tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 mv 将临时目录提交到目标目录。需要一个 S3 目录。 |
bucket | string | 是 | - | |
fs.s3a.endpoint | string | 是 | - | |
fs.s3a.aws.credentials.provider | string | 是 | com.amazonaws.auth.InstanceProfileCredentialsProvider | 认证 s3a 的方式。目前仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 和 com.amazonaws.auth.InstanceProfileCredentialsProvider 。 |
access_key | string | 否 | - | 仅当 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用 |
secret_key | string | 否 | - | 仅当 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用 |
custom_filename | boolean | 否 | false | 是否需要自定义文件名 |
file_name_expression | string | 否 | “${transactionId}” | 仅当 custom_filename 为 true 时使用 |
filename_time_format | string | 否 | “yyyy.MM.dd” | 仅当 custom_filename 为 true 时使用 |
file_format_type | string | 否 | “csv” | |
field_delimiter | string | 否 | ‘\001’ | 仅当 file_format 为 text 时使用 |
row_delimiter | string | 否 | “\n” | 仅当 file_format 为 text 、csv 、json 时使用 |
have_partition | boolean | 否 | false | 是否需要处理分区。 |
partition_by | array | 否 | - | 仅当 have_partition 为 true 时使用 |
partition_dir_expression | string | 否 | “${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/” | 仅当 have_partition 为 true 时使用 |
is_partition_field_write_in_file | boolean | 否 | false | 仅当 have_partition 为 true 时使用 |
sink_columns | array | 否 | 当此参数为空时,所有字段均为 sink 列 | |
is_enable_transaction | boolean | 否 | true | |
batch_size | int | 否 | 1000000 | |
compress_codec | string | 否 | none | |
common-options | object | 否 | - | |
max_rows_in_memory | int | 否 | - | 仅当 file_format 为 excel 时使用 |
sheet_name | string | 否 | Sheet${Random number} | 仅当 file_format 为 excel 时使用 |
csv_string_quote_mode | enum | 否 | MINIMAL | 仅当 file_format 为 csv 时使用 |
xml_root_tag | string | 否 | RECORDS | 仅当 file_format 为 xml 时使用,指定 XML 文件中根元素的标签名称。 |
xml_row_tag | string | 否 | RECORD | 仅当 file_format 为 xml 时使用,指定 XML 文件中数据行的标签名称。 |
xml_use_attr_format | boolean | 否 | - | 仅当 file_format 为 xml 时使用,指定是否使用标签属性格式处理数据。 |
single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件。当此参数开启时,batch_size 将不会生效。输出文件名不会有文件块后缀。 |
create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,仍然会生成相应的数据文件。 |
parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅当 file_format 为 parquet 时使用 |
parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅当 file_format 为 parquet 时使用 |
hadoop_s3_properties | map | 否 | 如果您需要添加其他选项,可以在此处添加,并参考此链接 | |
schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 在开启同步任务之前,对目标路径进行不同的处理 |
data_save_mode | Enum | 否 | APPEND_DATA | 在开启同步任务之前,对目标路径中的数据文件进行不同的处理 |
enable_header_write | boolean | 否 | false | 仅当 file_format_type 为 text,csv 时使用。 false: 不写入表头, true: 写入表头。 |
encoding | string | 否 | “UTF-8” | 仅当 file_format_type 为 json,text,csv,xml 时使用。 |
merge_update_event | boolean | 否 | false | 仅当file_format_type为canal_json、debezium_json、maxwell_json. |
存储数据文件的路径,支持变量替换。例如:path=/test/${database_name}/${schema_name}/${table_name}
如果您需要添加其他选项,可以在此处添加,并参考此链接
hadoop_s3_properties { "fs.s3a.buffer.dir" = "/data/st_test/s3a" "fs.s3a.fast.upload.buffer" = "disk" }
是否自定义文件名
仅当 custom_filename
为 true
时使用
file_name_expression
描述了将创建到 path
中的文件表达式。我们可以在 file_name_expression
中添加变量 ${now}
或 ${uuid}
,例如 test_${uuid}_${now}
, ${now}
表示当前时间,其格式可以通过指定选项 filename_time_format
来定义。
请注意,如果 is_enable_transaction
为 true
,我们会在文件头部自动添加 ${transactionId}_
。
仅当 custom_filename
为 true
时使用
当 file_name_expression
参数中的格式为 xxxx-${now}
时,filename_time_format
可以指定路径的时间格式,默认值为 yyyy.MM.dd
。常用的时间格式如下:
符号 | 描述 |
---|---|
y | 年 |
M | 月 |
d | 日 |
H | 小时 (0-23) |
m | 分钟 |
s | 秒 |
我们支持以下文件类型:
text
csv
parquet
orc
json
excel
xml
binary
canal_json
debezium_json
maxwell_json
请注意,最终文件名将以文件格式类型的后缀结尾,文本文件的后缀为 txt
。
行数据中列之间的分隔符。仅在 text
文件格式中需要。
文件中行之间的分隔符。仅在 text
、csv
、json
文件格式中需要。
是否需要处理分区。
仅当 have_partition
为 true
时使用。
根据选定的字段对数据进行分区。
仅当 have_partition
为 true
时使用。
如果指定了 partition_by
,我们将根据分区信息生成相应的分区目录,最终文件将放置在分区目录中。
默认的 partition_dir_expression
为 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/
。k0
是第一个分区字段,v0
是第一个分区字段的值。
仅当 have_partition
为 true
时使用。
如果 is_partition_field_write_in_file
为 true
,分区字段及其值将被写入数据文件。
例如,如果您想写入 Hive 数据文件,其值应为 false
。
哪些列需要写入文件,默认值为从 Transform
或 Source
获取的所有列。 字段的顺序决定了文件实际写入的顺序。
如果 is_enable_transaction
为 true,我们将确保在将数据写入目标目录时不会丢失或重复。
请注意,如果 is_enable_transaction
为 true
,我们会在文件头部自动添加 ${transactionId}_
。
目前仅支持 true
。
文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size
和 checkpoint.interval
共同决定。如果 checkpoint.interval
的值足够大,sink writer 将一直写入文件,直到文件中的行数超过 batch_size
。如果 checkpoint.interval
较小,sink writer 将在新的 checkpoint 触发时创建一个新文件。
文件的压缩编解码器,支持的详细信息如下:
lzo
none
lzo
none
lzo
none
lzo
snappy
lz4
zlib
none
lzo
snappy
lz4
gzip
brotli
zstd
none
提示:excel 类型不支持任何压缩格式
Sink 插件通用参数,请参考 Sink 通用选项 获取详细信息。
当文件格式为 Excel 时,内存中可以缓存的最大数据项数。
写入工作表的名称
当文件格式为 CSV 时,CSV 的字符串引用模式。
指定 XML 文件中根元素的标签名称。
指定 XML 文件中数据行的标签名称。
指定是否使用标签属性格式处理数据。
支持将时间戳写入 Parquet INT96,仅对 parquet 文件有效。
支持将 12-byte 字段写入 Parquet INT96,仅对 parquet 文件有效。
在开启同步任务之前,对目标路径进行不同的处理。
选项介绍:RECREATE_SCHEMA
:当路径不存在时创建。如果路径已存在,则删除路径并重新创建。CREATE_SCHEMA_WHEN_NOT_EXIST
:当路径不存在时创建,路径存在时使用路径。ERROR_WHEN_SCHEMA_NOT_EXIST
:当路径不存在时报错IGNORE
:忽略表的处理
在开启同步任务之前,对目标路径中的数据文件进行不同的处理。 选项介绍:DROP_DATA
:使用路径但删除路径中的数据文件。 APPEND_DATA
:使用路径,并在路径中添加新文件以写入数据。ERROR_WHEN_DATA_EXISTS
:当路径中存在数据文件时,将报错。
仅当 file_format_type 为 json,text,csv,xml 时使用。 写入文件的编码。此参数将由 Charset.forName(encoding)
解析。
仅当file_format_type为canal_json、debezium_json、maxwell_json时使用. 设置成true,序列化数据时,UPDATE_AFTER 和 UPDATE_BEFORE 会合并成 UPDATE; 设置成false,序列化数据时,UPDATE_AFTER 和 UPDATE_BEFORE 不会合并;
此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 S3File Sink。FakeSource 总共生成 16 行数据 (row.num=16),每行有两个字段,name (字符串类型) 和 age (int 类型)。最终的目标 s3 目录将创建一个文件,并将所有数据写入其中。 在运行此作业之前,您需要创建 s3 路径:/seatunnel/text。如果您尚未安装和部署 SeaTunnel,您需要按照 安装 SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 使用 SeaTunnel Engine 快速入门 中的说明运行此作业。
# 定义运行时环境 env { parallelism = 1 job.mode = "BATCH" } source { # 这是一个示例源插件,仅用于测试和演示功能源插件 FakeSource { parallelism = 1 plugin_output = "fake" row.num = 16 schema = { fields { c_map = "map<string, array<int>>" c_array = "array<int>" name = string c_boolean = boolean age = tinyint c_smallint = smallint c_int = int c_bigint = bigint c_float = float c_double = double c_decimal = "decimal(16, 1)" c_null = "null" c_bytes = bytes c_date = date c_timestamp = timestamp } } } # 如果您想了解更多关于如何配置SeaTunnel以及查看完整的源插件列表, # 请访问 https://seatunnel.apache.org/docs/connector-v2/source source { } transform { # 如果您想了解更多关于如何配置SeaTunnel以及查看完整的转换插件列表, # 请访问 https://seatunnel.apache.org/docs/transform-v2 } sink { S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel" path="/seatunnel/text" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" file_format_type = "text" field_delimiter = "\t" row_delimiter = "\n" have_partition = true partition_by = ["age"] partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = true custom_filename = true file_name_expression = "${transactionId}_${now}" filename_time_format = "yyyy.MM.dd" sink_columns = ["name","age"] is_enable_transaction=true hadoop_s3_properties { "fs.s3a.buffer.dir" = "/data/st_test/s3a" "fs.s3a.fast.upload.buffer" = "disk" } } # 如果您想了解更多关于如何配置SeaTunnel以及查看完整的接收插件列表, # 请访问 https://seatunnel.apache.org/docs/connector-v2/sink }
对于文本文件格式,包含 have_partition
、custom_filename
、sink_columns
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel" path="/seatunnel/text" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" file_format_type = "text" field_delimiter = "\t" row_delimiter = "\n" have_partition = true partition_by = ["age"] partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = true custom_filename = true file_name_expression = "${transactionId}_${now}" filename_time_format = "yyyy.MM.dd" sink_columns = ["name","age"] is_enable_transaction=true hadoop_s3_properties { "fs.s3a.buffer.dir" = "/data/st_test/s3a" "fs.s3a.fast.upload.buffer" = "disk" } }
对于Parquet文件格式,简单配置使用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel" path="/seatunnel/parquet" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" access_key = "xxxxxxxxxxxxxxxxx" secret_key = "xxxxxxxxxxxxxxxxx" file_format_type = "parquet" hadoop_s3_properties { "fs.s3a.buffer.dir" = "/data/st_test/s3a" "fs.s3a.fast.upload.buffer" = "disk" } }
对于ORC文件格式,简单配置使用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel" path="/seatunnel/orc" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" access_key = "xxxxxxxxxxxxxxxxx" secret_key = "xxxxxxxxxxxxxxxxx" file_format_type = "orc" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }
多表写入和保存模式
env { "job.name"="SeaTunnel_job" "job.mode"=STREAMING } source { MySQL-CDC { database-names=[ "wls_t1" ] table-names=[ "wls_t1.mysqlcdc_to_s3_t3", "wls_t1.mysqlcdc_to_s3_t4", "wls_t1.mysqlcdc_to_s3_t5", "wls_t1.mysqlcdc_to_s3_t1", "wls_t1.mysqlcdc_to_s3_t2" ] password="xxxxxx" username="xxxxxxxxxxxxx" url="jdbc:mysql://localhost:3306/qa_source" } } transform { } sink { S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel/${table_name}" path="/test/${table_name}" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" access_key = "xxxxxxxxxxxxxxxxx" secret_key = "xxxxxxxxxxxxxxxxx" file_format_type = "orc" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" } }
仅在 file_format_type 为 text 或 csv 时使用。false:不写入表头,true:写入表头。