// blob: 6d11207c479b510ba47bef9b9fba75b56bce8402 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TimeZone;
import java.util.List;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.http.client.utils.URIBuilder;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CopyState;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.PageRange;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriBuilderException;
/**
* A mock implementation of the Azure Storage interaction layer for unit tests.
* Just does in-memory storage.
*/
public class MockStorageInterface extends StorageInterface {
// In-memory store that holds all blobs and the container metadata for this
// mock; assigned by each createBlobClient overload.
private InMemoryBlockBlobStore backingStore;
// Containers registered through addPreExistingContainer; consulted by
// getContainerReference.
private final ArrayList<PreExistingContainer> preExistingContainers =
new ArrayList<MockStorageInterface.PreExistingContainer>();
// Base URI string that getContainerReference uses to build container URIs.
private String baseUriString;
// Codec used by convertUriToDecodedString to turn encoded URIs into store keys.
private static final URLCodec codec = new URLCodec();
/**
 * Returns the in-memory store currently backing this mock.
 *
 * @return the backing store, or null if no createBlobClient overload has
 *         been called yet.
 */
public InMemoryBlockBlobStore getBackingStore() {
  return this.backingStore;
}
/**
 * Registers a container that should be treated as already existing, i.e.
 * the situation where a user creates a container first and only afterwards
 * mounts WASB on it. getContainerReference checks these registrations.
 *
 * @param uri
 *          The URI of the container.
 * @param metadata
 *          The metadata on the container.
 */
public void addPreExistingContainer(String uri,
    HashMap<String, String> metadata) {
  final PreExistingContainer registration =
      new PreExistingContainer(uri, metadata);
  this.preExistingContainers.add(registration);
}
/**
* Accepts a retry policy factory and does nothing with it.
*
* @param retryPolicyFactory ignored by the mock.
*/
@Override
public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) {
// Nothing to do: this implementation keeps no retry state.
}
/**
* Accepts a timeout value and does nothing with it.
*
* @param timeoutInMs ignored by the mock.
*/
@Override
public void setTimeoutInMs(int timeoutInMs) {
// Nothing to do: this implementation keeps no timeout state.
}
/**
 * Replaces the backing store with a fresh, empty in-memory store. The
 * supplied account is not consulted.
 *
 * @param account unused by the mock.
 */
@Override
public void createBlobClient(CloudStorageAccount account) {
  this.backingStore = new InMemoryBlockBlobStore();
}
/**
 * Replaces the backing store with a fresh, empty in-memory store and
 * remembers the base URI for building container URIs.
 *
 * @param baseUri base URI under which containers are addressed; must not
 *          be null.
 */
@Override
public void createBlobClient(URI baseUri) {
  // Record the base URI exactly like the (URI, StorageCredentials) overload
  // does. Without it getContainerReference would build container URIs from
  // a null or stale base string.
  this.baseUriString = baseUri.toString();
  backingStore = new InMemoryBlockBlobStore();
}
/**
 * Remembers the base URI for building container URIs and replaces the
 * backing store with a fresh, empty in-memory store. The credentials are
 * not consulted.
 *
 * @param baseUri base URI under which containers are addressed.
 * @param credentials unused by the mock.
 */
@Override
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
  final String base = baseUri.toString();
  this.baseUriString = base;
  this.backingStore = new InMemoryBlockBlobStore();
}
/**
* Returns the storage credentials for this interaction layer.
*
* @return always null in this mock.
*/
@Override
public StorageCredentials getCredentials() {
// Not implemented for mock interface.
return null;
}
/**
 * Utility function used to convert a given URI to the decoded string
 * representation that is used as a key in the backing store. URIs coming
 * as input to this class will be encoded by the URI class, and we want the
 * underlying storage to store keys in their original UTF-8 form.
 *
 * NOTE(review): decoding goes through URLCodec; presumably a literal '+'
 * in the URI is decoded as a space -- confirm if keys may contain '+'.
 *
 * @param uri the (encoded) URI to convert.
 * @return the decoded string form of the URI.
 * @throws AssertionError if the URI string cannot be decoded; the
 *           DecoderException is attached as the cause.
 */
