Amazon S3 (Simple Storage Service) is an object storage service offered by Amazon Web Services, with a web service interface to store and retrieve any amount of data at any time from anywhere on the web.
The purpose of the S3Output module is to upload files/directories into an Amazon S3 bucket using the multipart upload feature (see below).
The S3Output module is fault-tolerant, statically/dynamically partitionable, and has exactly-once semantics.
The module class is S3OutputModule, located in the package org.apache.apex.malhar.lib.fs.s3; please refer to the Apache Apex Malhar repository on GitHub for the source.
File upload to S3 can also be done using AbstractFileOutputOperator, but that operator uploads large files sequentially; the current module, in contrast, can substantially improve the upload speed of large files by reading and uploading their constituent blocks in parallel.
The table below lists additional benefits of this module over AbstractFileOutputOperator.
S3OutputModule | AbstractFileOutputOperator |
---|---|
Maximum upload file size is 5 TB. | Maximum upload file size is 5 GB. |
Best fit for both large and small files. | Best fit for small files. |
The module uses the AmazonS3Client API to upload objects into S3: large files are uploaded using the multipart feature, while small files (a single block) are uploaded using the putObject(...) API (see the sketch below). | The operator uses Hadoop file systems such as S3AFileSystem. Uploading an object into S3 consists of two steps: (1) write the data to the local file system; (2) when the stream closes, the file system uploads the local object into S3. |
If a block fails to upload, only the data for that block needs to be re-uploaded. | If a file fails to upload, the complete file must be re-uploaded. |
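For single-block files, the upload path referenced in the table reduces to a single putObject(...) call. Below is a minimal sketch using the AWS SDK for Java v1; the credentials, bucket name, key, and file path are illustrative placeholders, not values used by the module:

```java
import java.io.File;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class PutObjectExample {
  public static void main(String[] args) {
    // Placeholder credentials, bucket, key, and file path.
    AmazonS3 s3 = new AmazonS3Client(new BasicAWSCredentials("accessKey", "secretAccessKey"));
    // A single call uploads the whole object; suitable for files that fit in one block.
    s3.putObject("my-bucket", "output-dir/my-object", new File("/path/to/local-file"));
  }
}
```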
Uploading parts of a file is done via the multipart feature: each part of a file can be uploaded independently, and after all parts are uploaded successfully, Amazon S3 combines them into a single object.
Please refer to the Java code below for uploading a file into an Amazon S3 bucket using the multipart feature.
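Here is a minimal sketch of such an upload using the AWS SDK for Java v1 (the same AmazonS3Client API the module builds on); the bucket name, key, file path, and part size are illustrative assumptions, not values used by the module:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class MultipartUploadExample {
  public static void main(String[] args) {
    // Placeholder credentials, bucket, key, and file path.
    String bucketName = "my-bucket";
    String keyName = "output-dir/my-object";
    File file = new File("/path/to/local-file");

    AmazonS3 s3 = new AmazonS3Client(new BasicAWSCredentials("accessKey", "secretAccessKey"));

    // Step 1: initiate the multipart upload and remember the upload id.
    InitiateMultipartUploadResult initResponse =
        s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, keyName));

    List<PartETag> partETags = new ArrayList<>();
    long partSize = 5L * 1024 * 1024;  // 5 MB, the minimum part size allowed by S3
    long contentLength = file.length();
    long filePosition = 0;

    // Step 2: upload the file part by part; each uploadPart(...) call is independent,
    // so parts could equally be uploaded in parallel.
    for (int partNumber = 1; filePosition < contentLength; partNumber++) {
      long curPartSize = Math.min(partSize, contentLength - filePosition);
      UploadPartRequest uploadRequest = new UploadPartRequest()
          .withBucketName(bucketName)
          .withKey(keyName)
          .withUploadId(initResponse.getUploadId())
          .withPartNumber(partNumber)
          .withFileOffset(filePosition)
          .withFile(file)
          .withPartSize(curPartSize);
      // S3 returns an ETag per part; all of them are needed to complete the upload.
      partETags.add(s3.uploadPart(uploadRequest).getPartETag());
      filePosition += curPartSize;
    }

    // Step 3: complete the upload; S3 combines the uploaded parts into a single object.
    s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
        bucketName, keyName, initResponse.getUploadId(), partETags));
  }
}
```

The independence of the uploadPart(...) calls is what allows the module to upload different blocks of the same file in parallel.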
A module is a group of operators pre-wired together so they work as a single conceptual entity in an application. Typically, a module will contain a set of input ports, output ports and configuration properties. The operators internal to the module will be automatically configured based on the supplied module properties.
The following diagram illustrates the DAG in this module, which consists of three operators:

- S3InitiateFileUploadOperator: initiates the multipart upload for each file.
- S3BlockUploadOperator: uploads the individual blocks (parts) of each file into the bucket.
- S3FileMerger: issues the completion request, after which S3 combines the uploaded parts into a single object.
The module exposes the following configuration properties:

Property | Type |
---|---|
accessKey | String |
secretAccessKey | String |
endPoint | String |
bucketName | String |
outputDirectoryPath | String |
mergerCount | int |
timeOutWindowCount | int |
The module has the following input ports:

Port | Type |
---|---|
filesMetadataInput | AbstractFileSplitter.FileMetadata |
blocksMetadataInput | BlockMetadata.FileBlockMetadata |
blockData | AbstractBlockReader.ReaderRecord |
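For illustration, here is a hedged sketch of wiring these ports in an application's populateDAG(...), modeled on the example application; the FSInputModule port names (filesMetadataOutput, blocksMetadataOutput, messages) and the property values are assumptions drawn from that example:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.apex.malhar.lib.fs.FSInputModule;
import org.apache.apex.malhar.lib.fs.s3.S3OutputModule;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

public class S3OutputApplication implements StreamingApplication {
  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
    S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());

    // Placeholder values; in practice these usually come from the properties file.
    inputModule.setFiles("/path/to/input");
    outputModule.setAccessKey("accessKey");
    outputModule.setSecretAccessKey("secretAccessKey");
    outputModule.setBucketName("bucketName");
    outputModule.setOutputDirectoryPath("outputDir");

    // Wire the module's three input ports to the corresponding FSInputModule outputs.
    dag.addStream("FileMetadata", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
    dag.addStream("BlocksMetadata", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput);
    dag.addStream("BlocksData", inputModule.messages, outputModule.blockData);
  }
}
```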
Please refer to the Example for a sample application using S3OutputModule.
Partitioning the module means that the operators inside the module can be partitioned. Partitioning of the individual operators is achieved as follows.

Partitioning of S3InitiateFileUploadOperator is achieved indirectly, by setting the PARTITIONER attribute on it:
```xml
<property>
  <name>dt.operator.{ModuleName}#InitiateUpload.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:{N}</value>
</property>
```
where {ModuleName} is the name of the S3OutputModule and {N} is the number of static partitions. The above lines statically partition S3InitiateFileUploadOperator {N} times.
The locality of S3BlockUploadOperator with its upstream operator (FSInputModule/BlockReader) must be set to PARTITION_PARALLEL for performance benefits, as this avoids serialization/deserialization of objects. Partitioning of this operator therefore depends on the upstream operator, which is of type FSInputModule/BlockReader.
S3FileMerger can be statically partitioned by setting the parameter mergerCount. This can be achieved in two ways:

(a) The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to statically partition S3FileMerger {N} times:
```java
FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());
outputModule.setMergerCount({N});
```
(b) By setting the parameter in the properties file as follows:
```xml
<property>
  <name>dt.operator.{ModuleName}.prop.mergerCount</name>
  <value>{N}</value>
</property>
```
where {ModuleName} is the name of the S3OutputModule and {N} is the number of static partitions. The above lines statically partition S3FileMerger {N} times.
Dynamic partitioning is a feature of the Apex platform that changes the number of partitions of an operator at run time. As with static partitioning, the locality of S3BlockUploadOperator with its upstream operator (FSInputModule/BlockReader) must be set to PARTITION_PARALLEL for performance benefits, so dynamic partitioning of this operator also depends on the upstream operator, which is of type FSInputModule/BlockReader.

In the example application, setting the minReaders and maxReaders values on FSInputModule causes S3BlockUploadOperator to be dynamically partitioned between minReaders and maxReaders partitions. This can be achieved in two ways:

(a) The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition S3BlockUploadOperator between {N1} and {N2} times:
```java
FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
inputModule.setMinReaders({N1});
inputModule.setMaxReaders({N2});
S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());
```
(b) By setting the parameters in the properties file as follows:
```xml
<property>
  <name>dt.operator.HDFSInputModule.prop.minReaders</name>
  <value>{N1}</value>
</property>
<property>
  <name>dt.operator.HDFSInputModule.prop.maxReaders</name>
  <value>{N2}</value>
</property>
```
where {N1} and {N2} represent the minimum and maximum number of partitions of the BlockReader. The above lines dynamically partition S3BlockUploadOperator between {N1} and {N2} times.