blob: 935bf7185e5f6294bb6198fa3c2768aa83e133e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.windowsazure.storage.AccessCondition;
import com.microsoft.windowsazure.storage.CloudStorageAccount;
import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.RetryPolicyFactory;
import com.microsoft.windowsazure.storage.StorageCredentials;
import com.microsoft.windowsazure.storage.StorageException;
import com.microsoft.windowsazure.storage.StorageUri;
import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
import com.microsoft.windowsazure.storage.blob.BlobProperties;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
import com.microsoft.windowsazure.storage.blob.CloudBlobContainer;
import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory;
import com.microsoft.windowsazure.storage.blob.CloudBlockBlob;
import com.microsoft.windowsazure.storage.blob.CopyState;
import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.windowsazure.storage.blob.ListBlobItem;
/**
* A real implementation of the Azure interaction layer that just redirects
* calls to the Windows Azure storage SDK.
*/
@InterfaceAudience.Private
class StorageInterfaceImpl extends StorageInterface {
private CloudBlobClient serviceClient;
@Override
public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) {
serviceClient.setRetryPolicyFactory(retryPolicyFactory);
}
@Override
public void setTimeoutInMs(int timeoutInMs) {
serviceClient.setTimeoutInMs(timeoutInMs);
}
@Override
public void createBlobClient(CloudStorageAccount account) {
serviceClient = account.createCloudBlobClient();
}
@Override
public void createBlobClient(URI baseUri) {
serviceClient = new CloudBlobClient(baseUri);
}
@Override
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
serviceClient = new CloudBlobClient(baseUri, credentials);
}
@Override
public StorageCredentials getCredentials() {
return serviceClient.getCredentials();
}
@Override
public CloudBlobContainerWrapper getContainerReference(String uri)
throws URISyntaxException, StorageException {
return new CloudBlobContainerWrapperImpl(
serviceClient.getContainerReference(uri));
}
//
// WrappingIterator
//
/**
* This iterator wraps every ListBlobItem as they come from the listBlobs()
* calls to their proper wrapping objects.
*/
private static class WrappingIterator implements Iterator<ListBlobItem> {
private final Iterator<ListBlobItem> present;
public WrappingIterator(Iterator<ListBlobItem> present) {
this.present = present;
}
public static Iterable<ListBlobItem> wrap(
final Iterable<ListBlobItem> present) {
return new Iterable<ListBlobItem>() {
@Override
public Iterator<ListBlobItem> iterator() {
return new WrappingIterator(present.iterator());
}
};
}
@Override
public boolean hasNext() {
return present.hasNext();
}
@Override
public ListBlobItem next() {
ListBlobItem unwrapped = present.next();
if (unwrapped instanceof CloudBlobDirectory) {
return new CloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
} else if (unwrapped instanceof CloudBlockBlob) {
return new CloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
} else {
return unwrapped;
}
}
@Override
public void remove() {
present.remove();
}
}
//
// CloudBlobDirectoryWrapperImpl
//
@InterfaceAudience.Private
static class CloudBlobDirectoryWrapperImpl extends CloudBlobDirectoryWrapper {
private final CloudBlobDirectory directory;
public CloudBlobDirectoryWrapperImpl(CloudBlobDirectory directory) {
this.directory = directory;
}
@Override
public URI getUri() {
return directory.getUri();
}
@Override
public Iterable<ListBlobItem> listBlobs(String prefix,
boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
BlobRequestOptions options, OperationContext opContext)
throws URISyntaxException, StorageException {
return WrappingIterator.wrap(directory.listBlobs(prefix,
useFlatBlobListing, listingDetails, options, opContext));
}
@Override
public CloudBlobContainer getContainer() throws URISyntaxException,
StorageException {
return directory.getContainer();
}
@Override
public CloudBlobDirectory getParent() throws URISyntaxException,
StorageException {
return directory.getParent();
}
@Override
public StorageUri getStorageUri() {
return directory.getStorageUri();
}
}
//
// CloudBlobContainerWrapperImpl
//
@InterfaceAudience.Private
static class CloudBlobContainerWrapperImpl extends CloudBlobContainerWrapper {
private final CloudBlobContainer container;
public CloudBlobContainerWrapperImpl(CloudBlobContainer container) {
this.container = container;
}
@Override
public String getName() {
return container.getName();
}
@Override
public boolean exists(OperationContext opContext) throws StorageException {
return container.exists(AccessCondition.generateEmptyCondition(), null,
opContext);
}
@Override
public void create(OperationContext opContext) throws StorageException {
container.create(null, opContext);
}
@Override
public HashMap<String, String> getMetadata() {
return container.getMetadata();
}
@Override
public void setMetadata(HashMap<String, String> metadata) {
container.setMetadata(metadata);
}
@Override
public void downloadAttributes(OperationContext opContext)
throws StorageException {
container.downloadAttributes(AccessCondition.generateEmptyCondition(),
null, opContext);
}
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
container.uploadMetadata(AccessCondition.generateEmptyCondition(), null,
opContext);
}
@Override
public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
throws URISyntaxException, StorageException {
CloudBlobDirectory dir = container.getDirectoryReference(relativePath);
return new CloudBlobDirectoryWrapperImpl(dir);
}
@Override
public CloudBlockBlobWrapper getBlockBlobReference(String relativePath)
throws URISyntaxException, StorageException {
return new CloudBlockBlobWrapperImpl(
container.getBlockBlobReference(relativePath));
}
}
//
// CloudBlockBlobWrapperImpl
//
@InterfaceAudience.Private
static class CloudBlockBlobWrapperImpl extends CloudBlockBlobWrapper {
private final CloudBlockBlob blob;
public URI getUri() {
return blob.getUri();
}
public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
this.blob = blob;
}
@Override
public HashMap<String, String> getMetadata() {
return blob.getMetadata();
}
@Override
public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
OperationContext opContext) throws StorageException, URISyntaxException {
blob.startCopyFromBlob(((CloudBlockBlobWrapperImpl) sourceBlob).blob,
null, null, null, opContext);
}
@Override
public void delete(OperationContext opContext) throws StorageException {
blob.delete(DeleteSnapshotsOption.NONE, null, null, opContext);
}
@Override
public boolean exists(OperationContext opContext) throws StorageException {
return blob.exists(null, null, opContext);
}
@Override
public void downloadAttributes(OperationContext opContext)
throws StorageException {
blob.downloadAttributes(null, null, opContext);
}
@Override
public BlobProperties getProperties() {
return blob.getProperties();
}
@Override
public void setMetadata(HashMap<String, String> metadata) {
blob.setMetadata(metadata);
}
@Override
public InputStream openInputStream(BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return blob.openInputStream(null, options, opContext);
}
@Override
public OutputStream openOutputStream(BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return blob.openOutputStream(null, options, opContext);
}
@Override
public void upload(InputStream sourceStream, OperationContext opContext)
throws StorageException, IOException {
blob.upload(sourceStream, 0, null, null, opContext);
}
@Override
public CloudBlobContainer getContainer() throws URISyntaxException,
StorageException {
return blob.getContainer();
}
@Override
public CloudBlobDirectory getParent() throws URISyntaxException,
StorageException {
return blob.getParent();
}
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
blob.uploadMetadata(null, null, opContext);
}
@Override
public void uploadProperties(OperationContext opContext)
throws StorageException {
blob.uploadProperties(null, null, opContext);
}
@Override
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
blob.setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
}
@Override
public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
blob.setStreamWriteSizeInBytes(writeBlockSizeBytes);
}
@Override
public StorageUri getStorageUri() {
return blob.getStorageUri();
}
@Override
public CopyState getCopyState() {
return blob.getCopyState();
}
}
}