private static String convertUriToDecodedString(URI uri) {
  try {
    return codec.decode(uri.toString());
  } catch (DecoderException e) {
    // Attach the original exception so the root cause is not lost.
    AssertionError failure =
        new AssertionError("Failed to decode URI: " + uri.toString());
    failure.initCause(e);
    throw failure;
  }
}
/**
 * Converts a backing-store key into an encoded URI. The key is parsed as a
 * Path, and a new URI is built from that path's scheme and path components.
 *
 * NOTE(review): only the scheme and path are carried over to the result;
 * any authority present in the key is not copied -- confirm this is
 * intended.
 *
 * @param key the decoded store key.
 * @return the encoded URI for the key.
 * @throws AssertionError if the URI cannot be built; the
 *           URISyntaxException is attached as the cause.
 */
private static URI convertKeyToEncodedUri(String key) {
  try {
    Path p = new Path(key);
    URI unEncodedURI = p.toUri();
    return new URIBuilder().setPath(unEncodedURI.getPath())
        .setScheme(unEncodedURI.getScheme()).build();
  } catch (URISyntaxException e) {
    int i = e.getIndex();
    String input = e.getInput();
    String details = "";
    // Only quote the offending character when the reported index really
    // points inside the input; otherwise building the error message would
    // itself throw and hide the original failure.
    if (i >= 0 && input != null && i < input.length()) {
      details = " -- \"" + input.charAt(i) + "\"";
    }
    AssertionError failure = new AssertionError("Failed to encode key: " + key
        + ": " + e + details);
    failure.initCause(e);
    throw failure;
  }
}
/**
 * Builds a mock container wrapper for the given container name. The
 * container URI is the base URI with the name appended to its path. If a
 * container with that URI (compared case-insensitively) was registered via
 * addPreExistingContainer, the wrapper is marked as created and the backing
 * store's container metadata is set from the registration.
 *
 * @param name the container name.
 * @return a new wrapper for the container.
 * @throws URISyntaxException if the base URI string cannot be parsed.
 */
@Override
public CloudBlobContainerWrapper getContainerReference(String name)
    throws URISyntaxException, StorageException {
  final URIBuilder uriBuilder = new URIBuilder(baseUriString);
  final String basePath = uriBuilder.getPath();
  final String pathPrefix;
  if (basePath == null) {
    pathPrefix = "";
  } else {
    pathPrefix = basePath + "/";
  }
  final String containerUri = uriBuilder.setPath(pathPrefix + name).toString();
  final MockCloudBlobContainerWrapper wrapper =
      new MockCloudBlobContainerWrapper(containerUri, name);

  // Look for the first registered pre-existing container with this URI.
  PreExistingContainer match = null;
  for (PreExistingContainer candidate : preExistingContainers) {
    if (containerUri.equalsIgnoreCase(candidate.containerUri)) {
      match = candidate;
      break;
    }
  }
  if (match != null) {
    // The container pre-exists: prime the wrapper and use its metadata.
    wrapper.created = true;
    backingStore.setContainerMetadata(match.containerMetadata);
  }
  return wrapper;
}
/**
 * Mock container wrapper. It tracks a "created" flag and a local metadata
 * map, and reads/writes container metadata through the enclosing mock's
 * backing store.
 */
class MockCloudBlobContainerWrapper extends CloudBlobContainerWrapper {
  private boolean created = false;
  private HashMap<String, String> metadata;
  private final String baseUri;
  private final String name;

  /**
   * @param baseUri full URI of this container; blob and directory URIs are
   *          built underneath it.
   * @param name the container name.
   */
  public MockCloudBlobContainerWrapper(String baseUri, String name) {
    this.name = name;
    this.baseUri = baseUri;
  }

  @Override
  public String getName() {
    return this.name;
  }

  /** Reports whether this wrapper has been marked as created. */
  @Override
  public boolean exists(OperationContext opContext) throws StorageException {
    return this.created;
  }

  /** Marks the container created and pushes the local metadata to the store. */
  @Override
  public void create(OperationContext opContext) throws StorageException {
    this.created = true;
    backingStore.setContainerMetadata(this.metadata);
  }

