| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.aws.s3; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; |
| |
| import com.amazonaws.AmazonClientException; |
| import com.amazonaws.services.s3.AmazonS3; |
| import com.amazonaws.services.s3.AmazonS3ClientBuilder; |
| import com.amazonaws.services.s3.model.AmazonS3Exception; |
| import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; |
| import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; |
| import com.amazonaws.services.s3.model.CopyObjectRequest; |
| import com.amazonaws.services.s3.model.CopyObjectResult; |
| import com.amazonaws.services.s3.model.CopyPartRequest; |
| import com.amazonaws.services.s3.model.CopyPartResult; |
| import com.amazonaws.services.s3.model.DeleteObjectsRequest; |
| import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; |
| import com.amazonaws.services.s3.model.GetObjectMetadataRequest; |
| import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; |
| import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; |
| import com.amazonaws.services.s3.model.ListObjectsV2Request; |
| import com.amazonaws.services.s3.model.ListObjectsV2Result; |
| import com.amazonaws.services.s3.model.ObjectMetadata; |
| import com.amazonaws.services.s3.model.PartETag; |
| import com.amazonaws.services.s3.model.S3ObjectSummary; |
| import com.google.auto.value.AutoValue; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.channels.ReadableByteChannel; |
| import java.nio.channels.WritableByteChannel; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.io.FileSystem; |
| import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory; |
| import org.apache.beam.sdk.io.aws.options.S3Options; |
| import org.apache.beam.sdk.io.fs.CreateOptions; |
| import org.apache.beam.sdk.io.fs.MatchResult; |
| import org.apache.beam.sdk.util.InstanceBuilder; |
| import org.apache.beam.sdk.util.MoreFutures; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Suppliers; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| class S3FileSystem extends FileSystem<S3ResourceId> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class); |
| |
| // Amazon S3 API: You can create a copy of your object up to 5 GB in a single atomic operation |
| // Ref. https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectsExamples.html |
| private static final long MAX_COPY_OBJECT_SIZE_BYTES = 5_368_709_120L; |
| |
| // S3 API, delete-objects: "You may specify up to 1000 keys." |
| private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000; |
| |
| private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS = |
| ImmutableSet.of("gzip"); |
| |
| // Non-final for testing. |
| private Supplier<AmazonS3> amazonS3; |
| private final S3Options options; |
| private final ListeningExecutorService executorService; |
| |
| S3FileSystem(S3Options options) { |
| this.options = checkNotNull(options, "options"); |
| AmazonS3ClientBuilder builder = |
| InstanceBuilder.ofType(S3ClientBuilderFactory.class) |
| .fromClass(options.getS3ClientFactoryClass()) |
| .build() |
| .createBuilder(options); |
| // The Supplier is to make sure we don't call .build() unless we are actually using S3. |
| amazonS3 = Suppliers.memoize(builder::build); |
| |
| checkNotNull(options.getS3StorageClass(), "storageClass"); |
| checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize"); |
| executorService = |
| MoreExecutors.listeningDecorator( |
| Executors.newFixedThreadPool( |
| options.getS3ThreadPoolSize(), new ThreadFactoryBuilder().setDaemon(true).build())); |
| } |
| |
| @Override |
| protected String getScheme() { |
| return S3ResourceId.SCHEME; |
| } |
| |
| @VisibleForTesting |
| void setAmazonS3Client(AmazonS3 amazonS3) { |
| this.amazonS3 = Suppliers.ofInstance(amazonS3); |
| } |
| |
| @VisibleForTesting |
| AmazonS3 getAmazonS3Client() { |
| return this.amazonS3.get(); |
| } |
| |
| @Override |
| protected List<MatchResult> match(List<String> specs) throws IOException { |
| List<S3ResourceId> paths = |
| specs.stream().map(S3ResourceId::fromUri).collect(Collectors.toList()); |
| List<S3ResourceId> globs = new ArrayList<>(); |
| List<S3ResourceId> nonGlobs = new ArrayList<>(); |
| List<Boolean> isGlobBooleans = new ArrayList<>(); |
| |
| for (S3ResourceId path : paths) { |
| if (path.isWildcard()) { |
| globs.add(path); |
| isGlobBooleans.add(true); |
| } else { |
| nonGlobs.add(path); |
| isGlobBooleans.add(false); |
| } |
| } |
| |
| Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator(); |
| Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator(); |
| |
| ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder(); |
| for (Boolean isGlob : isGlobBooleans) { |
| if (isGlob) { |
| checkState(globMatches.hasNext(), "Expect globMatches has next."); |
| matchResults.add(globMatches.next()); |
| } else { |
| checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next."); |
| matchResults.add(nonGlobMatches.next()); |
| } |
| } |
| checkState(!globMatches.hasNext(), "Expect no more elements in globMatches."); |
| checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches."); |
| |
| return matchResults.build(); |
| } |
| |
| /** Gets {@link MatchResult} representing all objects that match wildcard-containing paths. */ |
| @VisibleForTesting |
| List<MatchResult> matchGlobPaths(Collection<S3ResourceId> globPaths) throws IOException { |
| List<Callable<ExpandedGlob>> expandTasks = new ArrayList<>(globPaths.size()); |
| for (final S3ResourceId path : globPaths) { |
| expandTasks.add(() -> expandGlob(path)); |
| } |
| |
| Map<S3ResourceId, ExpandedGlob> expandedGlobByGlobPath = new HashMap<>(); |
| List<Callable<PathWithEncoding>> contentTypeTasks = new ArrayList<>(globPaths.size()); |
| for (ExpandedGlob expandedGlob : callTasks(expandTasks)) { |
| expandedGlobByGlobPath.put(expandedGlob.getGlobPath(), expandedGlob); |
| if (expandedGlob.getExpandedPaths() != null) { |
| for (final S3ResourceId path : expandedGlob.getExpandedPaths()) { |
| contentTypeTasks.add(() -> getPathContentEncoding(path)); |
| } |
| } |
| } |
| |
| Map<S3ResourceId, PathWithEncoding> exceptionByPath = new HashMap<>(); |
| for (PathWithEncoding pathWithException : callTasks(contentTypeTasks)) { |
| exceptionByPath.put(pathWithException.getPath(), pathWithException); |
| } |
| |
| List<MatchResult> results = new ArrayList<>(globPaths.size()); |
| for (S3ResourceId globPath : globPaths) { |
| ExpandedGlob expandedGlob = expandedGlobByGlobPath.get(globPath); |
| |
| if (expandedGlob.getException() != null) { |
| results.add(MatchResult.create(MatchResult.Status.ERROR, expandedGlob.getException())); |
| |
| } else { |
| List<MatchResult.Metadata> metadatas = new ArrayList<>(); |
| IOException exception = null; |
| for (S3ResourceId expandedPath : expandedGlob.getExpandedPaths()) { |
| PathWithEncoding pathWithEncoding = exceptionByPath.get(expandedPath); |
| |
| if (pathWithEncoding.getException() != null) { |
| exception = pathWithEncoding.getException(); |
| break; |
| } else { |
| metadatas.add( |
| createBeamMetadata( |
| pathWithEncoding.getPath(), pathWithEncoding.getContentEncoding())); |
| } |
| } |
| |
| if (exception != null) { |
| if (exception instanceof FileNotFoundException) { |
| results.add(MatchResult.create(MatchResult.Status.NOT_FOUND, exception)); |
| } else { |
| results.add(MatchResult.create(MatchResult.Status.ERROR, exception)); |
| } |
| } else { |
| results.add(MatchResult.create(MatchResult.Status.OK, metadatas)); |
| } |
| } |
| } |
| |
| return ImmutableList.copyOf(results); |
| } |
| |
| @AutoValue |
| abstract static class ExpandedGlob { |
| |
| abstract S3ResourceId getGlobPath(); |
| |
| @Nullable |
| abstract List<S3ResourceId> getExpandedPaths(); |
| |
| @Nullable |
| abstract IOException getException(); |
| |
| static ExpandedGlob create(S3ResourceId globPath, List<S3ResourceId> expandedPaths) { |
| checkNotNull(globPath, "globPath"); |
| checkNotNull(expandedPaths, "expandedPaths"); |
| return new AutoValue_S3FileSystem_ExpandedGlob(globPath, expandedPaths, null); |
| } |
| |
| static ExpandedGlob create(S3ResourceId globPath, IOException exception) { |
| checkNotNull(globPath, "globPath"); |
| checkNotNull(exception, "exception"); |
| return new AutoValue_S3FileSystem_ExpandedGlob(globPath, null, exception); |
| } |
| } |
| |
| @AutoValue |
| abstract static class PathWithEncoding { |
| |
| abstract S3ResourceId getPath(); |
| |
| @Nullable |
| abstract String getContentEncoding(); |
| |
| @Nullable |
| abstract IOException getException(); |
| |
| static PathWithEncoding create(S3ResourceId path, String contentEncoding) { |
| checkNotNull(path, "path"); |
| checkNotNull(contentEncoding, "contentEncoding"); |
| return new AutoValue_S3FileSystem_PathWithEncoding(path, contentEncoding, null); |
| } |
| |
| static PathWithEncoding create(S3ResourceId path, IOException exception) { |
| checkNotNull(path, "path"); |
| checkNotNull(exception, "exception"); |
| return new AutoValue_S3FileSystem_PathWithEncoding(path, null, exception); |
| } |
| } |
| |
| private ExpandedGlob expandGlob(S3ResourceId glob) { |
| // The S3 API can list objects, filtered by prefix, but not by wildcard. |
| // Here, we find the longest prefix without wildcard "*", |
| // then filter the results with a regex. |
| checkArgument(glob.isWildcard(), "isWildcard"); |
| String keyPrefix = glob.getKeyNonWildcardPrefix(); |
| Pattern wildcardRegexp = Pattern.compile(wildcardToRegexp(glob.getKey())); |
| |
| LOG.debug( |
| "expanding bucket {}, prefix {}, against pattern {}", |
| glob.getBucket(), |
| keyPrefix, |
| wildcardRegexp.toString()); |
| |
| ImmutableList.Builder<S3ResourceId> expandedPaths = ImmutableList.builder(); |
| String continuationToken = null; |
| |
| do { |
| ListObjectsV2Request request = |
| new ListObjectsV2Request() |
| .withBucketName(glob.getBucket()) |
| .withPrefix(keyPrefix) |
| .withContinuationToken(continuationToken); |
| ListObjectsV2Result result; |
| try { |
| result = amazonS3.get().listObjectsV2(request); |
| } catch (AmazonClientException e) { |
| return ExpandedGlob.create(glob, new IOException(e)); |
| } |
| continuationToken = result.getNextContinuationToken(); |
| |
| for (S3ObjectSummary objectSummary : result.getObjectSummaries()) { |
| // Filter against regex. |
| if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) { |
| S3ResourceId expandedPath = |
| S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey()) |
| .withSize(objectSummary.getSize()) |
| .withLastModified(objectSummary.getLastModified()); |
| LOG.debug("Expanded S3 object path {}", expandedPath); |
| expandedPaths.add(expandedPath); |
| } |
| } |
| } while (continuationToken != null); |
| |
| return ExpandedGlob.create(glob, expandedPaths.build()); |
| } |
| |
| private PathWithEncoding getPathContentEncoding(S3ResourceId path) { |
| ObjectMetadata s3Metadata; |
| try { |
| s3Metadata = getObjectMetadata(path); |
| } catch (AmazonClientException e) { |
| if (e instanceof AmazonS3Exception && ((AmazonS3Exception) e).getStatusCode() == 404) { |
| return PathWithEncoding.create(path, new FileNotFoundException()); |
| } |
| return PathWithEncoding.create(path, new IOException(e)); |
| } |
| return PathWithEncoding.create(path, Strings.nullToEmpty(s3Metadata.getContentEncoding())); |
| } |
| |
| private List<MatchResult> matchNonGlobPaths(Collection<S3ResourceId> paths) throws IOException { |
| List<Callable<MatchResult>> tasks = new ArrayList<>(paths.size()); |
| for (final S3ResourceId path : paths) { |
| tasks.add(() -> matchNonGlobPath(path)); |
| } |
| |
| return callTasks(tasks); |
| } |
| |
| private ObjectMetadata getObjectMetadata(S3ResourceId s3ResourceId) throws AmazonClientException { |
| GetObjectMetadataRequest request = |
| new GetObjectMetadataRequest(s3ResourceId.getBucket(), s3ResourceId.getKey()); |
| request.setSSECustomerKey(options.getSSECustomerKey()); |
| return amazonS3.get().getObjectMetadata(request); |
| } |
| |
| @VisibleForTesting |
| MatchResult matchNonGlobPath(S3ResourceId path) { |
| ObjectMetadata s3Metadata; |
| try { |
| s3Metadata = getObjectMetadata(path); |
| } catch (AmazonClientException e) { |
| if (e instanceof AmazonS3Exception && ((AmazonS3Exception) e).getStatusCode() == 404) { |
| return MatchResult.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException()); |
| } |
| return MatchResult.create(MatchResult.Status.ERROR, new IOException(e)); |
| } |
| |
| return MatchResult.create( |
| MatchResult.Status.OK, |
| ImmutableList.of( |
| createBeamMetadata( |
| path.withSize(s3Metadata.getContentLength()) |
| .withLastModified(s3Metadata.getLastModified()), |
| Strings.nullToEmpty(s3Metadata.getContentEncoding())))); |
| } |
| |
| private static MatchResult.Metadata createBeamMetadata( |
| S3ResourceId path, String contentEncoding) { |
| checkArgument(path.getSize().isPresent(), "path has size"); |
| checkNotNull(contentEncoding, "contentEncoding"); |
| boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding); |
| |
| return MatchResult.Metadata.builder() |
| .setIsReadSeekEfficient(isReadSeekEfficient) |
| .setResourceId(path) |
| .setSizeBytes(path.getSize().get()) |
| .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L)) |
| .build(); |
| } |
| |
| /** |
| * Expands glob expressions to regular expressions. |
| * |
| * @param globExp the glob expression to expand |
| * @return a string with the regular expression this glob expands to |
| */ |
| @VisibleForTesting |
| static String wildcardToRegexp(String globExp) { |
| StringBuilder dst = new StringBuilder(); |
| char[] src = globExp.replace("**/*", "**").toCharArray(); |
| int i = 0; |
| while (i < src.length) { |
| char c = src[i++]; |
| switch (c) { |
| case '*': |
| // One char lookahead for ** |
| if (i < src.length && src[i] == '*') { |
| dst.append(".*"); |
| ++i; |
| } else { |
| dst.append("[^/]*"); |
| } |
| break; |
| case '?': |
| dst.append("[^/]"); |
| break; |
| case '.': |
| case '+': |
| case '{': |
| case '}': |
| case '(': |
| case ')': |
| case '|': |
| case '^': |
| case '$': |
| // These need to be escaped in regular expressions |
| dst.append('\\').append(c); |
| break; |
| case '\\': |
| i = doubleSlashes(dst, src, i); |
| break; |
| default: |
| dst.append(c); |
| break; |
| } |
| } |
| return dst.toString(); |
| } |
| |
| private static int doubleSlashes(StringBuilder dst, char[] src, int i) { |
| // Emit the next character without special interpretation |
| dst.append("\\\\"); |
| if ((i - 1) != src.length) { |
| dst.append(src[i]); |
| i++; |
| } else { |
| // A backslash at the very end is treated like an escaped backslash |
| dst.append('\\'); |
| } |
| return i; |
| } |
| |
| @Override |
| protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions createOptions) |
| throws IOException { |
| return new S3WritableByteChannel(amazonS3.get(), resourceId, createOptions.mimeType(), options); |
| } |
| |
| @Override |
| protected ReadableByteChannel open(S3ResourceId resourceId) throws IOException { |
| return new S3ReadableSeekableByteChannel(amazonS3.get(), resourceId, options); |
| } |
| |
| @Override |
| protected void copy(List<S3ResourceId> sourcePaths, List<S3ResourceId> destinationPaths) |
| throws IOException { |
| checkArgument( |
| sourcePaths.size() == destinationPaths.size(), |
| "sizes of sourcePaths and destinationPaths do not match"); |
| |
| List<Callable<Void>> tasks = new ArrayList<>(sourcePaths.size()); |
| |
| Iterator<S3ResourceId> sourcePathsIterator = sourcePaths.iterator(); |
| Iterator<S3ResourceId> destinationPathsIterator = destinationPaths.iterator(); |
| while (sourcePathsIterator.hasNext()) { |
| final S3ResourceId sourcePath = sourcePathsIterator.next(); |
| final S3ResourceId destinationPath = destinationPathsIterator.next(); |
| |
| tasks.add( |
| () -> { |
| copy(sourcePath, destinationPath); |
| return null; |
| }); |
| } |
| |
| callTasks(tasks); |
| } |
| |
| @VisibleForTesting |
| void copy(S3ResourceId sourcePath, S3ResourceId destinationPath) throws IOException { |
| try { |
| ObjectMetadata sourceObjectMetadata = getObjectMetadata(sourcePath); |
| if (sourceObjectMetadata.getContentLength() < MAX_COPY_OBJECT_SIZE_BYTES) { |
| atomicCopy(sourcePath, destinationPath, sourceObjectMetadata); |
| } else { |
| multipartCopy(sourcePath, destinationPath, sourceObjectMetadata); |
| } |
| } catch (AmazonClientException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @VisibleForTesting |
| CopyObjectResult atomicCopy( |
| S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata sourceObjectMetadata) |
| throws AmazonClientException { |
| CopyObjectRequest copyObjectRequest = |
| new CopyObjectRequest( |
| sourcePath.getBucket(), |
| sourcePath.getKey(), |
| destinationPath.getBucket(), |
| destinationPath.getKey()); |
| copyObjectRequest.setNewObjectMetadata(sourceObjectMetadata); |
| copyObjectRequest.setStorageClass(options.getS3StorageClass()); |
| copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey()); |
| copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey()); |
| return amazonS3.get().copyObject(copyObjectRequest); |
| } |
| |
| @VisibleForTesting |
| CompleteMultipartUploadResult multipartCopy( |
| S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata sourceObjectMetadata) |
| throws AmazonClientException { |
| InitiateMultipartUploadRequest initiateUploadRequest = |
| new InitiateMultipartUploadRequest(destinationPath.getBucket(), destinationPath.getKey()) |
| .withStorageClass(options.getS3StorageClass()) |
| .withObjectMetadata(sourceObjectMetadata); |
| initiateUploadRequest.setSSECustomerKey(options.getSSECustomerKey()); |
| |
| InitiateMultipartUploadResult initiateUploadResult = |
| amazonS3.get().initiateMultipartUpload(initiateUploadRequest); |
| final String uploadId = initiateUploadResult.getUploadId(); |
| |
| List<PartETag> eTags = new ArrayList<>(); |
| |
| final long objectSize = sourceObjectMetadata.getContentLength(); |
| // extra validation in case a caller calls directly S3FileSystem.multipartCopy |
| // without using S3FileSystem.copy in the future |
| if (objectSize == 0) { |
| final CopyPartRequest copyPartRequest = |
| new CopyPartRequest() |
| .withSourceBucketName(sourcePath.getBucket()) |
| .withSourceKey(sourcePath.getKey()) |
| .withDestinationBucketName(destinationPath.getBucket()) |
| .withDestinationKey(destinationPath.getKey()) |
| .withUploadId(uploadId) |
| .withPartNumber(1); |
| copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey()); |
| copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey()); |
| |
| CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest); |
| eTags.add(copyPartResult.getPartETag()); |
| } else { |
| long bytePosition = 0; |
| Integer uploadBufferSizeBytes = options.getS3UploadBufferSizeBytes(); |
| // Amazon parts are 1-indexed, not zero-indexed. |
| for (int partNumber = 1; bytePosition < objectSize; partNumber++) { |
| final CopyPartRequest copyPartRequest = |
| new CopyPartRequest() |
| .withSourceBucketName(sourcePath.getBucket()) |
| .withSourceKey(sourcePath.getKey()) |
| .withDestinationBucketName(destinationPath.getBucket()) |
| .withDestinationKey(destinationPath.getKey()) |
| .withUploadId(uploadId) |
| .withPartNumber(partNumber) |
| .withFirstByte(bytePosition) |
| .withLastByte(Math.min(objectSize - 1, bytePosition + uploadBufferSizeBytes - 1)); |
| copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey()); |
| copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey()); |
| |
| CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest); |
| eTags.add(copyPartResult.getPartETag()); |
| |
| bytePosition += uploadBufferSizeBytes; |
| } |
| } |
| |
| CompleteMultipartUploadRequest completeUploadRequest = |
| new CompleteMultipartUploadRequest() |
| .withBucketName(destinationPath.getBucket()) |
| .withKey(destinationPath.getKey()) |
| .withUploadId(uploadId) |
| .withPartETags(eTags); |
| return amazonS3.get().completeMultipartUpload(completeUploadRequest); |
| } |
| |
| @Override |
| protected void rename( |
| List<S3ResourceId> sourceResourceIds, List<S3ResourceId> destinationResourceIds) |
| throws IOException { |
| copy(sourceResourceIds, destinationResourceIds); |
| delete(sourceResourceIds); |
| } |
| |
| @Override |
| protected void delete(Collection<S3ResourceId> resourceIds) throws IOException { |
| List<S3ResourceId> nonDirectoryPaths = |
| resourceIds.stream() |
| .filter(s3ResourceId -> !s3ResourceId.isDirectory()) |
| .collect(Collectors.toList()); |
| Multimap<String, String> keysByBucket = ArrayListMultimap.create(); |
| for (S3ResourceId path : nonDirectoryPaths) { |
| keysByBucket.put(path.getBucket(), path.getKey()); |
| } |
| |
| List<Callable<Void>> tasks = new ArrayList<>(); |
| for (final String bucket : keysByBucket.keySet()) { |
| for (final List<String> keysPartition : |
| Iterables.partition(keysByBucket.get(bucket), MAX_DELETE_OBJECTS_PER_REQUEST)) { |
| tasks.add( |
| () -> { |
| delete(bucket, keysPartition); |
| return null; |
| }); |
| } |
| } |
| |
| callTasks(tasks); |
| } |
| |
| private void delete(String bucket, Collection<String> keys) throws IOException { |
| checkArgument( |
| keys.size() <= MAX_DELETE_OBJECTS_PER_REQUEST, |
| "only %s keys can be deleted per request, but got %s", |
| MAX_DELETE_OBJECTS_PER_REQUEST, |
| keys.size()); |
| List<KeyVersion> deleteKeyVersions = |
| keys.stream().map(KeyVersion::new).collect(Collectors.toList()); |
| DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(deleteKeyVersions); |
| try { |
| amazonS3.get().deleteObjects(request); |
| } catch (AmazonClientException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { |
| if (isDirectory) { |
| if (!singleResourceSpec.endsWith("/")) { |
| singleResourceSpec += "/"; |
| } |
| } else { |
| checkArgument( |
| !singleResourceSpec.endsWith("/"), |
| "Expected a file path, but [%s] ends with '/'. This is unsupported in S3FileSystem.", |
| singleResourceSpec); |
| } |
| return S3ResourceId.fromUri(singleResourceSpec); |
| } |
| |
| /** |
| * Invokes tasks in a thread pool, then unwraps the resulting {@link Future Futures}. |
| * |
| * <p>Any task exception is wrapped in {@link IOException}. |
| */ |
| private <T> List<T> callTasks(Collection<Callable<T>> tasks) throws IOException { |
| |
| try { |
| List<CompletionStage<T>> futures = new ArrayList<>(tasks.size()); |
| for (Callable<T> task : tasks) { |
| futures.add(MoreFutures.supplyAsync(task::call, executorService)); |
| } |
| return MoreFutures.get(MoreFutures.allAsList(futures)); |
| |
| } catch (ExecutionException e) { |
| if (e.getCause() != null) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } |
| throw new IOException(e.getCause()); |
| } |
| throw new IOException(e); |
| |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("executor service was interrupted"); |
| } |
| } |
| } |