blob: d2484c0e47b3cc84ab17f05b7feff295ead6ea0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.cosn;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.endpoint.SuffixEndpointBuilder;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.AbortMultipartUploadRequest;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectSummary;
import com.qcloud.cos.model.CompleteMultipartUploadRequest;
import com.qcloud.cos.model.CompleteMultipartUploadResult;
import com.qcloud.cos.model.CopyObjectRequest;
import com.qcloud.cos.model.DeleteObjectRequest;
import com.qcloud.cos.model.GetObjectMetadataRequest;
import com.qcloud.cos.model.GetObjectRequest;
import com.qcloud.cos.model.InitiateMultipartUploadRequest;
import com.qcloud.cos.model.InitiateMultipartUploadResult;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.model.ObjectMetadata;
import com.qcloud.cos.model.PartETag;
import com.qcloud.cos.model.PutObjectRequest;
import com.qcloud.cos.model.PutObjectResult;
import com.qcloud.cos.model.UploadPartRequest;
import com.qcloud.cos.model.UploadPartResult;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.auth.COSCredentialsProviderList;
import org.apache.hadoop.util.VersionInfo;
import org.apache.http.HttpStatus;
/**
* This class performs the actual access operations against the COS blob store.
* It provides the bridging logic between Hadoop's abstract filesystem and COS.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class CosNativeFileSystemStore implements NativeFileSystemStore {
// SDK client used for every COS request; created in initCOSClient() and
// shut down in close().
private COSClient cosClient;
// Bucket targeted by all requests; taken from the host part of the URI
// passed to initialize().
private String bucketName;
// Retry limit consulted by callCOSClientWithRetry(); read from
// CosNConfigKeys.COSN_MAX_RETRIES_KEY in initCOSClient().
private int maxRetryTimes;
public static final Logger LOG =
LoggerFactory.getLogger(CosNativeFileSystemStore.class);
/**
 * Initialize the client to access COS blob storage.
 *
 * @param uri  The URI handed to the credential provider lookup.
 * @param conf Hadoop configuration with COS configuration options.
 * @throws IOException Initialize the COS client failed,
 *                     caused by incorrect options.
 */
private void initCOSClient(URI uri, Configuration conf) throws IOException {
  COSCredentialsProviderList credentialProviderList =
      CosNUtils.createCosCredentialsProviderSet(uri, conf);
  String region = conf.get(CosNConfigKeys.COSN_REGION_KEY);
  String endpointSuffix = conf.get(
      CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
  // At least one of region / endpoint suffix must be configured.
  if (null == region && null == endpointSuffix) {
    String exceptionMsg = String.format("config %s and %s at least one",
        CosNConfigKeys.COSN_REGION_KEY,
        CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
    throw new IOException(exceptionMsg);
  }

  // Resolve the credentials exactly once so that the access key id and the
  // secret key are guaranteed to come from the same credentials object.
  COSCredentials providedCredentials =
      credentialProviderList.getCredentials();
  COSCredentials cosCred = new BasicCOSCredentials(
      providedCredentials.getCOSAccessKeyId(),
      providedCredentials.getCOSSecretKey());

  boolean useHttps = conf.getBoolean(CosNConfigKeys.COSN_USE_HTTPS_KEY,
      CosNConfigKeys.DEFAULT_USE_HTTPS);

  ClientConfig config;
  if (null == region) {
    // No region configured: build endpoints from the configured suffix.
    config = new ClientConfig(new Region(""));
    config.setEndpointBuilder(new SuffixEndpointBuilder(endpointSuffix));
  } else {
    config = new ClientConfig(new Region(region));
  }
  if (useHttps) {
    config.setHttpProtocol(HttpProtocol.https);
  }

  config.setUserAgent(conf.get(CosNConfigKeys.USER_AGENT,
      CosNConfigKeys.DEFAULT_USER_AGENT) + " For " + " Hadoop "
      + VersionInfo.getVersion());

  this.maxRetryTimes = conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY,
      CosNConfigKeys.DEFAULT_MAX_RETRIES);

  config.setMaxConnectionsCount(
      conf.getInt(CosNConfigKeys.MAX_CONNECTION_NUM,
          CosNConfigKeys.DEFAULT_MAX_CONNECTION_NUM));

  this.cosClient = new COSClient(cosCred, config);
}
/**
 * Initialize the CosNativeFileSystemStore object, including
 * its COS client and default COS bucket.
 *
 * @param uri  The URI of the COS bucket accessed by default.
 * @param conf Hadoop configuration with COS configuration options.
 * @throws IOException Initialize the COS client failed.
 */
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
  try {
    // Record the bucket first: handleException() embeds bucketName in its
    // message, so a client initialization failure should name the real
    // bucket instead of "null".
    this.bucketName = uri.getHost();
    initCOSClient(uri, conf);
  } catch (Exception e) {
    handleException(e, "");
  }
}
/**
 * Upload the content of an input stream to COS as a single object. The put
 * request is issued through {@code callCOSClientWithRetry}.
 *
 * @param key         COS object key.
 * @param inputStream Stream holding the content to be uploaded.
 * @param md5Hash     MD5 digest of the content; sent Base64-encoded as the
 *                    content MD5 of the object metadata.
 * @param length      Number of bytes to be uploaded.
 * @throws IOException Upload the file failed.
 */
