/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.fs.s3;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Module;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.netlet.util.Slice;

import static com.datatorrent.api.Context.OperatorContext.TIMEOUT_WINDOW_COUNT;
/**
* S3OutputModule can be used to upload files or directories into S3. The module supports
* parallel uploads of multiple blocks of the same file and merges those blocks in sequence.
*
* The following operators are wrapped into a single component using the Module API:
* - S3InitiateFileUploadOperator
* - S3BlockUploadOperator
* - S3FileMerger
*
* Initial Benchmark Results
* -------------------------
* The module writes 18 MB/s to S3 using the multipart upload feature with the following configuration:
*
* File size = 1 GB
* Partition count of S3BlockUploadOperator = 6
* Partition count of S3FileMerger = 1
* Container memory sizes of the operators in this module:
* S3InitiateFileUploadOperator = 1 GB
* S3BlockUploadOperator = 2.5 GB
* S3FileMerger = 2 GB
*
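* A minimal usage sketch follows, assuming an upstream FSInputModule supplies the file metadata,
* block metadata and block data. The input module and its port names (filesMetadataOutput,
* blocksMetadataOutput, messages) are assumptions and may differ across Malhar versions:
* <pre>{@code
* public void populateDAG(DAG dag, Configuration conf)
* {
*   // Hypothetical upstream module that splits files into blocks and reads them
*   FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
*   S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());
*
*   outputModule.setAccessKey("accessKey");        // AWS access key id
*   outputModule.setSecretAccessKey("secretKey");  // AWS secret access key
*   outputModule.setBucketName("bucket");          // destination S3 bucket
*   outputModule.setOutputDirectoryPath("output"); // destination directory inside the bucket
*
*   // Wire the three proxy input ports of this module to the input module
*   dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
*   dag.addStream("BlocksMetaData", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput);
*   dag.addStream("BlocksData", inputModule.messages, outputModule.blockData);
* }
* }</pre>
*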
* @displayName S3 Output Module
* @tags S3, Output
*
* @since 3.7.0
*/
@InterfaceStability.Evolving
public class S3OutputModule implements Module
{
/**
* AWS access key
*/
@NotNull
private String accessKey;

/**
* AWS secret access key
*/
@NotNull
private String secretAccessKey;

/**
* S3 end point (optional)
*/
private String endPoint;

/**
* Name of the bucket in which to upload the files
*/
@NotNull
private String bucketName;

/**
* Path of the output directory. The relative paths of the copied files are
* maintained with respect to the source directory and the output directory.
*/
@NotNull
private String outputDirectoryPath;

/**
* Specified as a count of streaming windows. This value is set on the operators of this module
* because these operators mostly interact with Amazon S3; as a result, their window ids might
* lag behind those of the upstream operators.
*/
@Min(120)
private int timeOutWindowCount = 6000;

/**
* Number of partitions of the S3FileMerger operator.
*/
@Min(1)
private int mergerCount = 1;

/**
* Input port for files metadata.
*/
public final transient ProxyInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new ProxyInputPort<AbstractFileSplitter.FileMetadata>();

/**
* Input port for blocks metadata.
*/
public final transient ProxyInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new ProxyInputPort<BlockMetadata.FileBlockMetadata>();

/**
* Input port for blocks data.
*/
public final transient ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockData = new ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>>();

@Override
public void populateDAG(DAG dag, Configuration conf)
{
// The DAG of the S3 Output Module is as follows:
//
// ---- S3InitiateFileUploadOperator -----|
//                                        |---- S3FileMerger
// ---- S3BlockUploadOperator ------------|
S3InitiateFileUploadOperator initiateUpload = dag.addOperator("InitiateUpload", createS3InitiateUpload());
initiateUpload.setAccessKey(accessKey);
initiateUpload.setSecretAccessKey(secretAccessKey);
initiateUpload.setBucketName(bucketName);
initiateUpload.setOutputDirectoryPath(outputDirectoryPath);
S3BlockUploadOperator blockUploader = dag.addOperator("BlockUpload", createS3BlockUpload());
blockUploader.setAccessKey(accessKey);
blockUploader.setSecretAccessKey(secretAccessKey);
blockUploader.setBucketName(bucketName);
S3FileMerger fileMerger = dag.addOperator("FileMerger", createS3FileMerger());
fileMerger.setAccessKey(accessKey);
fileMerger.setSecretAccessKey(secretAccessKey);
fileMerger.setBucketName(bucketName);
if (endPoint != null) {
initiateUpload.setEndPoint(endPoint);
blockUploader.setEndPoint(endPoint);
fileMerger.setEndPoint(endPoint);
}
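
// Deploy the block uploader partitions parallel to the upstream block reader partitions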
dag.setInputPortAttribute(blockUploader.blockInput, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(blockUploader.blockMetadataInput, Context.PortContext.PARTITION_PARALLEL, true);
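
// These operators interact with S3 and can lag behind upstream operators, so raise the
// processing timeout on each of them (and on the unifier of the block upload output)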
dag.setAttribute(initiateUpload, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
dag.setAttribute(blockUploader, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
dag.setAttribute(fileMerger, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
dag.setUnifierAttribute(blockUploader.output, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
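
// Scale the file merger by partitioning it statelessly into mergerCount partitions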
dag.setAttribute(fileMerger, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<S3FileMerger>(mergerCount));
// Add Streams
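// InitiateUpload passes the upload metadata to both the block uploader and the file merger;
// the file merger completes the multipart upload once all blocks of a file are uploaded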
dag.addStream("InitiateUploadIDToMerger", initiateUpload.fileMetadataOutput, fileMerger.filesMetadataInput);
dag.addStream("InitiateUploadIDToWriter", initiateUpload.uploadMetadataOutput, blockUploader.uploadMetadataInput);
dag.addStream("WriterToMerger", blockUploader.output, fileMerger.uploadMetadataInput);
// Set the proxy ports
filesMetadataInput.set(initiateUpload.filesMetadataInput);
blocksMetadataInput.set(blockUploader.blockMetadataInput);
blockData.set(blockUploader.blockInput);
}

/**
* Creates the S3InitiateFileUploadOperator, which initiates the multipart upload.
* @return the S3InitiateFileUploadOperator instance
*/
protected S3InitiateFileUploadOperator createS3InitiateUpload()
{
return new S3InitiateFileUploadOperator();
}

/**
* Creates the S3BlockUploadOperator, which uploads the blocks into the S3 bucket.
* @return the S3BlockUploadOperator instance
*/
protected S3BlockUploadOperator createS3BlockUpload()
{
return new S3BlockUploadOperator();
}

/**
* Creates the S3FileMerger, which sends the complete-multipart-upload request.
* @return the S3FileMerger instance
*/
protected S3FileMerger createS3FileMerger()
{
return new S3FileMerger();
}

/**
* Get the AWS access key
* @return AWS access key
*/
public String getAccessKey()
{
return accessKey;
}

/**
* Set the AWS access key
* @param accessKey AWS access key
*/
public void setAccessKey(String accessKey)
{
this.accessKey = accessKey;
}

/**
* Get the AWS secret access key
* @return AWS secret access key
*/
public String getSecretAccessKey()
{
return secretAccessKey;
}

/**
* Set the AWS secret access key
* @param secretAccessKey AWS secret access key
*/
public void setSecretAccessKey(String secretAccessKey)
{
this.secretAccessKey = secretAccessKey;
}

/**
* Get the name of the bucket in which to upload the files
* @return bucket name
*/
public String getBucketName()
{
return bucketName;
}

/**
* Set the name of the bucket in which to upload the files
* @param bucketName name of the bucket
*/
public void setBucketName(String bucketName)
{
this.bucketName = bucketName;
}

/**
* Get the S3 end point
* @return S3 end point
*/
public String getEndPoint()
{
return endPoint;
}

/**
* Set the S3 end point
* @param endPoint S3 end point
*/
public void setEndPoint(String endPoint)
{
this.endPoint = endPoint;
}

/**
* Get the path of the output directory.
* @return path of the output directory
*/
public String getOutputDirectoryPath()
{
return outputDirectoryPath;
}

/**
* Set the path of the output directory.
* @param outputDirectoryPath path of the output directory
*/
public void setOutputDirectoryPath(String outputDirectoryPath)
{
this.outputDirectoryPath = outputDirectoryPath;
}

/**
* Get the operator timeout, specified as a number of streaming windows, after which
* an operator of this module is considered to have stalled processing.
* @return the number of streaming windows
*/
public int getTimeOutWindowCount()
{
return timeOutWindowCount;
}

/**
* Set the operator timeout, specified as a number of streaming windows.
* @param timeOutWindowCount the number of streaming windows before timing out
*/
public void setTimeOutWindowCount(int timeOutWindowCount)
{
this.timeOutWindowCount = timeOutWindowCount;
}

/**
* Get the partition count of the S3FileMerger operator
* @return the partition count
*/
public int getMergerCount()
{
return mergerCount;
}

/**
* Set the partition count of the S3FileMerger operator
* @param mergerCount the partition count
*/
public void setMergerCount(int mergerCount)
{
this.mergerCount = mergerCount;
}
}