Lazily open parts during LocalBlobStore complete MPU
This removes a previous workaround for opening too many
FileInputStream and exhausting rlimits.
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
index c50a405..abb7bd0 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
@@ -28,7 +28,6 @@
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -731,14 +730,8 @@
Payload payload;
try {
InputStream is = blob.getPayload().openStream();
- if (is instanceof FileInputStream) {
- // except for FileInputStream since large MPU can open too many fds
- is.close();
- payload = blob.getPayload();
- } else {
- blob.resetPayload(/*release=*/ false);
- payload = new InputStreamPayload(is);
- }
+ blob.resetPayload(/*release=*/ false);
+ payload = new InputStreamPayload(is);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
@@ -825,16 +818,16 @@
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
- ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
+ ImmutableList.Builder<BlobMetadata> metas = ImmutableList.builder();
long contentLength = 0;
Hasher md5Hasher = Hashing.md5().newHasher();
for (MultipartPart part : parts) {
- Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
- contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
- blobs.add(blobPart);
- if (blobPart.getMetadata().getETag() != null) {
- md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
+ BlobMetadata meta = blobMetadata(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
+ contentLength += meta.getContentMetadata().getContentLength();
+ metas.add(meta);
+ if (meta.getETag() != null) {
+ md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(meta.getETag()));
}
}
String mpuETag = new StringBuilder("\"")
@@ -845,7 +838,7 @@
.toString();
PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
.userMetadata(mpu.blobMetadata().getUserMetadata())
- .payload(new MultiBlobInputStream(blobs.build()))
+ .payload(new MultiBlobInputStream(this, metas.build()))
.contentLength(contentLength)
.eTag(mpuETag);
String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
@@ -995,21 +988,24 @@
}
private static final class MultiBlobInputStream extends InputStream {
- private final Iterator<Blob> blobs;
+ private final BlobStore blobStore;
+ private final Iterator<BlobMetadata> metas;
private InputStream current;
- MultiBlobInputStream(List<Blob> blobs) {
- this.blobs = blobs.iterator();
+ MultiBlobInputStream(BlobStore blobStore, List<BlobMetadata> metas) {
+ this.blobStore = blobStore;
+ this.metas = metas.iterator();
}
@Override
public int read() throws IOException {
while (true) {
if (current == null) {
- if (!blobs.hasNext()) {
+ if (!metas.hasNext()) {
return -1;
}
- current = blobs.next().getPayload().openStream();
+ BlobMetadata meta = metas.next();
+ current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
}
int result = current.read();
if (result == -1) {
@@ -1025,10 +1021,11 @@
public int read(byte[] b, int off, int len) throws IOException {
while (true) {
if (current == null) {
- if (!blobs.hasNext()) {
+ if (!metas.hasNext()) {
return -1;
}
- current = blobs.next().getPayload().openStream();
+ BlobMetadata meta = metas.next();
+ current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
}
int result = current.read(b, off, len);
if (result == -1) {