private void storeFileWithRetry(String key, InputStream inputStream,
    byte[] md5Hash, long length) throws IOException {
  try {
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentMD5(Base64.encodeAsString(md5Hash));
    metadata.setContentLength(length);

    PutObjectRequest request =
        new PutObjectRequest(bucketName, key, inputStream, metadata);
    PutObjectResult result =
        (PutObjectResult) callCOSClientWithRetry(request);

    LOG.debug("Store file successfully. COS key: [{}], ETag: [{}].",
        key, result.getETag());
  } catch (Exception e) {
    String errMsg = String.format(
        "Store file failed. COS key: [%s], exception: [%s]",
        key, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), key);
  }
}
/**
 * Store a local file into COS.
 *
 * <p>The stream opened on the local file is closed before this method
 * returns, whether or not the upload succeeded.
 *
 * @param key     COS object key.
 * @param file    The local file to be uploaded.
 * @param md5Hash The MD5 value of the file to be uploaded.
 * @throws IOException Upload the file failed.
 */
@Override
public void storeFile(String key, File file, byte[] md5Hash)
    throws IOException {
  LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " +
      "[{}]", file.getCanonicalPath(), file.length(), key);
  // This method opens the stream, so it is responsible for closing it;
  // otherwise the file descriptor leaks on every upload.
  try (InputStream fileStream =
      new BufferedInputStream(new FileInputStream(file))) {
    storeFileWithRetry(key, fileStream, md5Hash, file.length());
  }
}
/**
 * Upload the content of a caller-supplied input stream to COS.
 *
 * @param key           COS object key.
 * @param inputStream   The input stream to be uploaded.
 * @param md5Hash       The MD5 value of the content to be uploaded.
 * @param contentLength Length of the uploaded content.
 * @throws IOException Upload the file failed.
 */
@Override
public void storeFile(String key, InputStream inputStream, byte[] md5Hash,
    long contentLength) throws IOException {
  LOG.info("Store file from input stream. COS key: [{}], length: [{}].",
      key, contentLength);
  storeFileWithRetry(key, inputStream, md5Hash, contentLength);
}
/**
 * Store a zero-length object. For COS, storing an empty file means creating
 * a directory: the key is given a trailing path delimiter if it lacks one.
 *
 * @param key COS object key of the directory marker.
 * @throws IOException Storing the empty object failed.
 */