  @Override
  public HashMap<String, String> getMetadata() {
    return this.metadata;
  }

  @Override
  public void setMetadata(HashMap<String, String> metadata) {
    this.metadata = metadata;
  }

  /** Replaces the local metadata with the store's container metadata. */
  @Override
  public void downloadAttributes(OperationContext opContext)
      throws StorageException {
    this.metadata = backingStore.getContainerMetadata();
  }

  /** Writes the local metadata to the store's container metadata. */
  @Override
  public void uploadMetadata(OperationContext opContext)
      throws StorageException {
    backingStore.setContainerMetadata(this.metadata);
  }

  /** Returns a directory wrapper whose URI is the container URI plus the path. */
  @Override
  public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
      throws URISyntaxException, StorageException {
    final String directoryUri = fullUriString(relativePath, true);
    return new MockCloudBlobDirectoryWrapper(new URI(directoryUri));
  }

  /** Returns a block blob wrapper (no metadata, length 0) under this container. */
  @Override
  public CloudBlockBlobWrapper getBlockBlobReference(String relativePath)
      throws URISyntaxException, StorageException {
    final String blobUri = fullUriString(relativePath, false);
    return new MockCloudBlockBlobWrapper(new URI(blobUri), null, 0);
  }

  /**
   * Returns a page blob wrapper (no metadata, length 0). Unlike the block
   * blob variant, the given address is used as the URI as-is and is not
   * resolved against the container URI.
   */
  @Override
  public CloudPageBlobWrapper getPageBlobReference(String blobAddressUri)
      throws URISyntaxException, StorageException {
    final URI pageBlobUri = new URI(blobAddressUri);
    return new MockCloudPageBlobWrapper(pageBlobUri, null, 0);
  }

  // Helper that builds the full URI string for a directory or a blob below
  // this container. Pass withTrailingSlash=true to get a directory-style
  // path (a non-empty relative path is given a trailing slash).
  private String fullUriString(String relativePath, boolean withTrailingSlash) {
    final String root =
        this.baseUri.endsWith("/") ? this.baseUri : this.baseUri + "/";
    String suffix = relativePath;
    final boolean needsSlash =
        withTrailingSlash && !suffix.isEmpty() && !suffix.endsWith("/");
    if (needsSlash) {
      suffix = suffix + "/";
    }
    try {
      final URIBuilder uriBuilder = new URIBuilder(root);
      final String combinedPath = uriBuilder.getPath() + suffix;
      return uriBuilder.setPath(combinedPath).toString();
    } catch (URISyntaxException e) {
      throw new RuntimeException("problem encoding fullUri", e);
    }
  }
}
/**
 * Immutable record of a container that is registered as existing before
 * the file system is mounted: its URI and its metadata map.
 */
private static class PreExistingContainer {
  /** URI of the container, as supplied at registration. */
  final String containerUri;
  /** Metadata of the container, as supplied at registration (not copied). */
  final HashMap<String, String> containerMetadata;

  public PreExistingContainer(String uri, HashMap<String, String> metadata) {
    containerMetadata = metadata;
    containerUri = uri;
  }
}
/**
 * Mock directory wrapper. A directory is just a URI prefix; listing is
 * answered from the enclosing mock's backing store.
 */
class MockCloudBlobDirectoryWrapper extends CloudBlobDirectoryWrapper {
  private URI uri;

  /**
   * @param uri the URI of this directory.
   */
  public MockCloudBlobDirectoryWrapper(URI uri) {
    this.uri = uri;
  }

  /** Not modelled by the mock; always returns null. */
  @Override
  public CloudBlobContainer getContainer() throws URISyntaxException,
      StorageException {
    return null;
  }

