/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.cosn;

import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import com.qcloud.cos.model.PartETag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;

/**
 * The output stream for the COS blob store.
 * Implements streaming upload to COS on top of the multipart upload
 * feature, where the maximum size of each part is 5 GB.
 * Supports single files of up to 40 TB by uploading them in 5 GB parts.
 * Improves write throughput for large files by buffering blocks in byte
 * buffers and uploading them through a fixed thread pool.
 */
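// A minimal usage sketch (hedged: the store wiring, the key, the block size
// and the executor sizing below are illustrative assumptions, not taken
// from this class):
//
//   Configuration conf = new Configuration();
//   NativeFileSystemStore store = ...; // supplied by the COS file system
//   ExecutorService pool = Executors.newFixedThreadPool(4);
//   byte[] payload = ...;
//   try (OutputStream out = new CosNOutputStream(
//       conf, store, "data/part-00000", 8L * 1024 * 1024, pool)) {
//     out.write(payload); // buffered into blocks, uploaded as parts
//   } // close() uploads the final block and completes the multipart upload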
public class CosNOutputStream extends OutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(CosNOutputStream.class);
private final Configuration conf;
private final NativeFileSystemStore store;
private MessageDigest digest;
private long blockSize;
private String key;
private int currentBlockId = 0;
private Set<ByteBufferWrapper> blockCacheBuffers = new HashSet<>();
private ByteBufferWrapper currentBlockBuffer;
private OutputStream currentBlockOutputStream;
private String uploadId = null;
private ListeningExecutorService executorService;
private List<ListenableFuture<PartETag>> etagList = new LinkedList<>();
private int blockWritten = 0;
private boolean closed = false;
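  // Streaming state: bytes are written into currentBlockBuffer through
  // currentBlockOutputStream; once blockSize bytes have accumulated, the
  // block is handed to the executor as one multipart part and a fresh
  // buffer is fetched from the BufferPool.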
public CosNOutputStream(Configuration conf, NativeFileSystemStore store,
String key, long blockSize, ExecutorService executorService)
throws IOException {
this.conf = conf;
this.store = store;
this.key = key;
this.blockSize = blockSize;
    // Clamp the block size to the part-size range accepted by COS.
    if (this.blockSize < Constants.MIN_PART_SIZE) {
      LOG.warn("The minimum size of a single block is limited to {}.",
          Constants.MIN_PART_SIZE);
      this.blockSize = Constants.MIN_PART_SIZE;
    }
    if (this.blockSize > Constants.MAX_PART_SIZE) {
      LOG.warn("The maximum size of a single block is limited to {}.",
          Constants.MAX_PART_SIZE);
      this.blockSize = Constants.MAX_PART_SIZE;
    }
    // Wrap the supplied thread pool (a blocking pool with fair scheduling)
    // so that submitted part uploads return ListenableFutures.
this.executorService = MoreExecutors.listeningDecorator(executorService);
try {
this.currentBlockBuffer =
BufferPool.getInstance().getBuffer((int) this.blockSize);
    } catch (IOException e) {
      throw new IOException(String.format(
          "Getting a buffer [size: %d] from the buffer pool failed.",
          this.blockSize), e);
    }
try {
this.digest = MessageDigest.getInstance("MD5");
this.currentBlockOutputStream = new DigestOutputStream(
new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer()),
this.digest);
    } catch (NoSuchAlgorithmException e) {
      // MD5 is unavailable in this JVM: fall back to writing without a
      // digest, so no MD5 checksum is attached to the upload.
      this.digest = null;
      this.currentBlockOutputStream =
          new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer());
    }
}
@Override
public void flush() throws IOException {
this.currentBlockOutputStream.flush();
}
@Override
public synchronized void close() throws IOException {
if (this.closed) {
return;
}
this.currentBlockOutputStream.flush();
this.currentBlockOutputStream.close();
LOG.info("The output stream has been close, and "
+ "begin to upload the last block: [{}].", this.currentBlockId);
this.blockCacheBuffers.add(this.currentBlockBuffer);
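    // A single cached block means the whole object fits into one part, so
    // a plain PUT (storeFile) is cheaper than a multipart upload.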
if (this.blockCacheBuffers.size() == 1) {
byte[] md5Hash = this.digest == null ? null : this.digest.digest();
store.storeFile(this.key,
new ByteBufferInputStream(this.currentBlockBuffer.getByteBuffer()),
md5Hash, this.currentBlockBuffer.getByteBuffer().remaining());
} else {
PartETag partETag = null;
if (this.blockWritten > 0) {
LOG.info("Upload the last part..., blockId: [{}], written bytes: [{}]",
this.currentBlockId, this.blockWritten);
partETag = store.uploadPart(
new ByteBufferInputStream(currentBlockBuffer.getByteBuffer()),
key, uploadId, currentBlockId + 1,
currentBlockBuffer.getByteBuffer().remaining());
}
final List<PartETag> futurePartETagList = this.waitForFinishPartUploads();
      if (null == futurePartETagList) {
        throw new IOException(
            "Failed to complete the multipart upload to COS.");
      }
List<PartETag> tmpPartEtagList = new LinkedList<>(futurePartETagList);
if (null != partETag) {
tmpPartEtagList.add(partETag);
}
store.completeMultipartUpload(this.key, this.uploadId, tmpPartEtagList);
}
try {
BufferPool.getInstance().returnBuffer(this.currentBlockBuffer);
    } catch (InterruptedException e) {
      LOG.error("An exception occurred "
          + "while returning the buffer to the buffer pool.", e);
      // Restore the interrupt status so that callers can observe it.
      Thread.currentThread().interrupt();
    }
LOG.info("The outputStream for key: [{}] has been uploaded.", key);
this.blockWritten = 0;
this.closed = true;
}
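  /**
   * Blocks until every asynchronously submitted part upload has finished.
   *
   * @return the collected part ETags, or null if the wait was interrupted
   * @throws IOException if any part upload failed; the multipart upload is
   *         aborted before the exception is rethrown
   */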
private List<PartETag> waitForFinishPartUploads() throws IOException {
try {
LOG.info("Wait for all parts to finish their uploading.");
return Futures.allAsList(this.etagList).get();
    } catch (InterruptedException e) {
      LOG.error("The part upload was interrupted.", e);
      // Restore the interrupt status so that callers can observe it.
      Thread.currentThread().interrupt();
      return null;
    } catch (ExecutionException e) {
      LOG.error("Cancelling the remaining part uploads.");
      for (ListenableFuture<PartETag> future : this.etagList) {
        future.cancel(true);
      }
      store.abortMultipartUpload(this.key, this.uploadId);
      LOG.error("The multipart upload with id: [{}] to COS key: [{}] failed.",
          this.uploadId, this.key, e);
      throw new IOException("The multipart upload with id: " + this.uploadId
          + " to " + this.key + " failed.", e);
    }
}
}
private void uploadPart() throws IOException {
this.currentBlockOutputStream.flush();
this.currentBlockOutputStream.close();
this.blockCacheBuffers.add(this.currentBlockBuffer);
    if (this.currentBlockId == 0) {
      // The first completed block initiates the multipart upload and
      // obtains the upload id shared by all subsequent parts.
      uploadId = store.getUploadId(key);
    }
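    // Snapshot the mutable stream state (buffer, key, upload id, block id)
    // into final fields of the task, so the asynchronous upload is not
    // affected when the stream moves on to the next block.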
ListenableFuture<PartETag> partETagListenableFuture =
this.executorService.submit(
new Callable<PartETag>() {
private final ByteBufferWrapper buf = currentBlockBuffer;
private final String localKey = key;
private final String localUploadId = uploadId;
private final int blockId = currentBlockId;
@Override
public PartETag call() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("{} is uploading a part.",
Thread.currentThread().getName());
}
              // COS part numbers are 1-based, hence blockId + 1.
              try {
                return store.uploadPart(
                    new ByteBufferInputStream(this.buf.getByteBuffer()),
                    this.localKey, this.localUploadId,
                    this.blockId + 1, this.buf.getByteBuffer().remaining());
              } finally {
                // Return the buffer even when the upload fails so that
                // the buffer pool does not leak buffers.
                BufferPool.getInstance().returnBuffer(this.buf);
              }
            }
});
this.etagList.add(partETagListenableFuture);
try {
this.currentBlockBuffer =
BufferPool.getInstance().getBuffer((int) this.blockSize);
} catch (IOException e) {
String errMsg = String.format("Getting a buffer [size:%d] from "
+ "the buffer pool failed.", this.blockSize);
throw new IOException(errMsg, e);
}
this.currentBlockId++;
if (null != this.digest) {
this.digest.reset();
this.currentBlockOutputStream = new DigestOutputStream(
new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer()),
this.digest);
} else {
this.currentBlockOutputStream =
new ByteBufferOutputStream(this.currentBlockBuffer.getByteBuffer());
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (this.closed) {
throw new IOException("block stream has been closed.");
}
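    // Split the incoming range across block boundaries. For example, with
    // an 8 MB block size and 3 MB already buffered, a 10 MB write tops up
    // the current block with 5 MB, uploads it, and buffers the last 5 MB.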
while (len > 0) {
long writeBytes;
if (this.blockWritten + len > this.blockSize) {
writeBytes = this.blockSize - this.blockWritten;
} else {
writeBytes = len;
}
this.currentBlockOutputStream.write(b, off, (int) writeBytes);
this.blockWritten += writeBytes;
if (this.blockWritten >= this.blockSize) {
this.uploadPart();
this.blockWritten = 0;
}
len -= writeBytes;
off += writeBytes;
}
}
@Override
public void write(byte[] b) throws IOException {
this.write(b, 0, b.length);
}
@Override
public void write(int b) throws IOException {
    if (this.closed) {
      throw new IOException("The output stream has been closed.");
    }
    // Route the single byte through the same (possibly digesting) stream
    // used by the bulk write path.
    this.currentBlockOutputStream.write(b);
this.blockWritten += 1;
if (this.blockWritten >= this.blockSize) {
this.uploadPart();
this.blockWritten = 0;
}
}
}