@Override
public void storeEmptyFile(String key) throws IOException {
  final String dirKey = key.endsWith(CosNFileSystem.PATH_DELIMITER)
      ? key
      : key + CosNFileSystem.PATH_DELIMITER;

  ObjectMetadata emptyMetadata = new ObjectMetadata();
  emptyMetadata.setContentLength(0);
  InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
  PutObjectRequest request =
      new PutObjectRequest(bucketName, dirKey, emptyContent, emptyMetadata);

  try {
    PutObjectResult result =
        (PutObjectResult) callCOSClientWithRetry(request);
    LOG.debug("Store empty file successfully. COS key: [{}], ETag: [{}].",
        dirKey, result.getETag());
  } catch (Exception e) {
    String errMsg = String.format(
        "Store empty file failed. COS key: [%s], exception: [%s]",
        dirKey, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), dirKey);
  }
}
/**
 * Upload a local file as one part of a multipart upload.
 *
 * @param file     The local file holding the content of the part.
 * @param key      COS object key of the multipart upload.
 * @param uploadId The id of the multipart upload the part belongs to.
 * @param partNum  The number of the part.
 * @return The ETag of the uploaded part.
 * @throws IOException Opening the file or uploading the part failed.
 */
public PartETag uploadPart(File file, String key, String uploadId,
    int partNum) throws IOException {
  // try-with-resources always closes the stream and, unlike a manual
  // finally block, keeps an upload failure as the primary exception when
  // close() fails as well.
  try (InputStream inputStream = new FileInputStream(file)) {
    return uploadPart(inputStream, key, uploadId, partNum, file.length());
  }
}
/**
 * Upload one part of a multipart upload from an input stream.
 *
 * @param inputStream Stream holding the content of the part.
 * @param key         COS object key of the multipart upload.
 * @param uploadId    The id of the multipart upload the part belongs to.
 * @param partNum     The number of the part.
 * @param partSize    The size of the part in bytes.
 * @return The ETag of the uploaded part.
 * @throws IOException Uploading the part failed.
 */
@Override
public PartETag uploadPart(InputStream inputStream, String key,
    String uploadId, int partNum, long partSize) throws IOException {
  UploadPartRequest partRequest = new UploadPartRequest();
  partRequest.setBucketName(this.bucketName);
  partRequest.setUploadId(uploadId);
  partRequest.setInputStream(inputStream);
  partRequest.setPartNumber(partNum);
  partRequest.setPartSize(partSize);
  partRequest.setKey(key);

  try {
    UploadPartResult partResult =
        (UploadPartResult) callCOSClientWithRetry(partRequest);
    return partResult.getPartETag();
  } catch (Exception e) {
    String errMsg = String.format(
        "Current thread: [%d], COS key: [%s], upload id: [%s], "
            + "part num: [%d], exception: [%s]",
        Thread.currentThread().getId(), key, uploadId, partNum,
        e.toString());
    handleException(new Exception(errMsg), key);
  }
  // handleException always throws, so this is only here for the compiler.
  return null;
}
/**
 * Abort a multipart upload. The request is sent directly through the COS
 * client, without going through the retry helper.
 *
 * @param key      COS object key of the multipart upload.
 * @param uploadId The id of the multipart upload to abort.
 */
public void abortMultipartUpload(String key, String uploadId) {
  LOG.info("Abort the multipart upload. COS key: [{}], upload id: [{}].",
      key, uploadId);
  cosClient.abortMultipartUpload(
      new AbortMultipartUploadRequest(bucketName, key, uploadId));
}
/**
 * Start a multipart upload for the given key and hand back its upload id.
 *
 * @param key The COS object key to start the multipart upload for.
 * @return The multipart upload id, or an empty string when the key is
 *         null or empty.
 */
public String getUploadId(String key) {
  boolean missingKey = (key == null) || key.isEmpty();
  if (missingKey) {
    return "";
  }

  LOG.info("Initiate a multipart upload. bucket: [{}], COS key: [{}].",
      bucketName, key);
  InitiateMultipartUploadRequest initRequest =
      new InitiateMultipartUploadRequest(bucketName, key);
  InitiateMultipartUploadResult initResult =
      cosClient.initiateMultipartUpload(initRequest);
  return initResult.getUploadId();
}
/**
 * Finish a multipart upload process, which will merge all parts uploaded.
 *
 * <p>Note that {@code partETagList} is sorted in place by ascending part
 * number before the request is sent.
 *
 * @param key          The COS object key to be finished.
 * @param uploadId     The upload id of the multipart upload to be finished.
 * @param partETagList The etag list of the part that has been uploaded.
 * @return The result object of completing the multipart upload process.
 */
