blob: c7d338caf517f4d7f572cfe6292f432c061589a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.openstack.swift.v1.blobstore;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.tryFind;
import static com.google.common.collect.Lists.transform;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.location.predicates.LocationPredicates.idEquals;
import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobAccess;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.ContainerAccess;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.domain.MutableBlobMetadata;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.domain.StorageType;
import org.jclouds.blobstore.domain.internal.BlobBuilderImpl;
import org.jclouds.blobstore.domain.internal.BlobImpl;
import org.jclouds.blobstore.domain.internal.PageSetImpl;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.options.CopyOptions;
import org.jclouds.blobstore.options.CreateContainerOptions;
import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.blobstore.strategy.internal.MultipartUploadSlicingAlgorithm;
import org.jclouds.collect.Memoized;
import org.jclouds.domain.Location;
import org.jclouds.io.ContentMetadata;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.io.payloads.ByteSourcePayload;
import org.jclouds.logging.Logger;
import org.jclouds.openstack.swift.v1.SwiftApi;
import org.jclouds.openstack.swift.v1.blobstore.functions.ToBlobMetadata;
import org.jclouds.openstack.swift.v1.blobstore.functions.ToListContainerOptions;
import org.jclouds.openstack.swift.v1.blobstore.functions.ToResourceMetadata;
import org.jclouds.openstack.swift.v1.domain.Container;
import org.jclouds.openstack.swift.v1.domain.DeleteStaticLargeObjectResponse;
import org.jclouds.openstack.swift.v1.domain.ObjectList;
import org.jclouds.openstack.swift.v1.domain.Segment;
import org.jclouds.openstack.swift.v1.domain.SwiftObject;
import org.jclouds.openstack.swift.v1.features.BulkApi;
import org.jclouds.openstack.swift.v1.features.ObjectApi;
import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions;
import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;
import org.jclouds.util.Closeables2;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
public class RegionScopedSwiftBlobStore implements BlobStore {
@Inject
protected RegionScopedSwiftBlobStore(Injector baseGraph, BlobStoreContext context, SwiftApi api,
@Memoized Supplier<Set<? extends Location>> locations, @Assisted String regionId,
PayloadSlicer slicer, @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
checkNotNull(regionId, "regionId");
Optional<? extends Location> found = tryFind(locations.get(), idEquals(regionId));
checkArgument(found.isPresent(), "region %s not in %s", regionId, locations.get());
this.region = found.get();
this.regionId = regionId;
this.slicer = slicer;
this.toResourceMetadata = new ToResourceMetadata(found.get());
this.context = context;
this.api = api;
this.userExecutor = userExecutor;
// until we parameterize ClearListStrategy with a factory
this.clearList = baseGraph.createChildInjector(new AbstractModule() {
@Override
protected void configure() {
bind(BlobStore.class).toInstance(RegionScopedSwiftBlobStore.this);
}
}).getInstance(ClearListStrategy.class);
}
private final BlobStoreContext context;
private final ClearListStrategy clearList;
private final SwiftApi api;
private final Location region;
private final String regionId;
private final BlobToHttpGetOptions toGetOptions = new BlobToHttpGetOptions();
private final ToListContainerOptions toListContainerOptions = new ToListContainerOptions();
private final ToResourceMetadata toResourceMetadata;
protected final PayloadSlicer slicer;
protected final ListeningExecutorService userExecutor;
@Resource
protected Logger logger = Logger.NULL;
@Override
public Set<? extends Location> listAssignableLocations() {
return ImmutableSet.of(region);
}
@Override
public PageSet<? extends StorageMetadata> list() {
// TODO: there may eventually be >10k containers..
FluentIterable<StorageMetadata> containers = api.getContainerApi(regionId).list()
.transform(toResourceMetadata);
return new PageSetImpl<StorageMetadata>(containers, null);
}
@Override
public boolean containerExists(String container) {
Container val = api.getContainerApi(regionId).get(container);
containerCache.put(container, Optional.fromNullable(val));
return val != null;
}
@Override
public boolean createContainerInLocation(Location location, String container) {
return createContainerInLocation(location, container, CreateContainerOptions.NONE);
}
@Override
public boolean createContainerInLocation(Location location, String container, CreateContainerOptions options) {
checkArgument(location == null || location.equals(region), "location must be null or %s", region);
boolean containerCreated = api.getContainerApi(regionId).create(container, options.isPublicRead() ? ANYBODY_READ : BASIC_CONTAINER);
if (containerCreated) {
containerCache.put(container, Optional.fromNullable(api.getContainerApi(regionId).get(container)));
}
return containerCreated;
}
@Override
public ContainerAccess getContainerAccess(String name) {
Container container = api.getContainerApi(regionId).get(name);
if (container.getAnybodyRead().get()) {
return ContainerAccess.PUBLIC_READ;
} else {
return ContainerAccess.PRIVATE;
}
}
@Override
public void setContainerAccess(String name, ContainerAccess access) {
UpdateContainerOptions options = new UpdateContainerOptions();
if (access == ContainerAccess.PUBLIC_READ) {
options.anybodyRead();
} else {
options.headers(ImmutableMultimap.of(SwiftHeaders.CONTAINER_READ, SwiftHeaders.CONTAINER_ACL_PRIVATE));
}
api.getContainerApi(regionId).update(name, options);
}
private static final org.jclouds.openstack.swift.v1.options.CreateContainerOptions BASIC_CONTAINER = new org.jclouds.openstack.swift.v1.options.CreateContainerOptions();
private static final org.jclouds.openstack.swift.v1.options.CreateContainerOptions ANYBODY_READ = new org.jclouds.openstack.swift.v1.options.CreateContainerOptions()
.anybodyRead();
@Override
public PageSet<? extends StorageMetadata> list(String container) {
return list(container, ListContainerOptions.NONE);
}
@Override
public PageSet<? extends StorageMetadata> list(final String container, ListContainerOptions options) {
ObjectApi objectApi = api.getObjectApi(regionId, container);
ObjectList objects = objectApi.list(toListContainerOptions.apply(options));
if (objects == null) {
containerCache.put(container, Optional.<Container> absent());
return new PageSetImpl<StorageMetadata>(ImmutableList.<StorageMetadata> of(), null);
} else {
containerCache.put(container, Optional.of(objects.getContainer()));
List<? extends StorageMetadata> list = transform(objects, toBlobMetadata(container));
int limit = Optional.fromNullable(options.getMaxResults()).or(10000);
String marker = null;
if (!list.isEmpty() && list.size() == limit) {
marker = list.get(limit - 1).getName();
}
// TODO: we should probably deprecate this option
if (options.isDetailed()) {
list = transform(list, new Function<StorageMetadata, StorageMetadata>() {
@Override
public StorageMetadata apply(StorageMetadata input) {
if (input.getType() != StorageType.BLOB) {
return input;
}
return blobMetadata(container, input.getName());
}
});
}
return new PageSetImpl<StorageMetadata>(list, marker);
}
}
@Override
public boolean blobExists(String container, String name) {
return blobMetadata(container, name) != null;
}
@Override
public String putBlob(String container, Blob blob) {
return putBlob(container, blob, PutOptions.NONE);
}
@Override
public String putBlob(String container, Blob blob, PutOptions options) {
if (options.getBlobAccess() != BlobAccess.PRIVATE) {
throw new UnsupportedOperationException("blob access not supported by swift");
}
if (options.isMultipart()) {
return putMultipartBlob(container, blob, options);
}
ObjectApi objectApi = api.getObjectApi(regionId, container);
return objectApi.put(blob.getMetadata().getName(), blob.getPayload(), metadata(blob.getMetadata().getUserMetadata()));
}
@Override
public String copyBlob(String fromContainer, String fromName, String toContainer, String toName,
CopyOptions options) {
ObjectApi objectApi = api.getObjectApi(regionId, toContainer);
org.jclouds.openstack.swift.v1.options.CopyOptions swiftOptions = new org.jclouds.openstack.swift.v1.options.CopyOptions();
if (options.ifMatch() != null) {
swiftOptions.ifMatch(options.ifMatch());
}
if (options.ifNoneMatch() != null) {
throw new UnsupportedOperationException("Swift does not support ifNoneMatch");
}
if (options.ifModifiedSince() != null) {
swiftOptions.ifModifiedSince(options.ifModifiedSince());
}
if (options.ifUnmodifiedSince() != null) {
swiftOptions.ifUnmodifiedSince(options.ifUnmodifiedSince());
}
Map<String, String> systemMetadata = Maps.newHashMap();
ContentMetadata contentMetadata = options.contentMetadata();
Map<String, String> userMetadata = options.userMetadata();
if (contentMetadata != null || userMetadata != null) {
if (contentMetadata != null) {
String contentDisposition = contentMetadata.getContentDisposition();
if (contentDisposition != null) {
systemMetadata.put(HttpHeaders.CONTENT_DISPOSITION, contentDisposition);
}
String contentEncoding = contentMetadata.getContentEncoding();
if (contentEncoding != null) {
systemMetadata.put(HttpHeaders.CONTENT_ENCODING, contentEncoding);
}
String contentLanguage = contentMetadata.getContentLanguage();
if (contentLanguage != null) {
systemMetadata.put(HttpHeaders.CONTENT_LANGUAGE, contentLanguage);
}
String contentType = contentMetadata.getContentType();
if (contentType != null) {
systemMetadata.put(HttpHeaders.CONTENT_TYPE, contentType);
}
}
if (userMetadata == null) {
userMetadata = Maps.newHashMap();
}
} else {
SwiftObject metadata = api.getObjectApi(regionId, fromContainer).getWithoutBody(fromName);
if (metadata == null) {
throw new KeyNotFoundException(fromContainer, fromName, "Swift could not find the specified source key");
}
contentMetadata = metadata.getPayload().getContentMetadata();
String contentDisposition = contentMetadata.getContentDisposition();
if (contentDisposition != null) {
systemMetadata.put(HttpHeaders.CONTENT_DISPOSITION, contentDisposition);
}
String contentEncoding = contentMetadata.getContentEncoding();
if (contentEncoding != null) {
systemMetadata.put(HttpHeaders.CONTENT_ENCODING, contentEncoding);
}
String contentLanguage = contentMetadata.getContentLanguage();
if (contentLanguage != null) {
systemMetadata.put(HttpHeaders.CONTENT_LANGUAGE, contentLanguage);
}
String contentType = contentMetadata.getContentType();
if (contentType != null) {
systemMetadata.put(HttpHeaders.CONTENT_TYPE, contentType);
}
userMetadata = metadata.getMetadata();
}
objectApi.copy(toName, fromContainer, fromName, userMetadata, systemMetadata, swiftOptions);
// TODO: Swift copy object *appends* user metadata, does not overwrite
return objectApi.getWithoutBody(toName).getETag();
}
@Override
public BlobMetadata blobMetadata(String container, String name) {
SwiftObject object = api.getObjectApi(regionId, container).getWithoutBody(name);
if (object == null) {
return null;
}
return toBlobMetadata(container).apply(object);
}
@Override
public Blob getBlob(String container, String key) {
return getBlob(container, key, GetOptions.NONE);
}
@Override
public Blob getBlob(String container, String name, GetOptions options) {
ObjectApi objectApi = api.getObjectApi(regionId, container);
SwiftObject object = objectApi.get(name, toGetOptions.apply(options));
if (object == null) {
return null;
}
Blob blob = new BlobImpl(toBlobMetadata(container).apply(object));
blob.setPayload(object.getPayload());
blob.setAllHeaders(object.getHeaders());
return blob;
}
@Override
public void removeBlob(String container, String name) {
// Multipart objects have a manifest which points to subobjects. Normally
// deleting a object only deletes the manifest, leaving the subobjects.
// We first try a multipart delete and if that fails since the object is
// not an MPU we fall back to single-part delete.
DeleteStaticLargeObjectResponse response = api.getStaticLargeObjectApi(regionId, container).delete(name);
if (!response.status().equals("200 OK")) {
api.getObjectApi(regionId, container).delete(name);
}
}
/**
* Delete multiple single-part objects. Note that this does not remove the
* subobjects of a multi-part upload.
*/
@Override
public void removeBlobs(String container, Iterable<String> names) {
BulkApi bulkApi = api.getBulkApi(regionId);
for (List<String> partition : Iterables.partition(names, 1000)) {
ImmutableList.Builder<String> builder = ImmutableList.builder();
for (String name : partition) {
builder.add(container + "/" + name);
}
bulkApi.bulkDelete(builder.build());
}
}
@Override
public BlobAccess getBlobAccess(String container, String name) {
return BlobAccess.PRIVATE;
}
@Override
public void setBlobAccess(String container, String name, BlobAccess access) {
throw new UnsupportedOperationException("unsupported in swift");
}
@Override
public BlobStoreContext getContext() {
return context;
}
@Override
public BlobBuilder blobBuilder(String name) {
return new BlobBuilderImpl().name(name);
}
@Override
public boolean directoryExists(String containerName, String directory) {
return api.getObjectApi(regionId, containerName)
.get(directory) != null;
}
@Override
public void createDirectory(String containerName, String directory) {
api.getObjectApi(regionId, containerName)
.put(directory, directoryPayload);
}
private final Payload directoryPayload = new ByteSourcePayload(ByteSource.wrap(new byte[] {})) {
{
getContentMetadata().setContentType("application/directory");
}
};
@Override
public void deleteDirectory(String containerName, String directory) {
api.getObjectApi(regionId, containerName).delete(directory);
}
@Override
public long countBlobs(String containerName) {
Container container = api.getContainerApi(regionId).get(containerName);
// undefined if container doesn't exist, so default to zero
return container != null && container.getObjectCount() != null ? container.getObjectCount() : 0;
}
@Override
public MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata, PutOptions options) {
if (options.getBlobAccess() != BlobAccess.PRIVATE) {
throw new UnsupportedOperationException("blob ACLs not supported in swift");
}
return initiateMultipartUpload(container, blobMetadata, 0, options);
}
private MultipartUpload initiateMultipartUpload(String container, BlobMetadata blobMetadata, long partSize, PutOptions options) {
Long contentLength = blobMetadata.getContentMetadata().getContentLength();
String uploadId = String.format(Locale.ENGLISH, "%s/slo/%.6f/%s/%s", blobMetadata.getName(),
System.currentTimeMillis() / 1000.0, contentLength == null ? Long.valueOf(0) : contentLength,
partSize);
return MultipartUpload.create(container, blobMetadata.getName(), uploadId, blobMetadata, options);
}
@Override
public void abortMultipartUpload(MultipartUpload mpu) {
ImmutableList.Builder<String> names = ImmutableList.builder();
for (MultipartPart part : listMultipartUpload(mpu)) {
names.add(getMPUPartName(mpu, part.partNumber()));
}
removeBlobs(mpu.containerName(), names.build());
}
private ImmutableMap<String, String> getContentMetadataForManifest(ContentMetadata contentMetadata) {
Builder<String, String> mapBuilder = ImmutableMap.builder();
if (contentMetadata.getContentType() != null) {
mapBuilder.put("content-type", contentMetadata.getContentType());
}
/**
* Do not set content-length. Set automatically to manifest json string length by BindToJsonPayload
*/
if (contentMetadata.getContentDisposition() != null) {
mapBuilder.put("content-disposition", contentMetadata.getContentDisposition());
}
if (contentMetadata.getContentEncoding() != null) {
mapBuilder.put("content-encoding", contentMetadata.getContentEncoding());
}
if (contentMetadata.getContentLanguage() != null) {
mapBuilder.put("content-language", contentMetadata.getContentLanguage());
}
return mapBuilder.build();
}
private String getMPUPartName(MultipartUpload mpu, int partNumber) {
return String.format("%s/%08d", mpu.id(), partNumber);
}
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
ImmutableList.Builder<Segment> builder = ImmutableList.builder();
for (MultipartPart part : parts) {
String path = mpu.containerName() + "/" + getMPUPartName(mpu, part.partNumber());
builder.add(Segment.builder().path(path).etag(part.partETag()).sizeBytes(part.partSize()).build());
}
return api.getStaticLargeObjectApi(regionId, mpu.containerName()).replaceManifest(mpu.blobName(),
builder.build(), mpu.blobMetadata().getUserMetadata(), getContentMetadataForManifest(mpu.blobMetadata().getContentMetadata()));
}
@Override
public MultipartPart uploadMultipartPart(MultipartUpload mpu, int partNumber, Payload payload) {
String partName = getMPUPartName(mpu, partNumber);
String eTag = api.getObjectApi(regionId, mpu.containerName()).put(partName, payload);
long partSize = payload.getContentMetadata().getContentLength();
Date lastModified = null; // Swift does not return Last-Modified
return MultipartPart.create(partNumber, partSize, eTag, lastModified);
}
@Override
public List<MultipartPart> listMultipartUpload(MultipartUpload mpu) {
ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
PageSet<? extends StorageMetadata> pageSet = list(mpu.containerName(),
new ListContainerOptions().prefix(mpu.id() + "/"));
// TODO: pagination
for (StorageMetadata sm : pageSet) {
int lastSlash = sm.getName().lastIndexOf('/');
int partNumber = Integer.parseInt(sm.getName().substring(lastSlash + 1));
parts.add(MultipartPart.create(partNumber, sm.getSize(), sm.getETag(), sm.getLastModified()));
}
return parts.build();
}
@Override
public List<MultipartUpload> listMultipartUploads(String container) {
throw new UnsupportedOperationException();
}
@Override
public long getMinimumMultipartPartSize() {
return 1024 * 1024 + 1;
}
@Override
public long getMaximumMultipartPartSize() {
return 5L * 1024L * 1024L * 1024L;
}
@Override
public int getMaximumNumberOfParts() {
return Integer.MAX_VALUE;
}
@Override
public void clearContainer(String containerName) {
clearContainer(containerName, recursive());
}
@Override
public void clearContainer(String containerName, ListContainerOptions options) {
// this could be implemented to use bulk delete
clearList.execute(containerName, options);
}
@Override
public void deleteContainer(String container) {
clearContainer(container, recursive());
api.getContainerApi(regionId).deleteIfEmpty(container);
containerCache.invalidate(container);
}
@Override
public boolean deleteContainerIfEmpty(String container) {
boolean deleted = api.getContainerApi(regionId).deleteIfEmpty(container);
if (deleted) {
containerCache.invalidate(container);
}
return deleted;
}
protected final LoadingCache<String, Optional<Container>> containerCache = CacheBuilder.newBuilder().build(
new CacheLoader<String, Optional<Container>>() {
public Optional<Container> load(String container) {
return Optional.fromNullable(api.getContainerApi(regionId).get(container));
}
});
protected Function<SwiftObject, MutableBlobMetadata> toBlobMetadata(String container) {
return new ToBlobMetadata(containerCache.getUnchecked(container).get());
}
@Override
public long countBlobs(String containerName, ListContainerOptions options) {
throw new UnsupportedOperationException();
}
@com.google.inject.Inject(optional = true)
@Named(Constants.PROPERTY_MAX_RETRIES)
protected int retryCountLimit = 5;
/**
* Upload using a user-provided executor, or the jclouds userExecutor
*
* @param container
* @param blob
* @param overrides
* @return the multipart blob etag
*/
@Beta
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) {
if (overrides.getUseCustomExecutor()) {
return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor());
} else {
return putMultipartBlob(container, blob, overrides, userExecutor);
}
}
@Beta
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
long contentLength = checkNotNull(blob.getMetadata().getContentMetadata().getContentLength(),
"must provide content-length to use multi-part upload");
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
int partNumber = 0;
for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
BlobUploader b =
new BlobUploader(mpu, partNumber++, payload);
parts.add(executor.submit(b));
}
return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
}
private final class BlobUploader implements Callable<MultipartPart> {
private final MultipartUpload mpu;
private final int partNumber;
private final Payload payload;
BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) {
this.mpu = mpu;
this.partNumber = partNumber;
this.payload = payload;
}
@Override
public MultipartPart call() {
return uploadMultipartPart(mpu, partNumber, payload);
}
}
@Override
@Beta
public void downloadBlob(String container, String name, File destination) {
downloadBlob(container, name, destination, userExecutor);
}
@Override
@Beta
public void downloadBlob(String container, String name, File destination, ExecutorService executor) {
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
RandomAccessFile raf = null;
File tempFile = new File(destination + "." + UUID.randomUUID());
try {
long contentLength = api
.getObjectApi(regionId, container)
.getWithoutBody(name)
.getPayload()
.getContentMetadata()
.getContentLength();
// Reserve space for performance reasons
raf = new RandomAccessFile(tempFile, "rw");
raf.seek(contentLength - 1);
raf.write(0);
// Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated
long partSize = getMinimumMultipartPartSize();
// Loop through ranges within the file
long from;
long to;
List<ListenableFuture<Void>> results = new ArrayList<ListenableFuture<Void>>();
for (from = 0; from < contentLength; from = from + partSize) {
to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
BlobDownloader b = new BlobDownloader(regionId, container, name, raf, from, to);
results.add(listeningExecutor.submit(b));
}
Futures.getUnchecked(Futures.allAsList(results));
raf.getChannel().force(true);
raf.getChannel().close();
raf.close();
if (destination.exists()) {
destination.delete();
}
if (!tempFile.renameTo(destination)) {
throw new RuntimeException("Could not move temporary downloaded file to destination " + destination);
}
tempFile = null;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
Closeables2.closeQuietly(raf);
if (tempFile != null) {
tempFile.delete();
}
}
}
private final class BlobDownloader implements Callable<Void> {
String regionId;
String containerName;
String objectName;
private final RandomAccessFile raf;
private final long begin;
private final long end;
BlobDownloader(String regionId, String containerName, String objectName, RandomAccessFile raf, long begin, long end) {
this.regionId = regionId;
this.containerName = containerName;
this.objectName = objectName;
this.raf = raf;
this.begin = begin;
this.end = end;
}
@Override
public Void call() {
IOException lastException = null;
for (int retry = 0; retry < retryCountLimit; retry++) {
try {
SwiftObject object = api.getObjectApi(regionId, containerName)
.get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
// Download first, this is the part that usually fails
byte[] targetArray;
InputStream is = object.getPayload().openStream();
try {
targetArray = ByteStreams.toByteArray(is);
} finally {
Closeables.closeQuietly(is);
}
// Map file region
MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1);
out.put(targetArray);
out.force();
// JDK-4715154 ; TODO: Java 8 FileChannels
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
closeDirectBuffer(out);
}
} catch (IOException e) {
lastException = e;
continue;
}
// Success!
return null;
}
throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException);
}
// JDK-4715154
private void closeDirectBuffer(MappedByteBuffer mbb) {
if ( mbb == null || !mbb.isDirect() )
return;
try {
Method cleaner = mbb.getClass().getMethod("cleaner");
cleaner.setAccessible(true);
Method clean = Class.forName("sun.misc.Cleaner").getMethod("clean");
clean.setAccessible(true);
clean.invoke(cleaner.invoke(mbb));
} catch (Exception e) {
logger.warn(e.toString());
}
}
}
@Beta
@Override
public InputStream streamBlob(final String container, final String name) {
return streamBlob(container, name, userExecutor);
}
@Beta
@Override
public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {
final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
// User will receive the Input end of the piped stream
final PipedOutputStream output;
final PipedInputStream input;
try {
output = new PipedOutputStream();
input = new PipedInputStream(output,
getMinimumMultipartPartSize() * 5 > Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) getMinimumMultipartPartSize() * 5);
} catch (IOException e) {
throw new RuntimeException(e);
}
// The total length of the file to download is needed to determine ranges
// It has to be obtainable without downloading the whole file
final long contentLength = api
.getObjectApi(regionId, container)
.getWithoutBody(name)
.getPayload()
.getContentMetadata()
.getContentLength();
// Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated
final long partSize = getMinimumMultipartPartSize();
// Used to communicate between the producer and consumer threads
final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();
listeningExecutor.submit(new Runnable() {
@Override
public void run() {
ListenableFuture<byte[]> result;
long from;
try {
for (from = 0; from < contentLength; from = from + partSize) {
logger.debug(Thread.currentThread() + " writing to output");
result = results.take();
if (result == null) {
output.close();
input.close();
throw new RuntimeException("Error downloading file part to stream");
}
output.write(result.get());
}
} catch (Exception e) {
logger.debug(e.toString());
// Close pipe so client is notified of an exception
Closeables2.closeQuietly(input);
throw new RuntimeException(e);
} finally {
// Finished writing results to stream
Closeables2.closeQuietly(output);
}
}
});
listeningExecutor.submit(new Runnable() {
@Override
public void run() {
long from;
long to;
// Loop through ranges within the file
for (from = 0; from < contentLength; from = from + partSize) {
to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
results.add(listeningExecutor.submit(b));
}
}
});
return input;
}
private final class BlobStreamDownloader implements Callable<byte[]> {
String containerName;
String objectName;
private final long begin;
private final long end;
BlobStreamDownloader(String containerName, String objectName, long begin, long end) {
this.containerName = containerName;
this.objectName = objectName;
this.begin = begin;
this.end = end;
}
@Override
public byte[] call() {
IOException lastException = null;
for (int retry = 0; retry < retryCountLimit; retry++) {
try {
long time = System.nanoTime();
SwiftObject object = api.getObjectApi(regionId, containerName)
.get(objectName, org.jclouds.http.options.GetOptions.Builder.range(begin, end));
byte[] downloadedBlock;
InputStream is = object.getPayload().openStream();
try {
downloadedBlock = ByteStreams.toByteArray(is);
} finally {
Closeables.closeQuietly(is);
}
return downloadedBlock;
} catch (IOException e) {
logger.debug(e.toString());
lastException = e;
continue;
}
}
throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException);
}
}
}