| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.apex.malhar.lib.fs.s3; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| |
| import javax.validation.constraints.NotNull; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; |
| import org.apache.apex.malhar.lib.io.fs.AbstractReconciler; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| |
| import com.amazonaws.auth.BasicAWSCredentials; |
| import com.amazonaws.regions.Region; |
| import com.amazonaws.regions.Regions; |
| import com.amazonaws.services.s3.AmazonS3; |
| import com.amazonaws.services.s3.AmazonS3Client; |
| import com.amazonaws.services.s3.model.ObjectMetadata; |
| import com.amazonaws.services.s3.model.PutObjectRequest; |
| import com.amazonaws.services.s3.model.PutObjectResult; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.DefaultOutputPort; |
| |
| /** |
| * This operator uploads files to Amazon S3 after files are finalized and |
| * frozen by the committed callback. |
| * |
| * S3TupleOutputModule uses this operator in conjunction with S3CompactionOperator |
| * |
| * @since 3.7.0 |
| */ |
| @org.apache.hadoop.classification.InterfaceStability.Evolving |
| public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.OutputMetaData, FSRecordCompactionOperator.OutputMetaData> |
| { |
| /** |
| * Access key id for Amazon S3 |
| */ |
| @NotNull |
| private String accessKey; |
| |
| /** |
| * Secret key for Amazon S3 |
| */ |
| @NotNull |
| private String secretKey; |
| |
| /** |
| * Bucket name for data upload |
| */ |
| @NotNull |
| private String bucketName; |
| |
| /** |
| * S3 Region |
| */ |
| private String region; |
| |
| /** |
| * Directory name under S3 bucket |
| */ |
| @NotNull |
| private String directoryName; |
| |
| /** |
| * Client instance for connecting to Amazon S3 |
| */ |
| protected transient AmazonS3 s3client; |
| |
| /** |
| * FileSystem instance for reading intermediate directory |
| */ |
| protected transient FileSystem fs; |
| |
| protected transient String filePath; |
| |
| private static final String TMP_EXTENSION = ".tmp"; |
| |
| public final transient DefaultOutputPort<FSRecordCompactionOperator.OutputMetaData> outputPort = new DefaultOutputPort<>(); |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); |
| if (region != null) { |
| s3client.setRegion(Region.getRegion(Regions.fromName(region))); |
| } |
| filePath = context.getValue(DAG.APPLICATION_PATH); |
| try { |
| fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration()); |
| } catch (IOException e) { |
| logger.error("Unable to create FileSystem: {}", e.getMessage()); |
| } |
| super.setup(context); |
| } |
| |
| /** |
| * Enques the tuple for processing after committed callback |
| */ |
| @Override |
| protected void processTuple(FSRecordCompactionOperator.OutputMetaData outputMetaData) |
| { |
| logger.debug("enque : {}", outputMetaData); |
| enqueueForProcessing(outputMetaData); |
| } |
| |
| /** |
| * Uploads the file on Amazon S3 using putObject API from S3 client |
| */ |
| @Override |
| protected void processCommittedData(FSRecordCompactionOperator.OutputMetaData outputMetaData) |
| { |
| try { |
| Path path = new Path(outputMetaData.getPath()); |
| if (fs.exists(path) == false) { |
| logger.debug("Ignoring non-existent path assuming replay : {}", path); |
| return; |
| } |
| FSDataInputStream fsinput = fs.open(path); |
| ObjectMetadata omd = new ObjectMetadata(); |
| omd.setContentLength(outputMetaData.getSize()); |
| String keyName = directoryName + Path.SEPARATOR + outputMetaData.getFileName(); |
| PutObjectRequest request = new PutObjectRequest(bucketName, keyName, fsinput, omd); |
| if (outputMetaData.getSize() < Integer.MAX_VALUE) { |
| request.getRequestClientOptions().setReadLimit((int)outputMetaData.getSize()); |
| } else { |
| throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE"); |
| } |
| if (fs.exists(path)) { |
| PutObjectResult result = s3client.putObject(request); |
| logger.debug("File {} Uploaded at {}", keyName, result.getETag()); |
| } |
| } catch (FileNotFoundException e) { |
| logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath()); |
| } catch (IOException e) { |
| logger.error("Unable to create Stream: {}", e.getMessage()); |
| } |
| } |
| |
| /** |
| * Clears intermediate/temporary files if any |
| */ |
| @Override |
| public void endWindow() |
| { |
| while (doneTuples.peek() != null) { |
| FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll(); |
| removeIntermediateFiles(metaData); |
| if (outputPort.isConnected()) { |
| // Emit the meta data with S3 path |
| metaData.setPath(getDirectoryName() + Path.SEPARATOR + metaData.getFileName()); |
| outputPort.emit(metaData); |
| } |
| } |
| } |
| |
| /** |
| * Remove intermediate files |
| */ |
| protected void removeIntermediateFiles(FSRecordCompactionOperator.OutputMetaData metaData) |
| { |
| logger.debug("found metaData = {}", metaData); |
| committedTuples.remove(metaData); |
| try { |
| Path dest = new Path(metaData.getPath()); |
| //Deleting the intermediate files and when writing to tmp files |
| // there can be vagrant tmp files which we have to clean |
| FileStatus[] statuses = fs.listStatus(dest.getParent()); |
| |
| for (FileStatus status : statuses) { |
| String statusName = status.getPath().getName(); |
| if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) { |
| //a tmp file has tmp extension always preceded by timestamp |
| String actualFileName = statusName.substring(0, |
| statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); |
| logger.debug("actualFileName = {}", actualFileName); |
| if (metaData.getFileName().equals(actualFileName)) { |
| logger.debug("deleting stray file {}", statusName); |
| fs.delete(status.getPath(), true); |
| } |
| } else if (statusName.equals(metaData.getFileName())) { |
| logger.info("deleting s3-compaction file {}", statusName); |
| fs.delete(status.getPath(), true); |
| } |
| } |
| } catch (IOException e) { |
| logger.error("Unable to Delete a file: {}", metaData.getFileName()); |
| } |
| } |
| |
| /** |
| * Get access key id |
| * |
| * @return Access key id for Amazon S3 |
| */ |
| public String getAccessKey() |
| { |
| return accessKey; |
| } |
| |
| /** |
| * Set access key id |
| * |
| * @param accessKey |
| * Access key id for Amazon S3 |
| */ |
| public void setAccessKey(@NotNull String accessKey) |
| { |
| this.accessKey = Preconditions.checkNotNull(accessKey); |
| } |
| |
| /** |
| * Get secret key |
| * |
| * @return Secret key for Amazon S3 |
| */ |
| public String getSecretKey() |
| { |
| return secretKey; |
| } |
| |
| /** |
| * Set secret key |
| * |
| * @param secretKey |
| * Secret key for Amazon S3 |
| */ |
| public void setSecretKey(@NotNull String secretKey) |
| { |
| this.secretKey = Preconditions.checkNotNull(secretKey); |
| } |
| |
| /** |
| * Get bucket name |
| * |
| * @return Bucket name for data upload |
| */ |
| public String getBucketName() |
| { |
| return bucketName; |
| } |
| |
| /** |
| * Set bucket name |
| * |
| * @param bucketName |
| * Bucket name for data upload |
| */ |
| public void setBucketName(@NotNull String bucketName) |
| { |
| this.bucketName = Preconditions.checkNotNull(bucketName); |
| } |
| |
| /** |
| * Get directory name |
| * |
| * @return Directory name under S3 bucket |
| */ |
| public String getDirectoryName() |
| { |
| return directoryName; |
| } |
| |
| /** |
| * Set directory name |
| * |
| * @param directoryName |
| * Directory name under S3 bucket |
| */ |
| public void setDirectoryName(@NotNull String directoryName) |
| { |
| this.directoryName = Preconditions.checkNotNull(directoryName); |
| } |
| |
| /** |
| * Get the AWS S3 Region |
| * @return region |
| */ |
| public String getRegion() |
| { |
| return region; |
| } |
| |
| /** |
| * Set the AWS S3 Region |
| * @param region region |
| */ |
| public void setRegion(String region) |
| { |
| this.region = region; |
| } |
| |
| /** |
| * Set Amazon S3 client |
| * |
| * @param s3client |
| * Client for Amazon S3 |
| */ |
| @VisibleForTesting |
| void setS3client(AmazonS3 s3client) |
| { |
| this.s3client = s3client; |
| } |
| |
| private static final Logger logger = LoggerFactory.getLogger(S3Reconciler.class); |
| |
| } |