public CompleteMultipartUploadResult completeMultipartUpload(
    String key, String uploadId, List<PartETag> partETagList) {
  Collections.sort(partETagList, new Comparator<PartETag>() {
    @Override
    public int compare(PartETag o1, PartETag o2) {
      // Integer.compare instead of subtraction: subtraction-based
      // comparators can overflow and report the wrong order.
      return Integer.compare(o1.getPartNumber(), o2.getPartNumber());
    }
  });
  LOG.info("Complete the multipart upload. bucket: [{}], COS key: [{}], "
      + "upload id: [{}].", bucketName, key, uploadId);
  CompleteMultipartUploadRequest completeMultipartUploadRequest =
      new CompleteMultipartUploadRequest(
          bucketName, key, uploadId, partETagList);
  return cosClient.completeMultipartUpload(completeMultipartUploadRequest);
}
/**
 * Query the metadata of a single COS object.
 *
 * @param key COS object key; a key ending with the path delimiter is
 *            reported as a directory (not a file).
 * @return The metadata of the object, or null when COS answers with
 *         HTTP 404 (not found).
 * @throws IOException The query failed with a status other than 404.
 */
private FileMetadata queryObjectMetadata(String key) throws IOException {
  GetObjectMetadataRequest metadataRequest =
      new GetObjectMetadataRequest(bucketName, key);
  try {
    ObjectMetadata metadata =
        (ObjectMetadata) callCOSClientWithRetry(metadataRequest);

    // A missing modification time is reported as 0.
    long mtime = (metadata.getLastModified() == null)
        ? 0
        : metadata.getLastModified().getTime();
    long fileSize = metadata.getContentLength();
    boolean isFile = !key.endsWith(CosNFileSystem.PATH_DELIMITER);
    FileMetadata result = new FileMetadata(key, fileSize, mtime, isFile);

    LOG.debug("Retrieve file metadata. COS key: [{}], ETag: [{}], "
        + "length: [{}].", key, metadata.getETag(), fileSize);
    return result;
  } catch (CosServiceException e) {
    // "Not found" is an expected outcome and is signalled with null.
    if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
      return null;
    }
    String errorMsg = String.format("Retrieve file metadata file failed. "
        + "COS key: [%s], CosServiceException: [%s].", key, e.toString());
    LOG.error(errorMsg);
    handleException(new Exception(errorMsg), key);
    return null;
  }
}
/**
 * Look up the metadata for a key, trying it first as a file and then as a
 * directory (the key with a trailing path delimiter).
 *
 * @param key COS object key, with or without a trailing path delimiter.
 * @return The metadata found, or null if neither lookup finds an object.
 * @throws IOException The metadata query failed.
 */
@Override
public FileMetadata retrieveMetadata(String key) throws IOException {
  String fileKey = key;
  if (fileKey.endsWith(CosNFileSystem.PATH_DELIMITER)) {
    fileKey = fileKey.substring(0, fileKey.length() - 1);
  }

  // First attempt: treat the key as a plain file (skipped for empty key).
  if (!fileKey.isEmpty()) {
    FileMetadata asFile = queryObjectMetadata(fileKey);
    if (asFile != null) {
      return asFile;
    }
  }

  // Second attempt: treat the key as a directory.
  return queryObjectMetadata(fileKey + CosNFileSystem.PATH_DELIMITER);
}
/**
 * Download a whole COS object and hand back its content stream.
 *
 * @param key The object key that is being retrieved from the COS bucket.
 * @return The input stream associated with the retrieved object.
 * @throws IOException If the download fails for any reason.
 */
