blob: 9792a7816c3e747090244161368f767e108ffcdc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.ClientConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.comm.Protocol;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.CannedAccessControlList;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.CopyObjectResult;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;
import com.aliyun.oss.model.UploadPartCopyRequest;
import com.aliyun.oss.model.UploadPartCopyResult;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Core implementation of Aliyun OSS Filesystem for Hadoop.
* Provides the bridging logic between Hadoop's abstract filesystem and
* Aliyun OSS.
*/
public class AliyunOSSFileSystemStore {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
private FileSystem.Statistics statistics;
private OSSClient ossClient;
private String bucketName;
private long uploadPartSize;
private long multipartThreshold;
private long partSize;
private int maxKeys;
private String serverSideEncryptionAlgorithm;
public void initialize(URI uri, Configuration conf,
FileSystem.Statistics stat) throws IOException {
statistics = stat;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
MAXIMUM_CONNECTIONS_DEFAULT));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
SECURE_CONNECTIONS_DEFAULT);
clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
MAX_ERROR_RETRIES_DEFAULT));
clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
ESTABLISH_TIMEOUT_DEFAULT));
clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
SOCKET_TIMEOUT_DEFAULT));
String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
if (StringUtils.isNotEmpty(proxyHost)) {
clientConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
clientConf.setProxyPort(proxyPort);
} else {
if (secureConnections) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
clientConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
clientConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
PROXY_PASSWORD_KEY + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
clientConf.setProxyUsername(proxyUsername);
clientConf.setProxyPassword(proxyPassword);
clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
} else if (proxyPort >= 0) {
String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
PROXY_HOST_KEY;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(conf);
ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
}
serverSideEncryptionAlgorithm =
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
if (uploadPartSize < 5 * 1024 * 1024) {
LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
uploadPartSize = 5 * 1024 * 1024;
}
if (multipartThreshold < 5 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
multipartThreshold = 5 * 1024 * 1024;
}
if (multipartThreshold > 1024 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
multipartThreshold = 1024 * 1024 * 1024;
}
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (StringUtils.isNotEmpty(cannedACLName)) {
CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL);
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
bucketName = uri.getHost();
}
/**
* Delete an object, and update write operation statistics.
*
* @param key key to blob to delete.
*/
public void deleteObject(String key) {
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
/**
* Delete a list of keys, and update write operation statistics.
*
* @param keysToDelete collection of keys to delete.
*/
public void deleteObjects(List<String> keysToDelete) {
if (CollectionUtils.isNotEmpty(keysToDelete)) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucketName);
deleteRequest.setKeys(keysToDelete);
ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(keysToDelete.size());
}
}
/**
* Delete a directory from Aliyun OSS.
*
* @param key directory key to delete.
*/
public void deleteDirs(String key) {
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key);
listRequest.setDelimiter(null);
listRequest.setMaxKeys(maxKeys);
while (true) {
ObjectListing objects = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
List<String> keysToDelete = new ArrayList<String>();
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
keysToDelete.add(objectSummary.getKey());
}
deleteObjects(keysToDelete);
if (objects.isTruncated()) {
listRequest.setMarker(objects.getNextMarker());
} else {
break;
}
}
}
/**
* Return metadata of a given object key.
*
* @param key object key.
* @return return null if key does not exist.
*/
public ObjectMetadata getObjectMetadata(String key) {
try {
return ossClient.getObjectMetadata(bucketName, key);
} catch (OSSException osse) {
return null;
} finally {
statistics.incrementReadOps(1);
}
}
/**
* Upload an empty file as an OSS object, using single upload.
*
* @param key object key.
* @throws IOException if failed to upload object.
*/
public void storeEmptyFile(String key) throws IOException {
ObjectMetadata dirMeta = new ObjectMetadata();
byte[] buffer = new byte[0];
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
dirMeta.setContentLength(0);
try {
ossClient.putObject(bucketName, key, in, dirMeta);
} finally {
in.close();
}
}
/**
* Copy an object from source key to destination key.
*
* @param srcKey source key.
* @param dstKey destination key.
* @return true if file is successfully copied.
*/
public boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
long contentLength = objectMeta.getContentLength();
if (contentLength <= multipartThreshold) {
return singleCopy(srcKey, dstKey);
} else {
return multipartCopy(srcKey, contentLength, dstKey);
}
}
/**
* Use single copy to copy an OSS object.
* (The caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcKey source key.
* @param dstKey destination key.
* @return true if object is successfully copied.
*/
private boolean singleCopy(String srcKey, String dstKey) {
CopyObjectResult copyResult =
ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
LOG.debug(copyResult.getETag());
return true;
}
/**
* Use multipart copy to copy an OSS object.
* (The caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcKey source key.
* @param contentLength data size of the object to copy.
* @param dstKey destination key.
* @return true if success, or false if upload is aborted.
*/
private boolean multipartCopy(String srcKey, long contentLength,
String dstKey) {
long realPartSize =
AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
int partNum = (int) (contentLength / realPartSize);
if (contentLength % realPartSize != 0) {
partNum++;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, dstKey);
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
String uploadId = initiateMultipartUploadResult.getUploadId();
List<PartETag> partETags = new ArrayList<PartETag>();
try {
for (int i = 0; i < partNum; i++) {
long skipBytes = realPartSize * i;
long size = (realPartSize < contentLength - skipBytes) ?
realPartSize : contentLength - skipBytes;
UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
partCopyRequest.setSourceBucketName(bucketName);
partCopyRequest.setSourceKey(srcKey);
partCopyRequest.setBucketName(bucketName);
partCopyRequest.setKey(dstKey);
partCopyRequest.setUploadId(uploadId);
partCopyRequest.setPartSize(size);
partCopyRequest.setBeginIndex(skipBytes);
partCopyRequest.setPartNumber(i + 1);
UploadPartCopyResult partCopyResult =
ossClient.uploadPartCopy(partCopyRequest);
statistics.incrementWriteOps(1);
partETags.add(partCopyResult.getPartETag());
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, dstKey,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
return true;
} catch (OSSException | ClientException e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
return false;
}
}
/**
* Upload a file as an OSS object, using single upload.
*
* @param key object key.
* @param file local file to upload.
* @throws IOException if failed to upload object.
*/
public void uploadObject(String key, File file) throws IOException {
File object = file.getAbsoluteFile();
FileInputStream fis = new FileInputStream(object);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(object.length());
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
try {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag());
statistics.incrementWriteOps(1);
} finally {
fis.close();
}
}
/**
* Upload a file as an OSS object, using multipart upload.
*
* @param key object key.
* @param file local file to upload.
* @throws IOException if failed to upload object.
*/
public void multipartUploadObject(String key, File file) throws IOException {
File object = file.getAbsoluteFile();
long dataLen = object.length();
long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
int partNum = (int) (dataLen / realPartSize);
if (dataLen % realPartSize != 0) {
partNum += 1;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
List<PartETag> partETags = new ArrayList<PartETag>();
String uploadId = initiateMultipartUploadResult.getUploadId();
try {
for (int i = 0; i < partNum; i++) {
// TODO: Optimize this, avoid opening the object multiple times
FileInputStream fis = new FileInputStream(object);
try {
long skipBytes = realPartSize * i;
AliyunOSSUtils.skipFully(fis, skipBytes);
long size = (realPartSize < dataLen - skipBytes) ?
realPartSize : dataLen - skipBytes;
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(key);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(fis);
uploadPartRequest.setPartSize(size);
uploadPartRequest.setPartNumber(i + 1);
UploadPartResult uploadPartResult =
ossClient.uploadPart(uploadPartRequest);
statistics.incrementWriteOps(1);
partETags.add(uploadPartResult.getPartETag());
} finally {
fis.close();
}
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
} catch (OSSException | ClientException e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, key, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
}
}
/**
* list objects.
*
* @param prefix prefix.
* @param maxListingLength max no. of entries
* @param marker last key in any previous search.
* @param recursive whether to list directory recursively.
* @return a list of matches.
*/
public ObjectListing listObjects(String prefix, int maxListingLength,
String marker, boolean recursive) {
String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter);
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(marker);
ObjectListing listing = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
return listing;
}
/**
* Retrieve a part of an object.
*
* @param key the object name that is being retrieved from the Aliyun OSS.
* @param byteStart start position.
* @param byteEnd end position.
* @return This method returns null if the key is not found.
*/
public InputStream retrieve(String key, long byteStart, long byteEnd) {
try {
GetObjectRequest request = new GetObjectRequest(bucketName, key);
request.setRange(byteStart, byteEnd);
return ossClient.getObject(request).getObjectContent();
} catch (OSSException | ClientException e) {
return null;
}
}
/**
* Close OSS client properly.
*/
public void close() {
if (ossClient != null) {
ossClient.shutdown();
ossClient = null;
}
}
/**
* Clean up all objects matching the prefix.
*
* @param prefix Aliyun OSS object prefix.
*/
public void purge(String prefix) {
String key;
try {
ObjectListing objects = listObjects(prefix, maxKeys, null, true);
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
}
for (String dir: objects.getCommonPrefixes()) {
deleteDirs(dir);
}
} catch (OSSException | ClientException e) {
LOG.error("Failed to purge " + prefix);
}
}
}