blob: 0d0998a0620026f432448666e57d8b5f0b168a61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartResult;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* An uploader for parts of a multipart upload. The uploader can snapshot its state to
* be recovered after a failure.
*
* <p><b>Note:</b> This class is NOT thread safe and relies on external synchronization.
*
* <p><b>Note:</b> If any of the methods to add parts throws an exception, this class may be
* in an inconsistent state (bookkeeping wise) and should be discarded and recovered.
*/
@Internal
@NotThreadSafe
final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
private final S3AccessHelper s3AccessHelper;
private final Executor uploadThreadPool;
private final Deque<CompletableFuture<PartETag>> uploadsInProgress;
private final String namePrefixForTempObjects;
private final MultiPartUploadInfo currentUploadInfo;
// ------------------------------------------------------------------------
private RecoverableMultiPartUploadImpl(
S3AccessHelper s3AccessHelper,
Executor uploadThreadPool,
String uploadId,
String objectName,
List<PartETag> partsSoFar,
long numBytes,
Optional<File> incompletePart
) {
checkArgument(numBytes >= 0L);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.uploadThreadPool = checkNotNull(uploadThreadPool);
this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
this.uploadsInProgress = new ArrayDeque<>();
}
/**
* Adds a part to the uploads without any size limitations.
*
* <p>This method is non-blocking and does not wait for the part upload to complete.
*
* @param file The file with the part data.
*
* @throws IOException If this method throws an exception, the RecoverableS3MultiPartUpload
* should not be used any more, but recovered instead.
*/
@Override
public void uploadPart(RefCountedFSOutputStream file) throws IOException {
// this is to guarantee that nobody is
// writing to the file we are uploading.
checkState(file.isClosed());
final CompletableFuture<PartETag> future = new CompletableFuture<>();
uploadsInProgress.add(future);
final long partLength = file.getPos();
currentUploadInfo.registerNewPart(partLength);
file.retain(); // keep the file while the async upload still runs
uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
}
@Override
public Optional<File> getIncompletePart() {
return currentUploadInfo.getIncompletePart();
}
@Override
public S3Committer snapshotAndGetCommitter() throws IOException {
final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
return new S3Committer(
s3AccessHelper,
snapshot.getObjectName(),
snapshot.uploadId(),
snapshot.parts(),
snapshot.numBytesInParts());
}
/**
* Creates a snapshot of this MultiPartUpload, from which the upload can be resumed.
*
* <p>Data buffered locally which is less than
* {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE},
* and cannot be uploaded as part of the MPU and set to S3 as independent objects.
*
* <p>This implementation currently blocks until all part uploads are complete and returns
* a completed future.
*/
@Override
public S3Recoverable snapshotAndGetRecoverable(@Nullable final RefCountedFSOutputStream incompletePartFile) throws IOException {
final String incompletePartObjectName = safelyUploadSmallPart(incompletePartFile);
// make sure all other uploads are complete
// this currently makes the method blocking,
// to be made non-blocking in the future
awaitPendingPartsUpload();
final String objectName = currentUploadInfo.getObjectName();
final String uploadId = currentUploadInfo.getUploadId();
final List<PartETag> completedParts = currentUploadInfo.getCopyOfEtagsOfCompleteParts();
final long sizeInBytes = currentUploadInfo.getExpectedSizeInBytes();
if (incompletePartObjectName == null) {
return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes);
} else {
return new S3Recoverable(objectName, uploadId, completedParts, sizeInBytes, incompletePartObjectName, incompletePartFile.getPos());
}
}
@Nullable
private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) throws IOException {
if (file == null || file.getPos() == 0L) {
return null;
}
// first, upload the trailing data file. during that time, other in-progress uploads may complete.
final String incompletePartObjectName = createIncompletePartObjectName();
file.retain();
try {
s3AccessHelper.putObject(incompletePartObjectName, file.getInputFile());
}
finally {
file.release();
}
return incompletePartObjectName;
}
// ------------------------------------------------------------------------
// utils
// ------------------------------------------------------------------------
@VisibleForTesting
static String createIncompletePartObjectNamePrefix(String objectName) {
checkNotNull(objectName);
final int lastSlash = objectName.lastIndexOf('/');
final String parent;
final String child;
if (lastSlash == -1) {
parent = "";
child = objectName;
} else {
parent = objectName.substring(0, lastSlash + 1);
child = objectName.substring(lastSlash + 1);
}
return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
}
private String createIncompletePartObjectName() {
return namePrefixForTempObjects + UUID.randomUUID().toString();
}
private void awaitPendingPartsUpload() throws IOException {
checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size());
while (currentUploadInfo.getRemainingParts() > 0) {
CompletableFuture<PartETag> next = uploadsInProgress.peekFirst();
PartETag nextPart = awaitPendingPartUploadToComplete(next);
currentUploadInfo.registerCompletePart(nextPart);
uploadsInProgress.removeFirst();
}
}
private PartETag awaitPendingPartUploadToComplete(CompletableFuture<PartETag> upload) throws IOException {
final PartETag completedUploadEtag;
try {
completedUploadEtag = upload.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for part uploads to complete");
}
catch (ExecutionException e) {
throw new IOException("Uploading parts failed", e.getCause());
}
return completedUploadEtag;
}
// ------------------------------------------------------------------------
// factory methods
// ------------------------------------------------------------------------
public static RecoverableMultiPartUploadImpl newUpload(
final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String objectName) throws IOException {
final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);
return new RecoverableMultiPartUploadImpl(
s3AccessHelper,
uploadThreadPool,
multiPartUploadId,
objectName,
new ArrayList<>(),
0L,
Optional.empty());
}
public static RecoverableMultiPartUploadImpl recoverUpload(
final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String multipartUploadId,
final String objectName,
final List<PartETag> partsSoFar,
final long numBytesSoFar,
final Optional<File> incompletePart) {
return new RecoverableMultiPartUploadImpl(
s3AccessHelper,
uploadThreadPool,
multipartUploadId,
objectName,
new ArrayList<>(partsSoFar),
numBytesSoFar,
incompletePart);
}
// ------------------------------------------------------------------------
// factory methods
// ------------------------------------------------------------------------
private static class UploadTask implements Runnable {
private final S3AccessHelper s3AccessHelper;
private final String objectName;
private final String uploadId;
private final int partNumber;
private final RefCountedFSOutputStream file;
private final CompletableFuture<PartETag> future;
UploadTask(
final S3AccessHelper s3AccessHelper,
final MultiPartUploadInfo currentUpload,
final RefCountedFSOutputStream file,
final CompletableFuture<PartETag> future) {
checkNotNull(currentUpload);
this.objectName = currentUpload.getObjectName();
this.uploadId = currentUpload.getUploadId();
this.partNumber = currentUpload.getNumberOfRegisteredParts();
// these are limits put by Amazon
checkArgument(partNumber >= 1 && partNumber <= 10_000);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.file = checkNotNull(file);
this.future = checkNotNull(future);
}
@Override
public void run() {
try {
final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos());
future.complete(new PartETag(result.getPartNumber(), result.getETag()));
file.release();
}
catch (Throwable t) {
future.completeExceptionally(t);
}
}
}
}