@Override
public InputStream retrieve(String key) throws IOException {
  LOG.debug("Retrieve object key: [{}].", key);
  GetObjectRequest request = new GetObjectRequest(this.bucketName, key);
  try {
    COSObject downloaded = (COSObject) callCOSClientWithRetry(request);
    return downloaded.getObjectContent();
  } catch (Exception e) {
    String errMsg = String.format(
        "Retrieving key: [%s] occurs an exception: [%s].",
        key, e.toString());
    LOG.error("Retrieving COS key: [{}] occurs an exception: [{}].", key, e);
    handleException(new Exception(errMsg), key);
  }
  // handleException always throws, so this is only here for the compiler.
  return null;
}
/**
 * Download a COS object from the given start position up to its end.
 * The object length is looked up first to compute the end of the range;
 * when the start lies beyond the last byte, no range is set on the request.
 *
 * @param key            The object key that is being retrieved from
 *                       the COS bucket.
 * @param byteRangeStart The start position of the part to be retrieved in
 *                       the object.
 * @return The input stream associated with the retrieved object.
 * @throws IOException If failed to retrieve.
 */
@Override
public InputStream retrieve(String key, long byteRangeStart)
    throws IOException {
  try {
    LOG.debug("Retrieve COS key:[{}]. range start:[{}].",
        key, byteRangeStart);
    long lastByte = getFileLength(key) - 1;
    GetObjectRequest request = new GetObjectRequest(this.bucketName, key);
    if (byteRangeStart <= lastByte) {
      request.setRange(byteRangeStart, lastByte);
    }
    COSObject downloaded = (COSObject) callCOSClientWithRetry(request);
    return downloaded.getObjectContent();
  } catch (Exception e) {
    String errMsg = String.format(
        "Retrieving COS key: [%s] occurs an exception. "
            + "byte range start: [%s], exception: [%s].",
        key, byteRangeStart, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), key);
  }
  // handleException always throws, so this is only here for the compiler.
  return null;
}
/**
 * Download a byte range of a COS object, given its start and end position.
 *
 * @param key            The object key that is being downloaded.
 * @param byteRangeStart The start position of the part to be retrieved in
 *                       the object.
 * @param byteRangeEnd   The end position of the part to be retrieved in
 *                       the object.
 * @return The input stream associated with the retrieved range.
 * @throws IOException If failed to retrieve.
 */
@Override
public InputStream retrieveBlock(String key, long byteRangeStart,
    long byteRangeEnd) throws IOException {
  try {
    GetObjectRequest rangeRequest =
        new GetObjectRequest(this.bucketName, key);
    rangeRequest.setRange(byteRangeStart, byteRangeEnd);
    COSObject downloaded =
        (COSObject) this.callCOSClientWithRetry(rangeRequest);
    return downloaded.getObjectContent();
  } catch (CosServiceException e) {
    String errMsg = String.format(
        "Retrieving key [%s] with byteRangeStart [%d] occurs "
            + "an CosServiceException: [%s].",
        key, byteRangeStart, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), key);
  } catch (CosClientException e) {
    String errMsg = String.format(
        "Retrieving key [%s] with byteRangeStart [%d] "
            + "occurs an exception: [%s].",
        key, byteRangeStart, e.toString());
    LOG.error("Retrieving COS key: [{}] with byteRangeStart: [{}] " +
        "occurs an exception: [{}].", key, byteRangeStart, e);
    handleException(new Exception(errMsg), key);
  }
  // handleException always throws, so this is only here for the compiler.
  return null;
}
/**
 * List the objects under a prefix, non-recursively and from the beginning.
 *
 * @param prefix           The prefix to be listed.
 * @param maxListingLength The maximum number of listed entries.
 * @return The listing for the prefix.
 * @throws IOException If list objects failed.
 */
@Override
public PartialListing list(String prefix, int maxListingLength)
    throws IOException {
  final String noPriorKey = null;
  final boolean recurse = false;
  return list(prefix, maxListingLength, noPriorKey, recurse);
}
/**
 * List the objects under a prefix, continuing after a previous listing.
 *
 * @param prefix           The prefix to be listed.
 * @param maxListingLength The maximum number of listed entries.
 * @param priorLastKey     The last key in any previous search.
 * @param recurse          When true no delimiter is used for the listing;
 *                         otherwise the path delimiter is used.
 * @return The listing for the prefix.
 * @throws IOException If list objects failed.
 */
