| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs.cosn; |
| |
| import java.io.BufferedInputStream; |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.ThreadLocalRandom; |
| |
| import com.qcloud.cos.COSClient; |
| import com.qcloud.cos.ClientConfig; |
| import com.qcloud.cos.auth.BasicCOSCredentials; |
| import com.qcloud.cos.auth.COSCredentials; |
| import com.qcloud.cos.endpoint.SuffixEndpointBuilder; |
| import com.qcloud.cos.exception.CosClientException; |
| import com.qcloud.cos.exception.CosServiceException; |
| import com.qcloud.cos.http.HttpProtocol; |
| import com.qcloud.cos.model.AbortMultipartUploadRequest; |
| import com.qcloud.cos.model.COSObject; |
| import com.qcloud.cos.model.COSObjectSummary; |
| import com.qcloud.cos.model.CompleteMultipartUploadRequest; |
| import com.qcloud.cos.model.CompleteMultipartUploadResult; |
| import com.qcloud.cos.model.CopyObjectRequest; |
| import com.qcloud.cos.model.DeleteObjectRequest; |
| import com.qcloud.cos.model.GetObjectMetadataRequest; |
| import com.qcloud.cos.model.GetObjectRequest; |
| import com.qcloud.cos.model.InitiateMultipartUploadRequest; |
| import com.qcloud.cos.model.InitiateMultipartUploadResult; |
| import com.qcloud.cos.model.ListObjectsRequest; |
| import com.qcloud.cos.model.ObjectListing; |
| import com.qcloud.cos.model.ObjectMetadata; |
| import com.qcloud.cos.model.PartETag; |
| import com.qcloud.cos.model.PutObjectRequest; |
| import com.qcloud.cos.model.PutObjectResult; |
| import com.qcloud.cos.model.UploadPartRequest; |
| import com.qcloud.cos.model.UploadPartResult; |
| import com.qcloud.cos.region.Region; |
| import com.qcloud.cos.utils.Base64; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.cosn.auth.COSCredentialsProviderList; |
| import org.apache.hadoop.util.VersionInfo; |
| import org.apache.http.HttpStatus; |
| |
| /** |
| * The class actually performs access operation to the COS blob store. |
| * It provides the bridging logic for the Hadoop's abstract filesystem and COS. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| class CosNativeFileSystemStore implements NativeFileSystemStore { |
| private COSClient cosClient; |
| private String bucketName; |
| private int maxRetryTimes; |
| |
| public static final Logger LOG = |
| LoggerFactory.getLogger(CosNativeFileSystemStore.class); |
| |
| /** |
| * Initialize the client to access COS blob storage. |
| * |
| * @param conf Hadoop configuration with COS configuration options. |
| * @throws IOException Initialize the COS client failed, |
| * caused by incorrect options. |
| */ |
| private void initCOSClient(URI uri, Configuration conf) throws IOException { |
| COSCredentialsProviderList credentialProviderList = |
| CosNUtils.createCosCredentialsProviderSet(uri, conf); |
| String region = conf.get(CosNConfigKeys.COSN_REGION_KEY); |
| String endpointSuffix = conf.get( |
| CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY); |
| if (null == region && null == endpointSuffix) { |
| String exceptionMsg = String.format("config %s and %s at least one", |
| CosNConfigKeys.COSN_REGION_KEY, |
| CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY); |
| throw new IOException(exceptionMsg); |
| } |
| |
| COSCredentials cosCred; |
| cosCred = new BasicCOSCredentials( |
| credentialProviderList.getCredentials().getCOSAccessKeyId(), |
| credentialProviderList.getCredentials().getCOSSecretKey()); |
| |
| boolean useHttps = conf.getBoolean(CosNConfigKeys.COSN_USE_HTTPS_KEY, |
| CosNConfigKeys.DEFAULT_USE_HTTPS); |
| |
| ClientConfig config; |
| if (null == region) { |
| config = new ClientConfig(new Region("")); |
| config.setEndpointBuilder(new SuffixEndpointBuilder(endpointSuffix)); |
| } else { |
| config = new ClientConfig(new Region(region)); |
| } |
| if (useHttps) { |
| config.setHttpProtocol(HttpProtocol.https); |
| } |
| |
| config.setUserAgent(conf.get(CosNConfigKeys.USER_AGENT, |
| CosNConfigKeys.DEFAULT_USER_AGENT) + " For " + " Hadoop " |
| + VersionInfo.getVersion()); |
| |
| this.maxRetryTimes = conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY, |
| CosNConfigKeys.DEFAULT_MAX_RETRIES); |
| |
| config.setMaxConnectionsCount( |
| conf.getInt(CosNConfigKeys.MAX_CONNECTION_NUM, |
| CosNConfigKeys.DEFAULT_MAX_CONNECTION_NUM)); |
| |
| this.cosClient = new COSClient(cosCred, config); |
| } |
| |
| /** |
| * Initialize the CosNativeFileSystemStore object, including |
| * its COS client and default COS bucket. |
| * |
| * @param uri The URI of the COS bucket accessed by default. |
| * @param conf Hadoop configuration with COS configuration options. |
| * @throws IOException Initialize the COS client failed. |
| */ |
| @Override |
| public void initialize(URI uri, Configuration conf) throws IOException { |
| try { |
| initCOSClient(uri, conf); |
| this.bucketName = uri.getHost(); |
| } catch (Exception e) { |
| handleException(e, ""); |
| } |
| } |
| |
| /** |
| * Store a file into COS from the specified input stream, which would be |
| * retried until the success or maximum number. |
| * |
| * @param key COS object key. |
| * @param inputStream Input stream to be uploaded into COS. |
| * @param md5Hash MD5 value of the content to be uploaded. |
| * @param length Length of uploaded content. |
| * @throws IOException Upload the file failed. |
| */ |
| private void storeFileWithRetry(String key, InputStream inputStream, |
| byte[] md5Hash, long length) throws IOException { |
| try { |
| ObjectMetadata objectMetadata = new ObjectMetadata(); |
| objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash)); |
| objectMetadata.setContentLength(length); |
| PutObjectRequest putObjectRequest = |
| new PutObjectRequest(bucketName, key, inputStream, objectMetadata); |
| |
| PutObjectResult putObjectResult = |
| (PutObjectResult) callCOSClientWithRetry(putObjectRequest); |
| LOG.debug("Store file successfully. COS key: [{}], ETag: [{}].", |
| key, putObjectResult.getETag()); |
| } catch (Exception e) { |
| String errMsg = String.format("Store file failed. COS key: [%s], " |
| + "exception: [%s]", key, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| } |
| } |
| |
| /** |
| * Store a local file into COS. |
| * |
| * @param key COS object key. |
| * @param file The local file to be uploaded. |
| * @param md5Hash The MD5 value of the file to be uploaded. |
| * @throws IOException Upload the file failed. |
| */ |
| @Override |
| public void storeFile(String key, File file, byte[] md5Hash) |
| throws IOException { |
| LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " + |
| "[{}]", file.getCanonicalPath(), file.length(), key); |
| storeFileWithRetry(key, new BufferedInputStream(new FileInputStream(file)), |
| md5Hash, file.length()); |
| } |
| |
| /** |
| * Store a file into COS from the specified input stream. |
| * |
| * @param key COS object key. |
| * @param inputStream The Input stream to be uploaded. |
| * @param md5Hash The MD5 value of the content to be uploaded. |
| * @param contentLength Length of uploaded content. |
| * @throws IOException Upload the file failed. |
| */ |
| @Override |
| public void storeFile( |
| String key, |
| InputStream inputStream, |
| byte[] md5Hash, |
| long contentLength) throws IOException { |
| LOG.info("Store file from input stream. COS key: [{}], " |
| + "length: [{}].", key, contentLength); |
| storeFileWithRetry(key, inputStream, md5Hash, contentLength); |
| } |
| |
| // For cos, storeEmptyFile means creating a directory |
| @Override |
| public void storeEmptyFile(String key) throws IOException { |
| if (!key.endsWith(CosNFileSystem.PATH_DELIMITER)) { |
| key = key + CosNFileSystem.PATH_DELIMITER; |
| } |
| |
| ObjectMetadata objectMetadata = new ObjectMetadata(); |
| objectMetadata.setContentLength(0); |
| InputStream input = new ByteArrayInputStream(new byte[0]); |
| PutObjectRequest putObjectRequest = |
| new PutObjectRequest(bucketName, key, input, objectMetadata); |
| try { |
| PutObjectResult putObjectResult = |
| (PutObjectResult) callCOSClientWithRetry(putObjectRequest); |
| LOG.debug("Store empty file successfully. COS key: [{}], ETag: [{}].", |
| key, putObjectResult.getETag()); |
| } catch (Exception e) { |
| String errMsg = String.format("Store empty file failed. " |
| + "COS key: [%s], exception: [%s]", key, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| } |
| } |
| |
| public PartETag uploadPart(File file, String key, String uploadId, |
| int partNum) throws IOException { |
| InputStream inputStream = new FileInputStream(file); |
| try { |
| return uploadPart(inputStream, key, uploadId, partNum, file.length()); |
| } finally { |
| inputStream.close(); |
| } |
| } |
| |
| @Override |
| public PartETag uploadPart(InputStream inputStream, String key, |
| String uploadId, int partNum, long partSize) throws IOException { |
| UploadPartRequest uploadPartRequest = new UploadPartRequest(); |
| uploadPartRequest.setBucketName(this.bucketName); |
| uploadPartRequest.setUploadId(uploadId); |
| uploadPartRequest.setInputStream(inputStream); |
| uploadPartRequest.setPartNumber(partNum); |
| uploadPartRequest.setPartSize(partSize); |
| uploadPartRequest.setKey(key); |
| |
| try { |
| UploadPartResult uploadPartResult = |
| (UploadPartResult) callCOSClientWithRetry(uploadPartRequest); |
| return uploadPartResult.getPartETag(); |
| } catch (Exception e) { |
| String errMsg = String.format("Current thread: [%d], COS key: [%s], " |
| + "upload id: [%s], part num: [%d], exception: [%s]", |
| Thread.currentThread().getId(), key, uploadId, partNum, e.toString()); |
| handleException(new Exception(errMsg), key); |
| } |
| |
| return null; |
| } |
| |
| public void abortMultipartUpload(String key, String uploadId) { |
| LOG.info("Abort the multipart upload. COS key: [{}], upload id: [{}].", |
| key, uploadId); |
| AbortMultipartUploadRequest abortMultipartUploadRequest = |
| new AbortMultipartUploadRequest(bucketName, key, uploadId); |
| cosClient.abortMultipartUpload(abortMultipartUploadRequest); |
| } |
| |
| /** |
| * Initialize a multipart upload and return the upload id. |
| * |
| * @param key The COS object key initialized to multipart upload. |
| * @return The multipart upload id. |
| */ |
| public String getUploadId(String key) { |
| if (null == key || key.length() == 0) { |
| return ""; |
| } |
| |
| LOG.info("Initiate a multipart upload. bucket: [{}], COS key: [{}].", |
| bucketName, key); |
| InitiateMultipartUploadRequest initiateMultipartUploadRequest = |
| new InitiateMultipartUploadRequest(bucketName, key); |
| InitiateMultipartUploadResult initiateMultipartUploadResult = |
| cosClient.initiateMultipartUpload(initiateMultipartUploadRequest); |
| return initiateMultipartUploadResult.getUploadId(); |
| } |
| |
| /** |
| * Finish a multipart upload process, which will merge all parts uploaded. |
| * |
| * @param key The COS object key to be finished. |
| * @param uploadId The upload id of the multipart upload to be finished. |
| * @param partETagList The etag list of the part that has been uploaded. |
| * @return The result object of completing the multipart upload process. |
| */ |
| public CompleteMultipartUploadResult completeMultipartUpload( |
| String key, String uploadId, List<PartETag> partETagList) { |
| Collections.sort(partETagList, new Comparator<PartETag>() { |
| @Override |
| public int compare(PartETag o1, PartETag o2) { |
| return o1.getPartNumber() - o2.getPartNumber(); |
| } |
| }); |
| LOG.info("Complete the multipart upload. bucket: [{}], COS key: [{}], " |
| + "upload id: [{}].", bucketName, key, uploadId); |
| CompleteMultipartUploadRequest completeMultipartUploadRequest = |
| new CompleteMultipartUploadRequest( |
| bucketName, key, uploadId, partETagList); |
| return cosClient.completeMultipartUpload(completeMultipartUploadRequest); |
| } |
| |
| private FileMetadata queryObjectMetadata(String key) throws IOException { |
| GetObjectMetadataRequest getObjectMetadataRequest = |
| new GetObjectMetadataRequest(bucketName, key); |
| try { |
| ObjectMetadata objectMetadata = |
| (ObjectMetadata) callCOSClientWithRetry(getObjectMetadataRequest); |
| long mtime = 0; |
| if (objectMetadata.getLastModified() != null) { |
| mtime = objectMetadata.getLastModified().getTime(); |
| } |
| long fileSize = objectMetadata.getContentLength(); |
| FileMetadata fileMetadata = new FileMetadata(key, fileSize, mtime, |
| !key.endsWith(CosNFileSystem.PATH_DELIMITER)); |
| LOG.debug("Retrieve file metadata. COS key: [{}], ETag: [{}], " |
| + "length: [{}].", key, objectMetadata.getETag(), |
| objectMetadata.getContentLength()); |
| return fileMetadata; |
| } catch (CosServiceException e) { |
| if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) { |
| String errorMsg = String.format("Retrieve file metadata file failed. " |
| + "COS key: [%s], CosServiceException: [%s].", key, e.toString()); |
| LOG.error(errorMsg); |
| handleException(new Exception(errorMsg), key); |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public FileMetadata retrieveMetadata(String key) throws IOException { |
| if (key.endsWith(CosNFileSystem.PATH_DELIMITER)) { |
| key = key.substring(0, key.length() - 1); |
| } |
| |
| if (!key.isEmpty()) { |
| FileMetadata fileMetadata = queryObjectMetadata(key); |
| if (fileMetadata != null) { |
| return fileMetadata; |
| } |
| } |
| |
| // If the key is a directory. |
| key = key + CosNFileSystem.PATH_DELIMITER; |
| return queryObjectMetadata(key); |
| } |
| |
| /** |
| * Download a COS object and return the input stream associated with it. |
| * |
| * @param key The object key that is being retrieved from the COS bucket |
| * @return This method returns null if the key is not found |
| * @throws IOException if failed to download. |
| */ |
| @Override |
| public InputStream retrieve(String key) throws IOException { |
| LOG.debug("Retrieve object key: [{}].", key); |
| GetObjectRequest getObjectRequest = |
| new GetObjectRequest(this.bucketName, key); |
| try { |
| COSObject cosObject = |
| (COSObject) callCOSClientWithRetry(getObjectRequest); |
| return cosObject.getObjectContent(); |
| } catch (Exception e) { |
| String errMsg = String.format("Retrieving key: [%s] occurs " |
| + "an exception: [%s].", key, e.toString()); |
| LOG.error("Retrieving COS key: [{}] occurs an exception: [{}].", key, e); |
| handleException(new Exception(errMsg), key); |
| } |
| // never will get here |
| return null; |
| } |
| |
| /** |
| * Retrieved a part of a COS object, which is specified the start position. |
| * |
| * @param key The object key that is being retrieved from |
| * the COS bucket. |
| * @param byteRangeStart The start position of the part to be retrieved in |
| * the object. |
| * @return The input stream associated with the retrieved object. |
| * @throws IOException if failed to retrieve. |
| */ |
| @Override |
| public InputStream retrieve(String key, long byteRangeStart) |
| throws IOException { |
| try { |
| LOG.debug("Retrieve COS key:[{}]. range start:[{}].", |
| key, byteRangeStart); |
| long fileSize = getFileLength(key); |
| long byteRangeEnd = fileSize - 1; |
| GetObjectRequest getObjectRequest = |
| new GetObjectRequest(this.bucketName, key); |
| if (byteRangeEnd >= byteRangeStart) { |
| getObjectRequest.setRange(byteRangeStart, fileSize - 1); |
| } |
| COSObject cosObject = |
| (COSObject) callCOSClientWithRetry(getObjectRequest); |
| return cosObject.getObjectContent(); |
| } catch (Exception e) { |
| String errMsg = |
| String.format("Retrieving COS key: [%s] occurs an exception. " + |
| "byte range start: [%s], exception: [%s].", |
| key, byteRangeStart, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| } |
| |
| // never will get here |
| return null; |
| } |
| |
| /** |
| * Download a part of a COS object, which is specified the start and |
| * end position. |
| * |
| * @param key The object key that is being downloaded |
| * @param byteRangeStart The start position of the part to be retrieved in |
| * the object. |
| * @param byteRangeEnd The end position of the part to be retrieved in |
| * the object. |
| * @return The input stream associated with the retrieved objects. |
| * @throws IOException If failed to retrieve. |
| */ |
| @Override |
| public InputStream retrieveBlock(String key, long byteRangeStart, |
| long byteRangeEnd) throws IOException { |
| try { |
| GetObjectRequest request = new GetObjectRequest(this.bucketName, key); |
| request.setRange(byteRangeStart, byteRangeEnd); |
| COSObject cosObject = (COSObject) this.callCOSClientWithRetry(request); |
| return cosObject.getObjectContent(); |
| } catch (CosServiceException e) { |
| String errMsg = |
| String.format("Retrieving key [%s] with byteRangeStart [%d] occurs " + |
| "an CosServiceException: [%s].", |
| key, byteRangeStart, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| return null; |
| } catch (CosClientException e) { |
| String errMsg = |
| String.format("Retrieving key [%s] with byteRangeStart [%d] " |
| + "occurs an exception: [%s].", |
| key, byteRangeStart, e.toString()); |
| LOG.error("Retrieving COS key: [{}] with byteRangeStart: [{}] " + |
| "occurs an exception: [{}].", key, byteRangeStart, e); |
| handleException(new Exception(errMsg), key); |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public PartialListing list(String prefix, int maxListingLength) |
| throws IOException { |
| return list(prefix, maxListingLength, null, false); |
| } |
| |
| @Override |
| public PartialListing list(String prefix, int maxListingLength, |
| String priorLastKey, boolean recurse) throws IOException { |
| return list(prefix, recurse ? null : CosNFileSystem.PATH_DELIMITER, |
| maxListingLength, priorLastKey); |
| } |
| |
| /** |
| * List the metadata for all objects that |
| * the object key has the specified prefix. |
| * |
| * @param prefix The prefix to be listed. |
| * @param delimiter The delimiter is a sign, the same paths between |
| * are listed. |
| * @param maxListingLength The maximum number of listed entries. |
| * @param priorLastKey The last key in any previous search. |
| * @return A metadata list on the match. |
| * @throws IOException If list objects failed. |
| */ |
| private PartialListing list(String prefix, String delimiter, |
| int maxListingLength, String priorLastKey) throws IOException { |
| LOG.debug("List objects. prefix: [{}], delimiter: [{}], " + |
| "maxListLength: [{}], priorLastKey: [{}].", |
| prefix, delimiter, maxListingLength, priorLastKey); |
| |
| if (!prefix.startsWith(CosNFileSystem.PATH_DELIMITER)) { |
| prefix += CosNFileSystem.PATH_DELIMITER; |
| } |
| ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); |
| listObjectsRequest.setBucketName(bucketName); |
| listObjectsRequest.setPrefix(prefix); |
| listObjectsRequest.setDelimiter(delimiter); |
| listObjectsRequest.setMarker(priorLastKey); |
| listObjectsRequest.setMaxKeys(maxListingLength); |
| ObjectListing objectListing = null; |
| try { |
| objectListing = |
| (ObjectListing) callCOSClientWithRetry(listObjectsRequest); |
| } catch (Exception e) { |
| String errMsg = String.format("prefix: [%s], delimiter: [%s], " |
| + "maxListingLength: [%d], priorLastKey: [%s]. " |
| + "List objects occur an exception: [%s].", prefix, |
| (delimiter == null) ? "" : delimiter, maxListingLength, priorLastKey, |
| e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), prefix); |
| } |
| ArrayList<FileMetadata> fileMetadataArray = new ArrayList<>(); |
| ArrayList<FileMetadata> commonPrefixArray = new ArrayList<>(); |
| |
| if (null == objectListing) { |
| String errMsg = String.format("List the prefix: [%s] failed. " + |
| "delimiter: [%s], max listing length:" + |
| " [%s], prior last key: [%s]", |
| prefix, delimiter, maxListingLength, priorLastKey); |
| handleException(new Exception(errMsg), prefix); |
| } |
| |
| List<COSObjectSummary> summaries = objectListing.getObjectSummaries(); |
| for (COSObjectSummary cosObjectSummary : summaries) { |
| String filePath = cosObjectSummary.getKey(); |
| if (!filePath.startsWith(CosNFileSystem.PATH_DELIMITER)) { |
| filePath = CosNFileSystem.PATH_DELIMITER + filePath; |
| } |
| if (filePath.equals(prefix)) { |
| continue; |
| } |
| long mtime = 0; |
| if (cosObjectSummary.getLastModified() != null) { |
| mtime = cosObjectSummary.getLastModified().getTime(); |
| } |
| long fileLen = cosObjectSummary.getSize(); |
| fileMetadataArray.add( |
| new FileMetadata(filePath, fileLen, mtime, true)); |
| } |
| List<String> commonPrefixes = objectListing.getCommonPrefixes(); |
| for (String commonPrefix : commonPrefixes) { |
| if (!commonPrefix.startsWith(CosNFileSystem.PATH_DELIMITER)) { |
| commonPrefix = CosNFileSystem.PATH_DELIMITER + commonPrefix; |
| } |
| commonPrefixArray.add( |
| new FileMetadata(commonPrefix, 0, 0, false)); |
| } |
| |
| FileMetadata[] fileMetadata = new FileMetadata[fileMetadataArray.size()]; |
| for (int i = 0; i < fileMetadataArray.size(); ++i) { |
| fileMetadata[i] = fileMetadataArray.get(i); |
| } |
| FileMetadata[] commonPrefixMetaData = |
| new FileMetadata[commonPrefixArray.size()]; |
| for (int i = 0; i < commonPrefixArray.size(); ++i) { |
| commonPrefixMetaData[i] = commonPrefixArray.get(i); |
| } |
| // when truncated is false, it means that listing is finished. |
| if (!objectListing.isTruncated()) { |
| return new PartialListing( |
| null, fileMetadata, commonPrefixMetaData); |
| } else { |
| return new PartialListing( |
| objectListing.getNextMarker(), fileMetadata, commonPrefixMetaData); |
| } |
| } |
| |
| @Override |
| public void delete(String key) throws IOException { |
| LOG.debug("Delete object key: [{}] from bucket: {}.", key, this.bucketName); |
| try { |
| DeleteObjectRequest deleteObjectRequest = |
| new DeleteObjectRequest(bucketName, key); |
| callCOSClientWithRetry(deleteObjectRequest); |
| } catch (Exception e) { |
| String errMsg = |
| String.format("Delete key: [%s] occurs an exception: [%s].", |
| key, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| } |
| } |
| |
| public void rename(String srcKey, String dstKey) throws IOException { |
| LOG.debug("Rename source key: [{}] to dest key: [{}].", srcKey, dstKey); |
| try { |
| CopyObjectRequest copyObjectRequest = |
| new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); |
| callCOSClientWithRetry(copyObjectRequest); |
| DeleteObjectRequest deleteObjectRequest = |
| new DeleteObjectRequest(bucketName, srcKey); |
| callCOSClientWithRetry(deleteObjectRequest); |
| } catch (Exception e) { |
| String errMsg = String.format("Rename object unsuccessfully. " |
| + "source cos key: [%s], dest COS " + |
| "key: [%s], exception: [%s]", |
| srcKey, |
| dstKey, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), srcKey); |
| } |
| } |
| |
| @Override |
| public void copy(String srcKey, String dstKey) throws IOException { |
| LOG.debug("Copy source key: [{}] to dest key: [{}].", srcKey, dstKey); |
| try { |
| CopyObjectRequest copyObjectRequest = |
| new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); |
| callCOSClientWithRetry(copyObjectRequest); |
| } catch (Exception e) { |
| String errMsg = String.format("Copy object unsuccessfully. " |
| + "source COS key: %s, dest COS key: " + |
| "%s, exception: %s", |
| srcKey, |
| dstKey, e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), srcKey); |
| } |
| } |
| |
| @Override |
| public void purge(String prefix) throws IOException { |
| throw new IOException("purge not supported"); |
| } |
| |
| @Override |
| public void dump() throws IOException { |
| throw new IOException("dump not supported"); |
| } |
| |
| // process Exception and print detail |
| private void handleException(Exception e, String key) throws IOException { |
| String cosPath = CosNFileSystem.SCHEME + "://" + bucketName + key; |
| String exceptInfo = String.format("%s : %s", cosPath, e.toString()); |
| throw new IOException(exceptInfo); |
| } |
| |
| @Override |
| public long getFileLength(String key) throws IOException { |
| LOG.debug("Get file length. COS key: {}", key); |
| GetObjectMetadataRequest getObjectMetadataRequest = |
| new GetObjectMetadataRequest(bucketName, key); |
| try { |
| ObjectMetadata objectMetadata = |
| (ObjectMetadata) callCOSClientWithRetry(getObjectMetadataRequest); |
| return objectMetadata.getContentLength(); |
| } catch (Exception e) { |
| String errMsg = String.format("Getting file length occurs an exception." + |
| "COS key: %s, exception: %s", key, |
| e.toString()); |
| LOG.error(errMsg); |
| handleException(new Exception(errMsg), key); |
| return 0; // never will get here |
| } |
| } |
| |
| private <X> Object callCOSClientWithRetry(X request) |
| throws CosServiceException, IOException { |
| String sdkMethod = ""; |
| int retryIndex = 1; |
| while (true) { |
| try { |
| if (request instanceof PutObjectRequest) { |
| sdkMethod = "putObject"; |
| return this.cosClient.putObject((PutObjectRequest) request); |
| } else if (request instanceof UploadPartRequest) { |
| sdkMethod = "uploadPart"; |
| if (((UploadPartRequest) request).getInputStream() |
| instanceof ByteBufferInputStream) { |
| ((UploadPartRequest) request).getInputStream() |
| .mark((int) ((UploadPartRequest) request).getPartSize()); |
| } |
| return this.cosClient.uploadPart((UploadPartRequest) request); |
| } else if (request instanceof GetObjectMetadataRequest) { |
| sdkMethod = "queryObjectMeta"; |
| return this.cosClient.getObjectMetadata( |
| (GetObjectMetadataRequest) request); |
| } else if (request instanceof DeleteObjectRequest) { |
| sdkMethod = "deleteObject"; |
| this.cosClient.deleteObject((DeleteObjectRequest) request); |
| return new Object(); |
| } else if (request instanceof CopyObjectRequest) { |
| sdkMethod = "copyFile"; |
| return this.cosClient.copyObject((CopyObjectRequest) request); |
| } else if (request instanceof GetObjectRequest) { |
| sdkMethod = "getObject"; |
| return this.cosClient.getObject((GetObjectRequest) request); |
| } else if (request instanceof ListObjectsRequest) { |
| sdkMethod = "listObjects"; |
| return this.cosClient.listObjects((ListObjectsRequest) request); |
| } else { |
| throw new IOException("no such method"); |
| } |
| } catch (CosServiceException cse) { |
| String errMsg = String.format("Call cos sdk failed, " |
| + "retryIndex: [%d / %d], " |
| + "call method: %s, exception: %s", |
| retryIndex, this.maxRetryTimes, sdkMethod, cse.toString()); |
| int statusCode = cse.getStatusCode(); |
| // Retry all server errors |
| if (statusCode / 100 == 5) { |
| if (retryIndex <= this.maxRetryTimes) { |
| LOG.info(errMsg); |
| long sleepLeast = retryIndex * 300L; |
| long sleepBound = retryIndex * 500L; |
| try { |
| if (request instanceof UploadPartRequest) { |
| if (((UploadPartRequest) request).getInputStream() |
| instanceof ByteBufferInputStream) { |
| ((UploadPartRequest) request).getInputStream().reset(); |
| } |
| } |
| Thread.sleep( |
| ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound)); |
| ++retryIndex; |
| } catch (InterruptedException e) { |
| throw new IOException(e.toString()); |
| } |
| } else { |
| LOG.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| } else { |
| throw cse; |
| } |
| } catch (Exception e) { |
| String errMsg = String.format("Call cos sdk failed, " |
| + "call method: %s, exception: %s", sdkMethod, e.toString()); |
| LOG.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| } |
| } |
| |
| @Override |
| public void close() { |
| if (null != this.cosClient) { |
| this.cosClient.shutdown(); |
| } |
| } |
| } |