  /** Not modelled by the mock; always returns null. */
  @Override
  public CloudBlobDirectory getParent() throws URISyntaxException,
      StorageException {
    return null;
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Lists the store entries whose key starts with this directory's URI
   * (plus the optional prefix).
   *
   * With flat listing every matching blob is returned. Otherwise blobs
   * directly under the prefix are returned as blobs, and deeper blobs are
   * collapsed into one directory entry per distinct first-level directory.
   *
   * @param prefix optional path appended to this directory's URI; null
   *          means list this directory itself.
   * @param useFlatBlobListing true to return all blobs without collapsing
   *          them into directories.
   * @param listingDetails listing options; metadata is requested from the
   *          store only when it contains METADATA. A null set is treated
   *          as empty.
   * @param options unused by the mock.
   * @param opContext unused by the mock.
   * @return the matching blob and directory wrappers.
   * @throws URISyntaxException if a directory key cannot be parsed as a URI.
   */
  @Override
  public Iterable<ListBlobItem> listBlobs(String prefix,
      boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
      BlobRequestOptions options, OperationContext opContext)
      throws URISyntaxException, StorageException {
    ArrayList<ListBlobItem> ret = new ArrayList<ListBlobItem>();
    URI searchUri = null;
    if (prefix == null) {
      searchUri = uri;
    } else {
      try {
        searchUri = UriBuilder.fromUri(uri).path(prefix).build();
      } catch (UriBuilderException e) {
        // Keep the original exception as the cause for easier diagnosis.
        AssertionError failure =
            new AssertionError("Failed to encode path: " + prefix);
        failure.initCause(e);
        throw failure;
      }
    }
    String fullPrefix = convertUriToDecodedString(searchUri);
    boolean includeMetadata = listingDetails != null
        && listingDetails.contains(BlobListingDetails.METADATA);
    HashSet<String> addedDirectories = new HashSet<String>();
    for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore.listBlobs(
        fullPrefix, includeMetadata)) {
      String key = current.getKey();
      // First '/' after the prefix, if any, marks a nested directory.
      int indexOfSlash = key.indexOf('/', fullPrefix.length());
      if (useFlatBlobListing || indexOfSlash < 0) {
        if (current.isPageBlob()) {
          ret.add(new MockCloudPageBlobWrapper(
              convertKeyToEncodedUri(key),
              current.getMetadata(),
              current.getContentLength()));
        } else {
          ret.add(new MockCloudBlockBlobWrapper(
              convertKeyToEncodedUri(key),
              current.getMetadata(),
              current.getContentLength()));
        }
      } else {
        String directoryName = key.substring(0, indexOfSlash);
        // Remember the directory name itself (not the blob key), so that
        // each directory is reported once no matter how many blobs it has.
        if (addedDirectories.add(directoryName)) {
          ret.add(new MockCloudBlobDirectoryWrapper(new URI(
              directoryName + "/")));
        }
      }
    }
    return ret;
  }

  @Override
  public StorageUri getStorageUri() {
    throw new NotImplementedException("Code is not implemented");
  }
}
/**
 * Common base for the mock block and page blob wrappers. A blob is
 * identified by its URI; the decoded URI string is the key into the
 * enclosing mock's backing store.
 */
abstract class MockCloudBlobWrapper implements CloudBlobWrapper {
  protected final URI uri;
  // NOTE: the constructor replaces this initial map with the caller-supplied
  // one, which may be null.
  protected HashMap<String, String> metadata =
      new HashMap<String, String>();
  protected BlobProperties properties;

  /**
   * @param uri the (encoded) URI of the blob.
   * @param metadata the blob metadata; stored as given, may be null.
   * @param length the content length recorded in the blob properties.
   */
  protected MockCloudBlobWrapper(URI uri, HashMap<String, String> metadata,
      int length) {
    this.uri = uri;
    this.metadata = metadata;
    this.properties = new BlobProperties();
    this.properties = updateLastModifed(this.properties);
    this.properties = updateLength(this.properties, length);
  }

