| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.fs.s3.common.writer; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.core.fs.RecoverableFsDataOutputStream; |
| import org.apache.flink.core.fs.RecoverableWriter; |
| import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; |
| import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; |
| import org.apache.flink.fs.s3.common.utils.RefCountedFile; |
| import org.apache.flink.util.function.FunctionWithException; |
| |
| import org.apache.commons.io.IOUtils; |
| |
| import javax.annotation.concurrent.NotThreadSafe; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Optional; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE; |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * A RecoverableFsDataOutputStream to S3 that is based on a recoverable multipart upload. |
| * |
| * <p>This class is NOT thread-safe. Concurrent writes tho this stream result in corrupt or |
| * lost data. |
| * |
| * <p>The {@link #close()} method may be called concurrently when cancelling / shutting down. |
| * It will still ensure that local transient resources (like streams and temp files) are cleaned up, |
| * but will not touch data previously persisted in S3. |
| */ |
| @PublicEvolving |
| @NotThreadSafe |
| public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { |
| |
| /** Lock that guards the critical sections when new parts are rolled over. |
| * Despite the class being declared not thread safe, we protect certain regions to |
| * at least enable concurrent close() calls during cancellation or abort/cleanup. */ |
| private final ReentrantLock lock = new ReentrantLock(); |
| |
| private final RecoverableMultiPartUpload upload; |
| |
| private final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider; |
| |
| /** |
| * The number of bytes at which we start a new part of the multipart upload. |
| * This has to be greater than the non-configurable minimum. That is equal to |
| * {@link org.apache.flink.fs.s3.common.FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE S3_MULTIPART_MIN_PART_SIZE} |
| * and is set by Amazon. |
| */ |
| private final long userDefinedMinPartSize; |
| |
| private RefCountedFSOutputStream fileStream; |
| |
| private long bytesBeforeCurrentPart; |
| |
| /** |
| * Single constructor to initialize all. Actual setup of the parts happens in the |
| * factory methods. |
| */ |
| S3RecoverableFsDataOutputStream( |
| RecoverableMultiPartUpload upload, |
| FunctionWithException<File, RefCountedFile, IOException> tempFileCreator, |
| RefCountedFSOutputStream initialTmpFile, |
| long userDefinedMinPartSize, |
| long bytesBeforeCurrentPart) { |
| |
| checkArgument(bytesBeforeCurrentPart >= 0L); |
| |
| this.upload = checkNotNull(upload); |
| this.tmpFileProvider = checkNotNull(tempFileCreator); |
| this.userDefinedMinPartSize = userDefinedMinPartSize; |
| |
| this.fileStream = initialTmpFile; |
| this.bytesBeforeCurrentPart = bytesBeforeCurrentPart; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // stream methods |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public void write(int b) throws IOException { |
| fileStream.write(b); |
| } |
| |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException { |
| fileStream.write(b, off, len); |
| openNewPartIfNecessary(userDefinedMinPartSize); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| fileStream.flush(); |
| openNewPartIfNecessary(userDefinedMinPartSize); |
| } |
| |
| @Override |
| public long getPos() throws IOException { |
| return bytesBeforeCurrentPart + fileStream.getPos(); |
| } |
| |
| @Override |
| public void sync() throws IOException { |
| fileStream.sync(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| lock(); |
| try { |
| fileStream.flush(); |
| } finally { |
| IOUtils.closeQuietly(fileStream); |
| fileStream.release(); |
| unlock(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // recoverable stream methods |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public RecoverableWriter.ResumeRecoverable persist() throws IOException { |
| lock(); |
| try { |
| fileStream.flush(); |
| openNewPartIfNecessary(userDefinedMinPartSize); |
| |
| // We do not stop writing to the current file, we merely limit the upload to the |
| // first n bytes of the current file |
| |
| return upload.snapshotAndGetRecoverable(fileStream); |
| } |
| finally { |
| unlock(); |
| } |
| } |
| |
| @Override |
| public Committer closeForCommit() throws IOException { |
| lock(); |
| try { |
| closeAndUploadPart(); |
| return upload.snapshotAndGetCommitter(); |
| } |
| finally { |
| unlock(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // S3 |
| // ------------------------------------------------------------------------ |
| |
| private void openNewPartIfNecessary(long sizeThreshold) throws IOException { |
| final long fileLength = fileStream.getPos(); |
| if (fileLength >= sizeThreshold) { |
| lock(); |
| try { |
| uploadCurrentAndOpenNewPart(fileLength); |
| } finally { |
| unlock(); |
| } |
| } |
| } |
| |
| private void uploadCurrentAndOpenNewPart(long fileLength) throws IOException { |
| bytesBeforeCurrentPart += fileLength; |
| closeAndUploadPart(); |
| |
| // initialize a new temp file |
| fileStream = RefCountedBufferingFileStream.openNew(tmpFileProvider); |
| } |
| |
| private void closeAndUploadPart() throws IOException { |
| fileStream.flush(); |
| fileStream.close(); |
| if (fileStream.getPos() > 0L) { |
| upload.uploadPart(fileStream); |
| } |
| fileStream.release(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // locking |
| // ------------------------------------------------------------------------ |
| |
| private void lock() throws IOException { |
| try { |
| lock.lockInterruptibly(); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("interrupted"); |
| } |
| } |
| |
| private void unlock() { |
| lock.unlock(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // factory methods |
| // ------------------------------------------------------------------------ |
| |
| public static S3RecoverableFsDataOutputStream newStream( |
| final RecoverableMultiPartUpload upload, |
| final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, |
| final long userDefinedMinPartSize) throws IOException { |
| |
| checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE); |
| |
| final RefCountedBufferingFileStream fileStream = boundedBufferingFileStream(tmpFileCreator, Optional.empty()); |
| |
| return new S3RecoverableFsDataOutputStream( |
| upload, |
| tmpFileCreator, |
| fileStream, |
| userDefinedMinPartSize, |
| 0L); |
| } |
| |
| public static S3RecoverableFsDataOutputStream recoverStream( |
| final RecoverableMultiPartUpload upload, |
| final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, |
| final long userDefinedMinPartSize, |
| final long bytesBeforeCurrentPart) throws IOException { |
| |
| checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE); |
| |
| final RefCountedBufferingFileStream fileStream = boundedBufferingFileStream( |
| tmpFileCreator, |
| upload.getIncompletePart()); |
| |
| return new S3RecoverableFsDataOutputStream( |
| upload, |
| tmpFileCreator, |
| fileStream, |
| userDefinedMinPartSize, |
| bytesBeforeCurrentPart); |
| } |
| |
| private static RefCountedBufferingFileStream boundedBufferingFileStream( |
| final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator, |
| final Optional<File> incompletePart) throws IOException { |
| |
| if (!incompletePart.isPresent()) { |
| return RefCountedBufferingFileStream.openNew(tmpFileCreator); |
| } |
| |
| final File file = incompletePart.get(); |
| return RefCountedBufferingFileStream.restore(tmpFileCreator, file); |
| } |
| } |