/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.fs.s3;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.io.block.AbstractBlockReader;
import org.apache.apex.malhar.lib.io.block.BlockMetadata;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.netlet.util.Slice;

/**
* This operator uploads blocks into an S3 bucket using either the multipart upload feature or the putObject API.
* A block is uploaded via the multipart feature only if its file consists of more than one block.
* This operator is intended for use in the S3 output module.
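*
* A minimal wiring sketch (the upstream operators, variables, and port names shown
* here are illustrative assumptions, not defined by this class):
* <pre>{@code
* S3BlockUploadOperator blockUploader = dag.addOperator("BlockUpload", new S3BlockUploadOperator());
* blockUploader.setBucketName("example-bucket");   // assumption: placeholder bucket name
* blockUploader.setAccessKey("ACCESS_KEY");        // assumption: placeholder credentials
* blockUploader.setSecretAccessKey("SECRET_KEY");
* dag.addStream("uploadIds", initiateUpload.uploadMetadataOutput, blockUploader.uploadMetadataInput);
* dag.addStream("blockData", blockReader.messages, blockUploader.blockInput);
* dag.addStream("blockMeta", blockReader.blocksMetadataOutput, blockUploader.blockMetadataInput);
* }</pre>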
*
* @since 3.7.0
*/
@InterfaceStability.Evolving
public class S3BlockUploadOperator implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
{
private static final Logger LOG = LoggerFactory.getLogger(S3BlockUploadOperator.class);
@NotNull
private String bucketName;
@NotNull
private String accessKey;
@NotNull
private String secretAccessKey;
private String endPoint;
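// Maps a unique block id ("<blockId>|<filePath>", see getUniqueBlockIdFromFile) to the block's upload metadata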
private Map<String, S3BlockMetaData> blockInfo = new HashMap<>();
private transient Map<Long, String> blockIdToFilePath = new HashMap<>();
private WindowDataManager windowDataManager = new FSWindowDataManager();
protected transient AmazonS3 s3Client;
private transient long currentWindowId;
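// Blocks that arrived before their file or block metadata; retried from endWindow() and handleIdleTime()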
private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
private transient Map<String, UploadBlockMetadata> currentWindowRecoveryState;
public final transient DefaultOutputPort<UploadBlockMetadata> output = new DefaultOutputPort<>();
/**
* This input port receives incoming tuples (block data).
*/
public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockInput = new DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>>()
{
@Override
public void process(AbstractBlockReader.ReaderRecord<Slice> tuple)
{
uploadBlockIntoS3(tuple);
}
};
/**
* Input port to receive block metadata.
*/
public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
{
@Override
public void process(BlockMetadata.FileBlockMetadata blockMetadata)
{
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
blockIdToFilePath.put(blockMetadata.getBlockId(), blockMetadata.getFilePath());
LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath());
}
};
/**
* Input port to receive upload file metadata.
*/
public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> uploadMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>()
{
@Override
public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
{
processUploadFileMetadata(tuple);
}
};
/**
* Converts each block of the given file into S3BlockMetaData and marks the file's last block.
* @param tuple UploadFileMetadata
*/
protected void processUploadFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
{
long[] blocks = tuple.getFileMetadata().getBlockIds();
String filePath = tuple.getFileMetadata().getFilePath();
for (int i = 0; i < blocks.length; i++) {
String blockId = getUniqueBlockIdFromFile(blocks[i], filePath);
if (blockInfo.get(blockId) != null) {
break;
}
blockInfo.put(blockId, new S3BlockMetaData(tuple.getKeyName(), tuple.getUploadId(), i + 1));
}
if (blocks.length > 0) {
blockInfo.get(getUniqueBlockIdFromFile(blocks[blocks.length - 1], filePath)).setLastBlock(true);
}
}
/**
* Constructs a unique block id by combining the given block id and file path.
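* For example, {@code getUniqueBlockIdFromFile(5L, "/tmp/file1")} returns {@code "5|/tmp/file1"}.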
* @param blockId id of the block
* @param filepath path of the file
* @return unique block id
*/
public static String getUniqueBlockIdFromFile(long blockId, String filepath)
{
return Long.toString(blockId) + "|" + filepath;
}
@Override
public void setup(Context.OperatorContext context)
{
waitingTuples = new ArrayList<>();
currentWindowRecoveryState = new HashMap<>();
windowDataManager.setup(context);
s3Client = createClient();
}
/**
* Creates an AmazonS3 client using the configured AWS credentials and optional endpoint.
* @return AmazonS3
*/
protected AmazonS3 createClient()
{
AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
if (endPoint != null) {
client.setEndpoint(endPoint);
}
return client;
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
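// Windows at or below the largest completed window were already processed; replay their saved results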
if (windowId <= windowDataManager.getLargestCompletedWindow()) {
replay(windowId);
}
}
/**
* Replays a completed window by re-emitting the recovered upload metadata.
* @param windowId id of the window to replay
*/
protected void replay(long windowId)
{
try {
@SuppressWarnings("unchecked")
Map<String, UploadBlockMetadata> recoveredData = (Map<String, UploadBlockMetadata>)windowDataManager.retrieve(windowId);
if (recoveredData == null) {
return;
}
for (Map.Entry<String, UploadBlockMetadata> uploadBlockMetadata: recoveredData.entrySet()) {
output.emit(uploadBlockMetadata.getValue());
blockInfo.remove(uploadBlockMetadata.getKey());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void endWindow()
{
if (waitingTuples.size() > 0) {
processWaitBlocks();
}
for (String uniqueblockId : currentWindowRecoveryState.keySet()) {
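// The unique block id has the form "<blockId>|<filePath>"; recover the numeric block id from it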
long blockId = Long.parseLong(uniqueblockId.substring(0, uniqueblockId.indexOf("|")));
LOG.debug("Successfully uploaded {} block", blockId);
blockIdToFilePath.remove(blockId);
blockInfo.remove(uniqueblockId);
}
if (blockIdToFilePath.size() > 0) {
for (Long blockId : blockIdToFilePath.keySet()) {
LOG.info("Unable to uploaded {} block", blockId);
}
blockIdToFilePath.clear();
}
if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
try {
windowDataManager.save(currentWindowRecoveryState, currentWindowId);
} catch (IOException e) {
throw new RuntimeException("Unable to save recovery", e);
}
}
currentWindowRecoveryState.clear();
}
@Override
public void teardown()
{
windowDataManager.teardown();
}
/**
* Processes the blocks that are waiting for their file or block metadata.
*/
private void processWaitBlocks()
{
Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
while (waitIterator.hasNext()) {
AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
String filePath = blockIdToFilePath.get(blockData.getBlockId());
if (filePath != null && blockInfo.get(getUniqueBlockIdFromFile(blockData.getBlockId(), filePath)) != null) {
uploadBlockIntoS3(blockData);
waitIterator.remove();
}
}
}
/**
* Uploads the block into the S3 bucket. A file's only block (part number 1 and last
* block) is uploaded with a single putObject call; any other block is uploaded as
* one part of a multipart upload.
* @param tuple block data
*/
protected void uploadBlockIntoS3(AbstractBlockReader.ReaderRecord<Slice> tuple)
{
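// Tuples of an already-completed window are skipped; their results were re-emitted in replay()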
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
// Check whether the block metadata is present for this block
if (blockIdToFilePath.get(tuple.getBlockId()) == null) {
if (!waitingTuples.contains(tuple)) {
waitingTuples.add(tuple);
}
return;
}
String uniqueBlockId = getUniqueBlockIdFromFile(tuple.getBlockId(), blockIdToFilePath.get(tuple.getBlockId()));
S3BlockMetaData metaData = blockInfo.get(uniqueBlockId);
// Check whether the file metadata has been received
if (metaData == null) {
if (!waitingTuples.contains(tuple)) {
waitingTuples.add(tuple);
}
return;
}
long partSize = tuple.getRecord().length;
PartETag partETag = null;
ByteArrayInputStream bis = new ByteArrayInputStream(tuple.getRecord().buffer);
// Check whether this is the only block of the file
if (metaData.isLastBlock && metaData.partNo == 1) {
ObjectMetadata omd = createObjectMetadata();
omd.setContentLength(partSize);
PutObjectResult result = s3Client.putObject(new PutObjectRequest(bucketName, metaData.getKeyName(), bis, omd));
partETag = new PartETag(1, result.getETag());
} else {
// Otherwise, upload the block as one part of the multipart upload
try {
// Create request to upload a part.
UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(metaData.getKeyName())
.withUploadId(metaData.getUploadId()).withPartNumber(metaData.getPartNo()).withInputStream(bis).withPartSize(partSize);
partETag = s3Client.uploadPart(uploadRequest).getPartETag();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
UploadBlockMetadata uploadmetadata = new UploadBlockMetadata(partETag, metaData.getKeyName());
output.emit(uploadmetadata);
currentWindowRecoveryState.put(uniqueBlockId, uploadmetadata);
try {
bis.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Creates the empty object metadata used for the putObject request of a single-block file.
* @return the ObjectMetadata
*/
public ObjectMetadata createObjectMetadata()
{
return new ObjectMetadata();
}
@Override
public void beforeCheckpoint(long windowId)
{
}
@Override
public void checkpointed(long windowId)
{
}
@Override
public void committed(long windowId)
{
try {
windowDataManager.committed(windowId);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void handleIdleTime()
{
if (waitingTuples.size() > 0) {
processWaitBlocks();
}
}
/**
* Upload block metadata consists of the partETag and the key name.
*/
public static class UploadBlockMetadata
{
@FieldSerializer.Bind(JavaSerializer.class)
private PartETag partETag;
private String keyName;
// For Kryo
public UploadBlockMetadata()
{
}
public UploadBlockMetadata(PartETag partETag, String keyName)
{
this.partETag = partETag;
this.keyName = keyName;
}
/**
* Get the partETag of the block
* @return the partETag
*/
public PartETag getPartETag()
{
return partETag;
}
/**
* Return the key name of the file
* @return key name
*/
public String getKeyName()
{
return keyName;
}
@Override
public int hashCode()
{
return keyName.hashCode();
}
}
/**
* S3 block metadata consists of the key name, upload id, part number, and whether the block is the last block of the file.
*/
public static class S3BlockMetaData
{
private String keyName;
private String uploadId;
private Integer partNo;
private boolean isLastBlock;
// For Kryo Serialization
public S3BlockMetaData()
{
}
public S3BlockMetaData(String keyName, String uploadId, Integer partNo)
{
this.keyName = keyName;
this.uploadId = uploadId;
this.partNo = partNo;
this.isLastBlock = false;
}
/**
* Return the key name of the file
* @return key name
*/
public String getKeyName()
{
return keyName;
}
/**
* Return the upload id of the block
* @return the upload id
*/
public String getUploadId()
{
return uploadId;
}
/**
* Return the part number of the block
* @return the part number
*/
public Integer getPartNo()
{
return partNo;
}
/**
* Returns whether this block is the last block of the file.
* @return isLastBlock
*/
public boolean isLastBlock()
{
return isLastBlock;
}
/**
* Sets whether this block is the last block of the file.
* @param lastBlock true if the block is the last block of the file
*/
public void setLastBlock(boolean lastBlock)
{
isLastBlock = lastBlock;
}
}
/**
* Returns the name of the bucket in which to upload the blocks.
* @return bucket name
*/
public String getBucketName()
{
return bucketName;
}
/**
* Sets the name of the bucket in which to upload the blocks.
* @param bucketName bucket name
*/
public void setBucketName(@NotNull String bucketName)
{
this.bucketName = Preconditions.checkNotNull(bucketName);
}
/**
* Return the AWS access key
* @return access key
*/
public String getAccessKey()
{
return accessKey;
}
/**
* Sets the AWS access key
* @param accessKey access key
*/
public void setAccessKey(@NotNull String accessKey)
{
this.accessKey = Preconditions.checkNotNull(accessKey);
}
/**
* Return the AWS secret access key
* @return secret access key
*/
public String getSecretAccessKey()
{
return secretAccessKey;
}
/**
* Sets the AWS secret access key
* @param secretAccessKey secret access key
*/
public void setSecretAccessKey(@NotNull String secretAccessKey)
{
this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
}
/**
* Return the AWS S3 endpoint
* @return S3 endpoint
*/
public String getEndPoint()
{
return endPoint;
}
/**
* Sets the AWS S3 endpoint.
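* For example, {@code setEndPoint("s3-us-west-2.amazonaws.com")} (an illustrative region endpoint).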
* @param endPoint S3 endpoint
*/
public void setEndPoint(String endPoint)
{
this.endPoint = endPoint;
}
}