  /**
   * Sets the last-modified time of the given properties to the current
   * time, calling the non-public setter through reflection.
   *
   * @param properties the properties object to update.
   * @return the same properties object.
   * @throws RuntimeException wrapping any reflection failure.
   */
  protected BlobProperties updateLastModifed(BlobProperties properties) {
    try {
      Method setLastModified = properties.getClass().
          getDeclaredMethod("setLastModified", Date.class);
      setLastModified.setAccessible(true);
      // Update the object that was passed in, not whatever happens to be
      // in this.properties.
      setLastModified.invoke(properties,
          Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return properties;
  }

  /**
   * Sets the length of the given properties, calling the non-public setter
   * through reflection.
   *
   * @param properties the properties object to update.
   * @param length the content length to record.
   * @return the same properties object.
   * @throws RuntimeException wrapping any reflection failure.
   */
  protected BlobProperties updateLength(BlobProperties properties, int length) {
    try {
      Method setLength = properties.getClass().
          getDeclaredMethod("setLength", long.class);
      setLength.setAccessible(true);
      // Update the object that was passed in, not this.properties.
      setLength.invoke(properties, length);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return properties;
  }

  /**
   * If the blob exists in the store, rebuilds the properties (fresh
   * last-modified time, length taken from the stored content) and
   * optionally reloads the metadata. Does nothing when the blob is absent.
   *
   * @param getMetadata true to also reload the metadata from the store.
   */
  protected void refreshProperties(boolean getMetadata) {
    final String key = convertUriToDecodedString(uri);
    if (backingStore.exists(key)) {
      byte[] content = backingStore.getContent(key);
      properties = new BlobProperties();
      this.properties = updateLastModifed(this.properties);
      this.properties = updateLength(this.properties, content.length);
      if (getMetadata) {
        metadata = backingStore.getMetadata(key);
      }
    }
  }

  /** Not modelled by the mock; always returns null. */
  @Override
  public CloudBlobContainer getContainer() throws URISyntaxException,
      StorageException {
    return null;
  }

  /** Not modelled by the mock; always returns null. */
  @Override
  public CloudBlobDirectory getParent() throws URISyntaxException,
      StorageException {
    return null;
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public HashMap<String, String> getMetadata() {
    return metadata;
  }

  @Override
  public void setMetadata(HashMap<String, String> metadata) {
    this.metadata = metadata;
  }

  /**
   * Copies the source blob's store entry onto this blob's key.
   *
   * @throws StorageException with HTTP 409 when overwriteDestination is
   *           false and this blob already exists in the store.
   */
  @Override
  public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
      OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
    if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
      throw new StorageException("BlobAlreadyExists",
          "The blob already exists.",
          HttpURLConnection.HTTP_CONFLICT,
          null,
          null);
    }
    backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
    //TODO: set the backingStore.properties.CopyState and
    //      update azureNativeFileSystemStore.waitForCopyToComplete
  }

  @Override
  public CopyState getCopyState() {
    return this.properties.getCopyState();
  }

  /** Removes this blob's entry from the store; the lease is not consulted. */
  @Override
  public void delete(OperationContext opContext, SelfRenewingLease lease)
      throws StorageException {
    backingStore.delete(convertUriToDecodedString(uri));
  }

  @Override
  public boolean exists(OperationContext opContext) throws StorageException {
    return backingStore.exists(convertUriToDecodedString(uri));
  }

  /** Refreshes both properties and metadata from the store. */
  @Override
  public void downloadAttributes(OperationContext opContext)
      throws StorageException {
    refreshProperties(true);
  }

  @Override
  public BlobProperties getProperties() {
    return properties;
  }

  /** Returns a stream over the blob's stored content. */
  @Override
  public InputStream openInputStream(BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    return new ByteArrayInputStream(
        backingStore.getContent(convertUriToDecodedString(uri)));
  }

  /** Writes the local metadata map to the store for this blob. */
  @Override
  public void uploadMetadata(OperationContext opContext)
      throws StorageException {
    backingStore.setMetadata(convertUriToDecodedString(uri), metadata);
  }

  /**
   * Writes {@code length} bytes of the stored content, starting at
   * {@code offset}, to the given stream.
   *
   * @throws IndexOutOfBoundsException if offset is negative or length is
   *           not positive.
   * @throws StorageException with HTTP 404 if the blob is not in the
   *           store, or wrapping an IOException from the output stream.
   */
  @Override
  public void downloadRange(long offset, long length, OutputStream os,
      BlobRequestOptions options, OperationContext opContext)
      throws StorageException {
    if (offset < 0 || length <= 0) {
      throw new IndexOutOfBoundsException();
    }
    if (!backingStore.exists(convertUriToDecodedString(uri))) {
      throw new StorageException("BlobNotFound",
          "Resource does not exist.",
          HttpURLConnection.HTTP_NOT_FOUND,
          null,
          null);
    }
    byte[] content = backingStore.getContent(convertUriToDecodedString(uri));
    try {
      os.write(content, (int) offset, (int) length);
    } catch (IOException e) {
      throw new StorageException("Unknown error", "Unexpected error", e);
    }
  }
}
/**
 * Mock block blob. Writing goes to the enclosing mock's backing store;
 * lease, block-list and property upload operations are stubs.
 */
class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper
    implements CloudBlockBlobWrapper {
  int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;

  /**
   * @param uri the (encoded) URI of the blob.
   * @param metadata the blob metadata, may be null.
   * @param length the content length recorded in the blob properties.
   */
  public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
      int length) {
    super(uri, metadata, length);
  }

  /** Opens a stream that uploads a block blob under this blob's key. */
  @Override
  public OutputStream openOutputStream(BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    final String key = convertUriToDecodedString(uri);
    return backingStore.uploadBlockBlob(key, metadata);
  }

  @Override
  public int getStreamMinimumReadSizeInBytes() {
    return minimumReadSize;
  }

  @Override
  public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
    minimumReadSize = minimumReadSizeBytes;
  }