@Override
public PartialListing list(String prefix, int maxListingLength,
    String priorLastKey, boolean recurse) throws IOException {
  String delimiter = null;
  if (!recurse) {
    delimiter = CosNFileSystem.PATH_DELIMITER;
  }
  return list(prefix, delimiter, maxListingLength, priorLastKey);
}
/**
 * List the metadata of the objects whose key carries the given prefix.
 * Object summaries are reported as files, common prefixes as directories.
 *
 * @param prefix           The prefix to be listed.
 * @param delimiter        The delimiter used to group keys into common
 *                         prefixes; may be null.
 * @param maxListingLength The maximum number of listed entries.
 * @param priorLastKey     The last key in any previous search.
 * @return The listing; its marker is null once the listing is finished.
 * @throws IOException If list objects failed.
 */
private PartialListing list(String prefix, String delimiter,
    int maxListingLength, String priorLastKey) throws IOException {
  LOG.debug("List objects. prefix: [{}], delimiter: [{}], " +
          "maxListLength: [{}], priorLastKey: [{}].",
      prefix, delimiter, maxListingLength, priorLastKey);

  String listPrefix = prefix;
  if (!listPrefix.startsWith(CosNFileSystem.PATH_DELIMITER)) {
    listPrefix += CosNFileSystem.PATH_DELIMITER;
  }

  ListObjectsRequest request = new ListObjectsRequest();
  request.setBucketName(bucketName);
  request.setPrefix(listPrefix);
  request.setDelimiter(delimiter);
  request.setMarker(priorLastKey);
  request.setMaxKeys(maxListingLength);

  ObjectListing listing = null;
  try {
    listing = (ObjectListing) callCOSClientWithRetry(request);
  } catch (Exception e) {
    String errMsg = String.format("prefix: [%s], delimiter: [%s], "
            + "maxListingLength: [%d], priorLastKey: [%s]. "
            + "List objects occur an exception: [%s].",
        listPrefix, (delimiter == null) ? "" : delimiter,
        maxListingLength, priorLastKey, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), listPrefix);
  }
  if (listing == null) {
    String errMsg = String.format("List the prefix: [%s] failed. "
            + "delimiter: [%s], max listing length: [%s], "
            + "prior last key: [%s]",
        listPrefix, delimiter, maxListingLength, priorLastKey);
    handleException(new Exception(errMsg), listPrefix);
  }

  // Plain objects under the prefix are reported as files.
  List<FileMetadata> files = new ArrayList<>();
  for (COSObjectSummary summary : listing.getObjectSummaries()) {
    String filePath = summary.getKey();
    if (!filePath.startsWith(CosNFileSystem.PATH_DELIMITER)) {
      filePath = CosNFileSystem.PATH_DELIMITER + filePath;
    }
    // The prefix itself is not part of its own listing.
    if (filePath.equals(listPrefix)) {
      continue;
    }
    long mtime = (summary.getLastModified() == null)
        ? 0
        : summary.getLastModified().getTime();
    files.add(new FileMetadata(filePath, summary.getSize(), mtime, true));
  }

  // Common prefixes are reported as directories.
  List<FileMetadata> directories = new ArrayList<>();
  for (String commonPrefix : listing.getCommonPrefixes()) {
    String dirPath = commonPrefix;
    if (!dirPath.startsWith(CosNFileSystem.PATH_DELIMITER)) {
      dirPath = CosNFileSystem.PATH_DELIMITER + dirPath;
    }
    directories.add(new FileMetadata(dirPath, 0, 0, false));
  }

  FileMetadata[] fileMetadata = files.toArray(new FileMetadata[0]);
  FileMetadata[] commonPrefixMetaData =
      directories.toArray(new FileMetadata[0]);

  // When truncated is false, the listing is finished and no marker is
  // handed back.
  String nextMarker = listing.isTruncated() ? listing.getNextMarker() : null;
  return new PartialListing(nextMarker, fileMetadata, commonPrefixMetaData);
}
/**
 * Delete a single object from the bucket.
 *
 * @param key COS object key to delete.
 * @throws IOException The delete request failed.
 */
