| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.fs.s3.common.writer; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; |
| |
| import com.amazonaws.services.s3.model.PartETag; |
| import com.amazonaws.services.s3.model.UploadPartResult; |
| |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.NotThreadSafe; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Deque; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * An uploader for parts of a multipart upload. The uploader can snapshot its state to |
| * be recovered after a failure. |
| * |
| * <p><b>Note:</b> This class is NOT thread safe and relies on external synchronization. |
| * |
| * <p><b>Note:</b> If any of the methods to add parts throws an exception, this class may be |
| * in an inconsistent state (bookkeeping wise) and should be discarded and recovered. |
| */ |
| @Internal |
| @NotThreadSafe |
| final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload { |
| |
| private final S3AccessHelper s3AccessHelper; |
| |
| private final Executor uploadThreadPool; |
| |
| private final Deque<CompletableFuture<PartETag>> uploadsInProgress; |
| |
| private final String namePrefixForTempObjects; |
| |
| private final MultiPartUploadInfo currentUploadInfo; |
| |
| // ------------------------------------------------------------------------ |
| |
| private RecoverableMultiPartUploadImpl( |
| S3AccessHelper s3AccessHelper, |
| Executor uploadThreadPool, |
| String uploadId, |
| String objectName, |
| List<PartETag> partsSoFar, |
| long numBytes, |
| Optional<File> incompletePart |
| ) { |
| checkArgument(numBytes >= 0L); |
| |
| this.s3AccessHelper = checkNotNull(s3AccessHelper); |
| this.uploadThreadPool = checkNotNull(uploadThreadPool); |
| this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart); |
| this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName); |
| this.uploadsInProgress = new ArrayDeque<>(); |
| } |
| |
| /** |
| * Adds a part to the uploads without any size limitations. |
| * |
| * <p>This method is non-blocking and does not wait for the part upload to complete. |
| * |
| * @param file The file with the part data. |
| * |
| * @throws IOException If this method throws an exception, the RecoverableS3MultiPartUpload |
| * should not be used any more, but recovered instead. |
| */ |
| @Override |
| public void uploadPart(RefCountedFSOutputStream file) throws IOException { |
| // this is to guarantee that nobody is |
| // writing to the file we are uploading. |
| checkState(file.isClosed()); |
| |
| final CompletableFuture<PartETag> future = new CompletableFuture<>(); |
| uploadsInProgress.add(future); |
| |
| final long partLength = file.getPos(); |
| currentUploadInfo.registerNewPart(partLength); |
| |
| file.retain(); // keep the file while the async upload still runs |
| uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future)); |
| } |
| |
| @Override |
| public Optional<File> getIncompletePart() { |
| return currentUploadInfo.getIncompletePart(); |
| } |
| |
| @Override |
| public S3Committer snapshotAndGetCommitter() throws IOException { |
| final S3Recoverable snapshot = snapshotAndGetRecoverable(null); |
| |
| return new S3Committer( |
| s3AccessHelper, |
| snapshot.getObjectName(), |
| snapshot.uploadId(), |
| snapshot.parts(), |
| snapshot.numBytesInParts()); |
| } |
| |
| /** |
| * Creates a snapshot of this MultiPartUpload, from which the upload can be resumed. |
| * |
| * <p>Data buffered locally which is less than |
| * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE}, |
| * and cannot be uploaded as part of the MPU and set to S3 as independent objects. |
| * |
| * <p>This implementation currently blocks until all part uploads are complete and returns |
| * a completed future. |
| */ |
| @Override |
| public S3Recoverable snapshotAndGetRecoverable(@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException { |
| |
| final String incompletePartObjectName = safelyUploadSmallPart(incompletePartFile); |
| |
| // make sure all other uploads are complete |
| // this currently makes the method blocking, |
| // to be made non-blocking in the future |
| awaitPendingPartsUpload(); |
| |
| final String objectName = currentUploadInfo.getObjectName(); |
| final String uploadId = currentUploadInfo.getUploadId(); |
| final List<PartETag> completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts(); |
| final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes(); |
| |
| if (incompletePartObjectName == null) { |
| return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes); |
| } else { |
| return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes, incompletePartObjectName, incompletePartFile.getPos()); |
| } |
| } |
| |
| @Nullable |
| private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) throws IOException { |
| |
| if (file == null || file.getPos() == 0L) { |
| return null; |
| } |
| |
| // first, upload the trailing data file. during that time, other in-progress uploads may complete. |
| final String incompletePartObjectName = createIncompletePartObjectName(); |
| file.retain(); |
| try { |
| s3AccessHelper.putObject(incompletePartObjectName, file.getInputFile()); |
| } |
| finally { |
| file.release(); |
| } |
| return incompletePartObjectName; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // utils |
| // ------------------------------------------------------------------------ |
| |
| @VisibleForTesting |
| static String createIncompletePartObjectNamePrefix(String objectName) { |
| checkNotNull(objectName); |
| |
| final int lastSlash = objectName.lastIndexOf('/'); |
| final String parent; |
| final String child; |
| |
| if (lastSlash == -1) { |
| parent = ""; |
| child = objectName; |
| } else { |
| parent = objectName.substring(0, lastSlash + 1); |
| child = objectName.substring(lastSlash + 1); |
| } |
| return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_"; |
| } |
| |
| private String createIncompletePartObjectName() { |
| return namePrefixForTempObjects + UUID.randomUUID().toString(); |
| } |
| |
| private void awaitPendingPartsUpload() throws IOException { |
| checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size()); |
| |
| while (currentUploadInfo.getRemainingParts() > 0) { |
| CompletableFuture<PartETag> next = uploadsInProgress.peekFirst(); |
| PartETag nextPart = awaitPendingPartUploadToComplete(next); |
| currentUploadInfo.registerCompletePart(nextPart); |
| uploadsInProgress.removeFirst(); |
| } |
| } |
| |
| private PartETag awaitPendingPartUploadToComplete(CompletableFuture<PartETag> upload) throws IOException { |
| final PartETag completedUploadEtag; |
| try { |
| completedUploadEtag = upload.get(); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while waiting for part uploads to complete"); |
| } |
| catch (ExecutionException e) { |
| throw new IOException("Uploading parts failed", e.getCause()); |
| } |
| return completedUploadEtag; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // factory methods |
| // ------------------------------------------------------------------------ |
| |
| public static RecoverableMultiPartUploadImpl newUpload( |
| final S3AccessHelper s3AccessHelper, |
| final Executor uploadThreadPool, |
| final String objectName) throws IOException { |
| |
| final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName); |
| |
| return new RecoverableMultiPartUploadImpl( |
| s3AccessHelper, |
| uploadThreadPool, |
| multiPartUploadId, |
| objectName, |
| new ArrayList<>(), |
| 0L, |
| Optional.empty()); |
| } |
| |
| public static RecoverableMultiPartUploadImpl recoverUpload( |
| final S3AccessHelper s3AccessHelper, |
| final Executor uploadThreadPool, |
| final String multipartUploadId, |
| final String objectName, |
| final List<PartETag> partsSoFar, |
| final long numBytesSoFar, |
| final Optional<File> incompletePart) { |
| |
| return new RecoverableMultiPartUploadImpl( |
| s3AccessHelper, |
| uploadThreadPool, |
| multipartUploadId, |
| objectName, |
| new ArrayList<>(partsSoFar), |
| numBytesSoFar, |
| incompletePart); |
| |
| } |
| |
| // ------------------------------------------------------------------------ |
| // factory methods |
| // ------------------------------------------------------------------------ |
| |
| private static class UploadTask implements Runnable { |
| |
| private final S3AccessHelper s3AccessHelper; |
| |
| private final String objectName; |
| |
| private final String uploadId; |
| |
| private final int partNumber; |
| |
| private final RefCountedFSOutputStream file; |
| |
| private final CompletableFuture<PartETag> future; |
| |
| UploadTask( |
| final S3AccessHelper s3AccessHelper, |
| final MultiPartUploadInfo currentUpload, |
| final RefCountedFSOutputStream file, |
| final CompletableFuture<PartETag> future) { |
| |
| checkNotNull(currentUpload); |
| |
| this.objectName = currentUpload.getObjectName(); |
| this.uploadId = currentUpload.getUploadId(); |
| this.partNumber = currentUpload.getNumberOfRegisteredParts(); |
| |
| // these are limits put by Amazon |
| checkArgument(partNumber >= 1 && partNumber <= 10_000); |
| |
| this.s3AccessHelper = checkNotNull(s3AccessHelper); |
| this.file = checkNotNull(file); |
| this.future = checkNotNull(future); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos()); |
| future.complete(new PartETag(result.getPartNumber(), result.getETag())); |
| file.release(); |
| } |
| catch (Throwable t) { |
| future.completeExceptionally(t); |
| } |
| } |
| } |
| } |