  /** Ignored by the mock. */
  @Override
  public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
    // nothing to configure
  }

  /** Not modelled by the mock; always returns null. */
  @Override
  public StorageUri getStorageUri() {
    return null;
  }

  /** Ignored by the mock. */
  @Override
  public void uploadProperties(OperationContext context, SelfRenewingLease lease) {
    // nothing to upload
  }

  /** Leases are not modelled by the mock; always returns null. */
  @Override
  public SelfRenewingLease acquireLease() {
    return null;
  }

  /** There is no underlying SDK blob; always returns null. */
  @Override
  public CloudBlob getBlob() {
    return null;
  }

  @Override
  public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
      OperationContext opContext) throws IOException, StorageException {
    throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
  }

  @Override
  public void uploadBlock(String blockId, AccessCondition accessCondition,
      InputStream sourceStream,
      long length, BlobRequestOptions options,
      OperationContext opContext) throws IOException, StorageException {
    throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
  }

  @Override
  public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
      BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
    throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
  }

  public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
  }
}
/**
 * Mock page blob. Page-specific operations (create, uploadPages,
 * downloadPageRanges) are not implemented and throw; lease and property
 * upload operations are stubs.
 */
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
    implements CloudPageBlobWrapper {
  int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;

  /**
   * @param uri the (encoded) URI of the blob.
   * @param metadata the blob metadata, may be null.
   * @param length the content length recorded in the blob properties.
   */
  public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata,
      int length) {
    super(uri, metadata, length);
  }

  @Override
  public void create(long length, BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    throw new NotImplementedException("Code is not implemented");
  }

  @Override
  public void uploadPages(InputStream sourceStream, long offset, long length,
      BlobRequestOptions options, OperationContext opContext)
      throws StorageException, IOException {
    throw new NotImplementedException("Code is not implemented");
  }

  @Override
  public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    throw new NotImplementedException("Code is not implemented");
  }

  @Override
  public int getStreamMinimumReadSizeInBytes() {
    return minimumReadSize;
  }

  @Override
  public void setStreamMinimumReadSizeInBytes(int minimumReadSize) {
    this.minimumReadSize = minimumReadSize;
  }

  /** Ignored by the mock. */
  @Override
  public void setWriteBlockSizeInBytes(int writeBlockSizeInBytes) {
    // nothing to configure
  }

  @Override
  public StorageUri getStorageUri() {
    throw new NotImplementedException("Code is not implemented");
  }

  /** Ignored by the mock. */
  @Override
  public void uploadProperties(OperationContext opContext,
      SelfRenewingLease lease)
      throws StorageException {
    // nothing to upload
  }

  /** Leases are not modelled by the mock; always returns null. */
  @Override
  public SelfRenewingLease acquireLease() {
    return null;
  }

  /** There is no underlying SDK blob; always returns null. */
  @Override
  public CloudBlob getBlob() {
    return null;
  }

  public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
      OperationContext opContext) throws StorageException {
    throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
  }
}
}