@Override
public void delete(String key) throws IOException {
  LOG.debug("Delete object key: [{}] from bucket: {}.", key, this.bucketName);
  try {
    callCOSClientWithRetry(new DeleteObjectRequest(bucketName, key));
  } catch (Exception e) {
    String errMsg = String.format(
        "Delete key: [%s] occurs an exception: [%s].", key, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), key);
  }
}
/**
 * Rename an object by copying it to the destination key and then deleting
 * the source key, both within the same bucket.
 *
 * @param srcKey COS object key to rename.
 * @param dstKey COS object key to rename to.
 * @throws IOException The copy or the delete failed.
 */
public void rename(String srcKey, String dstKey) throws IOException {
  LOG.debug("Rename source key: [{}] to dest key: [{}].", srcKey, dstKey);
  try {
    // Step 1: copy the source to the destination.
    callCOSClientWithRetry(
        new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey));
    // Step 2: remove the source.
    callCOSClientWithRetry(new DeleteObjectRequest(bucketName, srcKey));
  } catch (Exception e) {
    String errMsg = String.format(
        "Rename object unsuccessfully. source cos key: [%s], "
            + "dest COS key: [%s], exception: [%s]",
        srcKey, dstKey, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), srcKey);
  }
}
/**
 * Copy an object to another key within the same bucket.
 *
 * @param srcKey COS object key to copy from.
 * @param dstKey COS object key to copy to.
 * @throws IOException The copy failed.
 */
@Override
public void copy(String srcKey, String dstKey) throws IOException {
  LOG.debug("Copy source key: [{}] to dest key: [{}].", srcKey, dstKey);
  try {
    callCOSClientWithRetry(
        new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey));
  } catch (Exception e) {
    String errMsg = String.format(
        "Copy object unsuccessfully. source COS key: %s, "
            + "dest COS key: %s, exception: %s",
        srcKey, dstKey, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), srcKey);
  }
}
/**
 * Not supported by this store.
 *
 * @param prefix Ignored.
 * @throws IOException Always, since purge is not supported.
 */
@Override
public void purge(String prefix) throws IOException {
throw new IOException("purge not supported");
}
/**
 * Not supported by this store.
 *
 * @throws IOException Always, since dump is not supported.
 */
@Override
public void dump() throws IOException {
throw new IOException("dump not supported");
}
/**
 * Convert an exception into an IOException whose message is prefixed with
 * the COS path (scheme, bucket name and key) it relates to. This method
 * never returns normally.
 *
 * @param e   The exception to report; kept as the cause of the IOException.
 * @param key COS object key the failed operation worked on.
 * @throws IOException Always.
 */
private void handleException(Exception e, String key) throws IOException {
  String cosPath = CosNFileSystem.SCHEME + "://" + bucketName + key;
  String exceptInfo = String.format("%s : %s", cosPath, e.toString());
  // Keep the original exception as the cause so its stack trace is not lost.
  throw new IOException(exceptInfo, e);
}
/**
 * Look up the content length of a COS object from its metadata.
 *
 * @param key COS object key.
 * @return The content length of the object.
 * @throws IOException The metadata query failed.
 */
@Override
public long getFileLength(String key) throws IOException {
  LOG.debug("Get file length. COS key: {}", key);
  GetObjectMetadataRequest metadataRequest =
      new GetObjectMetadataRequest(bucketName, key);
  try {
    ObjectMetadata metadata =
        (ObjectMetadata) callCOSClientWithRetry(metadataRequest);
    return metadata.getContentLength();
  } catch (Exception e) {
    String errMsg = String.format(
        "Getting file length occurs an exception."
            + "COS key: %s, exception: %s",
        key, e.toString());
    LOG.error(errMsg);
    handleException(new Exception(errMsg), key);
  }
  // handleException always throws, so this is only here for the compiler.
  return 0;
}
/**
 * Dispatch a request object to the matching COS SDK call and retry it when
 * the service answers with a server error (HTTP 5xx).
 *
 * <p>Up to {@code maxRetryTimes} retries are made; before each retry the
 * thread sleeps for a random time between {@code retryIndex * 300} and
 * {@code retryIndex * 500} milliseconds. For an upload-part request backed
 * by a {@code ByteBufferInputStream}, the stream is marked before each
 * attempt and reset before a retry so the part is sent from its start.
 *
 * @param request The SDK request object; its runtime type selects the call.
 * @param <X>     The type of the request.
 * @return The SDK result of the call; a plain {@code Object} for deletes.
 * @throws CosServiceException The service failed with a non-5xx status.
 * @throws IOException         Retries were exhausted, the thread was
 *                             interrupted while waiting, the request type
 *                             is unknown, or any other failure occurred.
 */
private <X> Object callCOSClientWithRetry(X request)
    throws CosServiceException, IOException {
  String sdkMethod = "";
  int retryIndex = 1;
  while (true) {
    try {
      if (request instanceof PutObjectRequest) {
        sdkMethod = "putObject";
        return this.cosClient.putObject((PutObjectRequest) request);
      } else if (request instanceof UploadPartRequest) {
        sdkMethod = "uploadPart";
        // Remember the start of the part so a retry can rewind to it.
        if (((UploadPartRequest) request).getInputStream()
            instanceof ByteBufferInputStream) {
          ((UploadPartRequest) request).getInputStream()
              .mark((int) ((UploadPartRequest) request).getPartSize());
        }
        return this.cosClient.uploadPart((UploadPartRequest) request);
      } else if (request instanceof GetObjectMetadataRequest) {
        sdkMethod = "queryObjectMeta";
        return this.cosClient.getObjectMetadata(
            (GetObjectMetadataRequest) request);
      } else if (request instanceof DeleteObjectRequest) {
        sdkMethod = "deleteObject";
        this.cosClient.deleteObject((DeleteObjectRequest) request);
        return new Object();
      } else if (request instanceof CopyObjectRequest) {
        sdkMethod = "copyFile";
        return this.cosClient.copyObject((CopyObjectRequest) request);
      } else if (request instanceof GetObjectRequest) {
        sdkMethod = "getObject";
        return this.cosClient.getObject((GetObjectRequest) request);
      } else if (request instanceof ListObjectsRequest) {
        sdkMethod = "listObjects";
        return this.cosClient.listObjects((ListObjectsRequest) request);
      } else {
        throw new IOException("no such method");
      }
    } catch (CosServiceException cse) {
      String errMsg = String.format("Call cos sdk failed, "
              + "retryIndex: [%d / %d], "
              + "call method: %s, exception: %s",
          retryIndex, this.maxRetryTimes, sdkMethod, cse.toString());
      int statusCode = cse.getStatusCode();
      // Retry all server errors
      if (statusCode / 100 == 5) {
        if (retryIndex <= this.maxRetryTimes) {
          LOG.info(errMsg);
          long sleepLeast = retryIndex * 300L;
          long sleepBound = retryIndex * 500L;
          try {
            if (request instanceof UploadPartRequest) {
              if (((UploadPartRequest) request).getInputStream()
                  instanceof ByteBufferInputStream) {
                ((UploadPartRequest) request).getInputStream().reset();
              }
            }
            Thread.sleep(
                ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound));
            ++retryIndex;
          } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack
            // can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new IOException(e.toString(), e);
          }
        } else {
          LOG.error(errMsg);
          throw new IOException(errMsg, cse);
        }
      } else {
        throw cse;
      }
    } catch (Exception e) {
      String errMsg = String.format("Call cos sdk failed, "
          + "call method: %s, exception: %s", sdkMethod, e.toString());
      LOG.error(errMsg);
      throw new IOException(errMsg, e);
    }
  }
}
/**
 * Shut down the COS client, if one has been created.
 */
@Override
public void close() {
  if (this.cosClient == null) {
    return;
  }
  this.cosClient.shutdown();
}
}