/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.windowsazure.storage.core.Utility;
/**
* <p>
* A {@link FileSystem} for reading and writing files stored on <a
* href="http://store.azure.com/">Windows Azure</a>. This implementation is
* blob-based and stores files on Azure in their native form so they can be read
* by other Azure tools.
* </p>
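* <p>
* A minimal usage sketch; the account and container names below are
* illustrative, not taken from this codebase:
* </p>
*
* <pre>
* {@code
* Configuration conf = new Configuration();
* // Account credentials are expected in core-site.xml, e.g. the property
* // fs.azure.account.key.youraccount.blob.core.windows.net (illustrative).
* Path p = new Path("wasb://mycontainer@youraccount.blob.core.windows.net/dir/file");
* FileSystem fs = p.getFileSystem(conf); // resolves to NativeAzureFileSystem
* FSDataInputStream in = fs.open(p);
* try {
*   // read as from any other Hadoop FileSystem
* } finally {
*   in.close();
* }
* }
* </pre>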
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeAzureFileSystem extends FileSystem {
@Override
public String getScheme() {
return "wasb";
}
/**
* <p>
* A {@link FileSystem} for reading and writing files stored on <a
* href="http://store.azure.com/">Windows Azure</a>. This implementation is
* blob-based and stores files on Azure in their native form so they can be read
* by other Azure tools. This implementation uses HTTPS for secure network communication.
* </p>
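* <p>
* For example (account and container names illustrative), a path such as
* {@code wasbs://mycontainer@youraccount.blob.core.windows.net/dir/file}
* resolves to this class and is served over HTTPS.
* </p>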
*/
public static class Secure extends NativeAzureFileSystem {
@Override
public String getScheme() {
return "wasbs";
}
}
public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class);
static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
/**
* The time span in seconds after which we consider a temp blob to be
* dangling (not being actively uploaded to) and up for reclamation.
*
* So e.g. if this is 60, then any temporary blobs more than a minute old
* would be considered dangling.
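*
* A configuration sketch (the value shown is illustrative):
*
* <pre>
* {@code
* Configuration conf = new Configuration();
* // Treat temp blobs older than two minutes as dangling:
* conf.setInt("fs.azure.fsck.temp.expiry.seconds", 120);
* }
* </pre>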
*/
static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
static final String PATH_DELIMITER = Path.SEPARATOR;
static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";
private static final int AZURE_LIST_ALL = -1;
private static final int AZURE_UNBOUNDED_DEPTH = -1;
private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
/**
* The configuration property that determines which group owns files created
* in WASB.
*/
private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
/**
* The default value for fs.azure.permissions.supergroup. Chosen as the same
* default as DFS.
*/
static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
private long pos = 0;
public NativeAzureFsInputStream(DataInputStream in, String key) {
this.in = in;
this.key = key;
}
/*
* Reads the next byte of data from the input stream. The value byte is
* returned as an integer in the range 0 to 255. If no byte is available
* because the end of the stream has been reached, the value -1 is returned.
* This method blocks until input data is available, the end of the stream
* is detected, or an exception is thrown.
*
* @return int An integer corresponding to the byte read.
*/
@Override
public synchronized int read() throws IOException {
int result = in.read();
if (result != -1) {
pos++;
if (statistics != null) {
statistics.incrementBytesRead(1);
}
}
// Return to the caller with the result.
//
return result;
}
/*
* Reads up to len bytes of data from the input stream into an array of
* bytes. An attempt is made to read as many as len bytes, but a smaller
* number may be read. The number of bytes actually read is returned as an
* integer. This method blocks until input data is available, end of file is
* detected, or an exception is thrown. If len is zero, then no bytes are
* read and 0 is returned; otherwise, there is an attempt to read at least
* one byte. If no byte is available because the stream is at end of file,
* the value -1 is returned; otherwise, at least one byte is read and stored
* into b.
*
* @param b -- the buffer into which data is read
*
* @param off -- the start offset in the array b at which data is written
*
* @param len -- the maximum number of bytes read
*
* @return int The total number of bytes read into the buffer, or -1 if
* there is no more data because the end of stream is reached.
*/
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
int result = in.read(b, off, len);
if (result > 0) {
pos += result;
if (null != statistics) {
// Only count bytes actually read; skip the -1 end-of-stream marker.
statistics.incrementBytesRead(result);
}
}
// Return to the caller with the result.
return result;
}
@Override
public synchronized void close() throws IOException {
in.close();
}
@Override
public synchronized void seek(long pos) throws IOException {
in.close();
in = store.retrieve(key, pos);
this.pos = pos;
}
@Override
public synchronized long getPos() throws IOException {
return pos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}
private class NativeAzureFsOutputStream extends OutputStream {
// We should not override flush() to actually close current block and flush
// to DFS, this will break applications that assume flush() is a no-op.
// Applications are advised to use Syncable.hflush() for that purpose.
// NativeAzureFsOutputStream needs to implement Syncable if needed.
private String key;
private String keyEncoded;
private OutputStream out;
public NativeAzureFsOutputStream(OutputStream out, String aKey,
String anEncodedKey) throws IOException {
// Check input arguments. The output stream should be non-null and the
// keys should be valid strings.
if (null == out) {
throw new IllegalArgumentException(
"Illegal argument: the output stream is null.");
}
if (null == aKey || 0 == aKey.length()) {
throw new IllegalArgumentException(
"Illegal argument: the key string is null or empty.");
}
if (null == anEncodedKey || 0 == anEncodedKey.length()) {
throw new IllegalArgumentException(
"Illegal argument: the encoded key string is null or empty.");
}
// Initialize the member variables with the incoming parameters.
this.out = out;
setKey(aKey);
setEncodedKey(anEncodedKey);
}
@Override
public synchronized void close() throws IOException {
if (out != null) {
// Close the output stream and decode the key for the output stream
// before returning to the caller.
//
out.close();
restoreKey();
out = null;
}
}
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of the argument b. The 24 high-order
* bits of b are ignored.
*
* @param b
* The byte to write, passed as an int; only the eight low-order bits
* are written and the 24 high-order bits are ignored.
*/
@Override
public void write(int b) throws IOException {
out.write(b);
}
/**
* Writes b.length bytes from the specified byte array to this output
* stream. The general contract for write(b) is that it should have exactly
* the same effect as the call write(b, 0, b.length).
*
* @param b
* Block of bytes to be written to the output stream.
*/
@Override
public void write(byte[] b) throws IOException {
out.write(b);
}
/**
* Writes <code>len</code> bytes from the specified byte array starting at
* offset <code>off</code> to the output stream. The general contract for
* write(b, off, len) is that some of the bytes in the array
* <code>b</code> are written to the output stream in order; element
* <code>b[off]</code> is the first byte written and
* <code>b[off+len-1]</code> is the last byte written by this operation.
*
* @param b
* Byte array to be written.
* @param off
* The start offset in the array from which bytes are taken.
* @param len
* Number of bytes to be written.
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
/**
* Get the blob name.
*
* @return String Blob name.
*/
public String getKey() {
return key;
}
/**
* Set the blob name.
*
* @param key
* Blob name.
*/
public void setKey(String key) {
this.key = key;
}
/**
* Get the encoded blob name.
*
* @return String Encoded blob name.
*/
public String getEncodedKey() {
return keyEncoded;
}
/**
* Set the encoded blob name.
*
* @param anEncodedKey
* Encoded blob name.
*/
public void setEncodedKey(String anEncodedKey) {
this.keyEncoded = anEncodedKey;
}
/**
* Restore the original key name from the key member variable. Note: The
* output file stream is created with an encoded blob store key to guarantee
* load balancing on the front end of the Azure storage partition servers.
* The create also records the original key value, which is stored in the
* key member variable. This method should only be called when the stream
* is closed.
*/
private void restoreKey() throws IOException {
store.rename(getEncodedKey(), getKey());
}
}
private URI uri;
private NativeFileSystemStore store;
private AzureNativeFileSystemStore actualStore;
private Path workingDir;
private long blockSize = MAX_AZURE_BLOCK_SIZE;
private AzureFileSystemInstrumentation instrumentation;
private String metricsSourceName;
private boolean isClosed = false;
private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
public NativeAzureFileSystem() {
// set store in initialize()
}
public NativeAzureFileSystem(NativeFileSystemStore store) {
this.store = store;
}
/**
* Suppress the default retry policy for the storage client. Useful in unit
* tests to test negative cases without waiting forever.
*/
@VisibleForTesting
static void suppressRetryPolicy() {
suppressRetryPolicy = true;
}
/**
* Undo the effect of suppressRetryPolicy.
*/
@VisibleForTesting
static void resumeRetryPolicy() {
suppressRetryPolicy = false;
}
/**
* Creates a new metrics source name that's unique within this process.
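* For example, the first call in a process returns "AzureFileSystemMetrics",
* the second returns "AzureFileSystemMetrics2", and so on.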
*/
@VisibleForTesting
public static String newMetricsSourceName() {
int number = metricsSourceNameCounter.incrementAndGet();
final String baseName = "AzureFileSystemMetrics";
if (number == 1) { // No need for a suffix for the first one
return baseName;
} else {
return baseName + number;
}
}
/**
* Checks if the given URI scheme is a scheme that's affiliated with the Azure
* File System.
*
* @param scheme
* The URI scheme.
* @return true iff it's an Azure File System URI scheme.
*/
private static boolean isWasbScheme(String scheme) {
// The valid schemes are: asv (old name), asvs (old name over HTTPS),
// wasb (new name), wasbs (new name over HTTPS).
return scheme != null
&& (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
|| scheme.equalsIgnoreCase("wasb") || scheme.equalsIgnoreCase("wasbs"));
}
/**
* Puts in the authority of the default file system if it is a WASB file
* system and the given URI's authority is null.
*
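* For example (authority illustrative): if fs.defaultFS is
* wasb://mycontainer@youraccount.blob.core.windows.net and the given URI is
* wasb:///dir/file, the returned URI becomes
* wasb://mycontainer@youraccount.blob.core.windows.net/dir/file.
*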
* @return The URI with reconstructed authority if necessary and possible.
*/
private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
if (null == uri.getAuthority()) {
// If WASB is the default file system, get the authority from there
URI defaultUri = FileSystem.getDefaultUri(conf);
if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
try {
// Reconstruct the URI with the authority from the default URI.
return new URI(uri.getScheme(), defaultUri.getAuthority(),
uri.getPath(), uri.getQuery(), uri.getFragment());
} catch (URISyntaxException e) {
// This should never happen.
throw new Error("Bad URI construction", e);
}
}
}
return uri;
}
@Override
protected void checkPath(Path path) {
// Make sure to reconstruct the path's authority if needed
super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
getConf())));
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
// Check authority for the URI to guarantee that it is non-null.
uri = reconstructAuthorityIfNeeded(uri, conf);
if (null == uri.getAuthority()) {
final String errMsg =
"Cannot initialize WASB file system, URI authority not recognized.";
throw new IllegalArgumentException(errMsg);
}
super.initialize(uri, conf);
if (store == null) {
store = createDefaultStore(conf);
}
// Make sure the metrics system is available before interacting with Azure
AzureFileSystemMetricsSystem.fileSystemStarted();
metricsSourceName = newMetricsSourceName();
String sourceDesc = "Azure Storage Volume File System metrics";
instrumentation = new AzureFileSystemInstrumentation(conf);
AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
instrumentation);
store.initialize(uri, conf, instrumentation);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
.getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
if (LOG.isDebugEnabled()) {
LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = "
+ conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
}
}
private NativeFileSystemStore createDefaultStore(Configuration conf) {
actualStore = new AzureNativeFileSystemStore();
if (suppressRetryPolicy) {
actualStore.suppressRetryPolicy();
}
return actualStore;
}
// Note: The logic for this method is confusing as to whether it strips the
// last slash or not (it adds it in the beginning, then strips it at the end).
// We should revisit that.
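// For example (paths illustrative): pathToKey(new Path("/user/alice/f.txt"))
// yields "user/alice/f.txt", while the container root "/" stays "/".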
private String pathToKey(Path path) {
// Convert the path to a URI to parse the scheme, the authority, and the
// path from the path object.
URI tmpUri = path.toUri();
String pathUri = tmpUri.getPath();
// The scheme and authority are valid. If the path is empty, add a "/"
// separator to list the root of the container.
Path newPath = path;
if ("".equals(pathUri)) {
newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
}
// Verify that the path is absolute.
if (!newPath.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
String key = newPath.toUri().getPath();
if (key.length() == 1) {
return key;
} else {
return key.substring(1); // remove initial slash
}
}
private static Path keyToPath(String key) {
if (key.equals("/")) {
return new Path("/"); // container
}
return new Path("/" + key);
}
private Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workingDir, path);
}
/**
* For unit test purposes, retrieves the AzureNativeFileSystemStore store
* backing this file system.
*
* @return The store object.
*/
@VisibleForTesting
public AzureNativeFileSystemStore getStore() {
return actualStore;
}
/**
* Gets the metrics source for this file system.
* This is mainly here for unit testing purposes.
*
* @return the metrics source.
*/
public AzureFileSystemInstrumentation getInstrumentation() {
return instrumentation;
}
/** This optional operation is not yet supported. */
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
throw new IOException("Not supported");
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating file: " + f.toString());
}
if (containsColon(f)) {
throw new IOException("Cannot create file " + f
+ " through WASB that has colons in the name");
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
FileMetadata existingMetadata = store.retrieveMetadata(key);
if (existingMetadata != null) {
if (existingMetadata.isDir()) {
throw new IOException("Cannot create file " + f
+ "; already exists as a directory.");
}
if (!overwrite) {
throw new IOException("File already exists:" + f);
}
}
Path parentFolder = absolutePath.getParent();
if (parentFolder != null && parentFolder.getParent() != null) { // skip root
// Update the parent folder last modified time if the parent folder
// already exists.
String parentKey = pathToKey(parentFolder);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
if (parentMetadata != null
&& parentMetadata.isDir()
&& parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
store.updateFolderLastModifiedTime(parentKey);
} else {
// Make sure that the parent folder exists.
mkdirs(parentFolder, permission);
}
}
// Open the output blob stream based on the encoded key.
String keyEncoded = encodeKey(key);
// Mask the permission first (with the default permission mask as well).
FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
PermissionStatus permissionStatus = createPermissionStatus(masked);
// First create a blob at the real key, pointing back to the temporary file.
// This accomplishes a few things:
// 1. Makes sure we can create a file there.
// 2. Makes it visible to other concurrent threads/processes/nodes what
// we're doing.
// 3. Makes it easier to restore/clean up data in the event of a crash.
store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
// The key is encoded to point to a common container at the storage server.
// This reduces the number of splits on the server side when load balancing.
// Ingress to Azure storage can take advantage of earlier splits. We remove
// the root path to the key and prefix a random GUID to the tail (or leaf
// filename) of the key. Keys are thus broadly and randomly distributed over
// a single container to ease load balancing on the storage server. When the
// blob is committed it is renamed to its earlier key. Uncommitted blocks
// are not cleaned up; we leave it to Azure storage to garbage collect them.
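// Illustrative sketch of the protocol above (GUID shortened): creating
// /user/alice/part-0000 first stores a zero-length link blob at key
// "user/alice/part-0000" pointing at "_$azuretmpfolder$/1a2b...part-0000",
// streams the data to that temporary blob, and on close() renames the
// temporary blob back to the original key.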
OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile(
keyEncoded, permissionStatus), key, keyEncoded);
// Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
// Increment the counter
instrumentation.fileCreated();
// Return data output stream to caller.
return fsOut;
}
@Override
@Deprecated
public boolean delete(Path path) throws IOException {
return delete(path, true);
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting file: " + f.toString());
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
// Capture the metadata for the path.
//
FileMetadata metaFile = store.retrieveMetadata(key);
if (null == metaFile) {
// The path to be deleted does not exist.
return false;
}
// The path exists, determine if it is a folder containing objects,
// an empty folder, or a simple file and take the appropriate actions.
if (!metaFile.isDir()) {
// The path specifies a file. We need to check the parent path
// to make sure it's a proper materialized directory before we
// delete the file. Otherwise we may get into a situation where
// the file we were deleting was the last one in an implicit directory
// (e.g. the blob store only contains the blob a/b and there's no
// corresponding directory blob a) and that would implicitly delete
// the directory as well, which is not correct.
Path parentPath = absolutePath.getParent();
if (parentPath.getParent() != null) {// Not root
String parentKey = pathToKey(parentPath);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
if (!parentMetadata.isDir()) {
// Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve.");
}
if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found an implicit parent directory while trying to"
+ " delete the file " + f + ". Creating the directory blob for"
+ " it in " + parentKey + ".");
}
store.storeEmptyFolder(parentKey,
createPermissionStatus(FsPermission.getDefault()));
} else {
store.updateFolderLastModifiedTime(parentKey);
}
}
instrumentation.fileDeleted();
store.delete(key);
} else {
// The path specifies a folder. Recursively delete all entries under the
// folder.
Path parentPath = absolutePath.getParent();
if (parentPath.getParent() != null) {
String parentKey = pathToKey(parentPath);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found an implicit parent directory while trying to"
+ " delete the directory " + f
+ ". Creating the directory blob for" + " it in " + parentKey
+ ".");
}
store.storeEmptyFolder(parentKey,
createPermissionStatus(FsPermission.getDefault()));
}
}
// List all the blobs in the current folder.
String priorLastKey = null;
PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1,
priorLastKey);
FileMetadata[] contents = listing.getFiles();
if (!recursive && contents.length > 0) {
// The folder is non-empty and recursive delete was not specified.
// Throw an exception indicating that a non-recursive delete was
// specified for a non-empty folder.
throw new IOException("Non-recursive delete of non-empty directory "
+ f.toString());
}
// Delete all the files in the folder.
for (FileMetadata p : contents) {
// Tag on the directory name found as the suffix of the parent directory
// to get the new absolute path.
String suffix = p.getKey().substring(
p.getKey().lastIndexOf(PATH_DELIMITER));
if (!p.isDir()) {
store.delete(key + suffix);
instrumentation.fileDeleted();
} else {
// Recursively delete contents of the sub-folders. Notice this also
// deletes the blob for the directory.
if (!delete(new Path(f.toString() + suffix), true)) {
return false;
}
}
}
store.delete(key);
// Update parent directory last modified time
Path parent = absolutePath.getParent();
if (parent != null && parent.getParent() != null) { // not root
String parentKey = pathToKey(parent);
store.updateFolderLastModifiedTime(parentKey);
}
instrumentation.directoryDeleted();
}
// File or directory was successfully deleted.
return true;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting the file status for " + f.toString());
}
// Capture the absolute path and the path to key.
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (key.length() == 0) { // root always exists
return newDirectory(null, absolutePath);
}
// The path is either a folder or a file. Retrieve metadata to
// determine if it is a directory or file.
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
if (meta.isDir()) {
// The path is a folder with files in it.
//
if (LOG.isDebugEnabled()) {
LOG.debug("Path " + f.toString() + "is a folder.");
}
// Return reference to the directory object.
return newDirectory(meta, absolutePath);
}
// The path is a file.
if (LOG.isDebugEnabled()) {
LOG.debug("Found the path: " + f.toString() + " as a file.");
}
// Return with reference to a file object.
return newFile(meta, absolutePath);
}
// File not found. Throw "no such file or directory" exception.
throw new FileNotFoundException(absolutePath
+ ": No such file or directory.");
}
@Override
public URI getUri() {
return uri;
}
/**
* Retrieve the status of a given path if it is a file, or of all the
* contained files if it is a directory.
*/
@Override
public FileStatus[] listStatus(Path f) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing status for " + f.toString());
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
Set<FileStatus> status = new TreeSet<FileStatus>();
FileMetadata meta = store.retrieveMetadata(key);
if (meta != null) {
if (!meta.isDir()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as a file");
}
return new FileStatus[] { newFile(meta, absolutePath) };
}
String partialKey = null;
PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
for (FileMetadata fileMetadata : listing.getFiles()) {
Path subpath = keyToPath(fileMetadata.getKey());
// Test whether the metadata represents a file or directory and
// add the appropriate metadata object.
//
// Note: There was a very old bug here where directories were added
// to the status set as files flattening out recursive listings
// using "-lsr" down the file system hierarchy.
if (fileMetadata.isDir()) {
// Make sure we hide the temp upload folder
if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
// Don't expose that.
continue;
}
status.add(newDirectory(fileMetadata, subpath));
} else {
status.add(newFile(fileMetadata, subpath));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as a directory with " + status.size()
+ " files in it.");
}
} else {
// There is no metadata found for the path.
if (LOG.isDebugEnabled()) {
LOG.debug("Did not find any metadata for path: " + key);
}
throw new FileNotFoundException("File" + f + " does not exist.");
}
return status.toArray(new FileStatus[0]);
}
private FileStatus newFile(FileMetadata meta, Path path) {
return new FileStatus(meta.getLength(), false, 1, blockSize,
meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(),
meta.getPermissionStatus().getUserName(), meta.getPermissionStatus()
.getGroupName(),
path.makeQualified(getUri(), getWorkingDirectory()));
}
private FileStatus newDirectory(FileMetadata meta, Path path) {
return new FileStatus(0, true, 1, blockSize, meta == null ? 0
: meta.getLastModified(), 0, meta == null ? FsPermission.getDefault()
: meta.getPermissionStatus().getPermission(), meta == null ? "" : meta
.getPermissionStatus().getUserName(), meta == null ? "" : meta
.getPermissionStatus().getGroupName(), path.makeQualified(getUri(),
getWorkingDirectory()));
}
private static enum UMaskApplyMode {
NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory,
}
/**
* Applies the applicable UMASKs on the given permission.
*
* @param permission
* The permission to mask.
* @param applyMode
* Whether the mask applies to a new or an existing file or directory.
* @return The masked permission.
*/
private FsPermission applyUMask(final FsPermission permission,
final UMaskApplyMode applyMode) {
FsPermission newPermission = new FsPermission(permission);
// Apply the default umask - this applies for new files or directories.
if (applyMode == UMaskApplyMode.NewFile
|| applyMode == UMaskApplyMode.NewDirectory) {
newPermission = newPermission
.applyUMask(FsPermission.getUMask(getConf()));
}
return newPermission;
}
/**
* Creates the PermissionStatus object to use for the given permission, based
* on the current user in context.
*
* @param permission
* The permission for the file.
* @return The permission status object to use.
* @throws IOException
* If login fails in getCurrentUser
*/
private PermissionStatus createPermissionStatus(FsPermission permission)
throws IOException {
// Create the permission status for this file based on current user
return new PermissionStatus(UserGroupInformation.getCurrentUser()
.getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
AZURE_DEFAULT_GROUP_DEFAULT), permission);
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating directory: " + f.toString());
}
if (containsColon(f)) {
throw new IOException("Cannot create directory " + f
+ " through WASB that has colons in the name");
}
Path absolutePath = makeAbsolute(f);
PermissionStatus permissionStatus = createPermissionStatus(applyUMask(
permission, UMaskApplyMode.NewDirectory));
ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
boolean childCreated = false;
// Check that there is no file in the parent chain of the given path.
// Stop when you get to the root
for (Path current = absolutePath, parent = current.getParent();
parent != null;
current = parent, parent = current.getParent()) {
String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
if (currentMetadata != null && !currentMetadata.isDir()) {
throw new IOException("Cannot create directory " + f + " because "
+ current + " is an existing file.");
} else if (currentMetadata == null
|| (currentMetadata.isDir() && currentMetadata
.getBlobMaterialization() == BlobMaterialization.Implicit)) {
keysToCreateAsFolder.add(currentKey);
childCreated = true;
} else {
// The directory already exists. Its last modified time needs to be
// updated if there is a child directory created under it.
if (childCreated) {
keysToUpdateAsFolder.add(currentKey);
}
childCreated = false;
}
}
for (String currentKey : keysToCreateAsFolder) {
store.storeEmptyFolder(currentKey, permissionStatus);
}
// Take the time after finishing mkdirs as the modified time, and update all
// the existing directories' modified time to it uniformly.
final Calendar lastModifiedCalendar = Calendar
.getInstance(Utility.LOCALE_US);
lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
Date lastModified = lastModifiedCalendar.getTime();
for (String key : keysToUpdateAsFolder) {
store.updateFolderLastModifiedTime(key, lastModified);
}
instrumentation.directoryCreated();
// If anything had gone wrong, an exception would have been thrown above.
return true;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening file: " + f.toString());
}
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
FileMetadata meta = store.retrieveMetadata(key);
if (meta == null) {
throw new FileNotFoundException(f.toString());
}
if (meta.isDir()) {
throw new FileNotFoundException(f.toString()
+ " is a directory not a file.");
}
return new FSDataInputStream(new BufferedFSInputStream(
new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize));
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Moving " + src + " to " + dst);
}
if (containsColon(dst)) {
throw new IOException("Cannot rename to file " + dst
+ " through WASB that has colons in the name");
}
String srcKey = pathToKey(makeAbsolute(src));
if (srcKey.length() == 0) {
// Cannot rename root of file system
return false;
}
FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
if (srcMetadata == null) {
// Source doesn't exist
if (LOG.isDebugEnabled()) {
LOG.debug("Source " + src + " doesn't exist, failing the rename.");
}
return false;
}
// Figure out the final destination
Path absoluteDst = makeAbsolute(dst);
String dstKey = pathToKey(absoluteDst);
FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
// directory rename validations
if (srcMetadata.isDir()) {
// rename dir to self is an error
if (srcKey.equals(dstKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming directory to itself is disallowed. path=" + src);
}
return false;
}
// rename dir to (sub-)child of self is an error. see
// FileSystemContractBaseTest.testRenameChildDirForbidden
if (dstKey.startsWith(srcKey + PATH_DELIMITER)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming directory to a itself is disallowed. src=" + src
+ " dest=" + dst);
}
return false;
}
}
// file rename early checks
if (!srcMetadata.isDir()) {
if (srcKey.equals(dstKey)) {
// rename file to self is OK
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path="
+ src);
}
return true;
}
}
// More validations.
// If the target is a directory that already exists, alter dst to be a
// child of it, e.g. move("/a/file.txt", "/b") where "/b" already exists
// causes the target to be "/b/file.txt".
if (dstMetadata != null && dstMetadata.isDir()) {
dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
// Best would be to update dstMetadata, but it is not used further, so set
// it to null and skip the additional cost
dstMetadata = null;
// dstMetadata = store.retrieveMetadata(dstKey);
if (LOG.isDebugEnabled()) {
LOG.debug("Destination " + dst
+ " is a directory, adjusted the destination to be " + dstKey);
}
// rename dir to self is an error
if (srcKey.equals(dstKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming directory to itself is disallowed. path=" + src);
}
return false;
}
} else if (dstMetadata != null) {
// Otherwise, attempting to overwrite a file is error
if (LOG.isDebugEnabled()) {
LOG.debug("Destination " + dst
+ " is an already existing file, failing the rename.");
}
return false;
} else {
// Either dir or file and target doesn't exist.. Check that the parent
// directory exists.
FileMetadata parentOfDestMetadata = store
.retrieveMetadata(pathToKey(absoluteDst.getParent()));
if (parentOfDestMetadata == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Parent of the destination " + dst
+ " doesn't exist, failing the rename.");
}
return false;
} else if (!parentOfDestMetadata.isDir()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Parent of the destination " + dst
+ " is a file, failing the rename.");
}
return false;
}
}
// Validations complete, do the move.
if (!srcMetadata.isDir()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Source " + src + " found as a file, renaming.");
}
store.rename(srcKey, dstKey);
} else {
// Move everything inside the folder.
String priorLastKey = null;
// Rename the contents page by page. For each blob, the suffix of its key
// beyond the source folder key is re-rooted under the destination key.
do {
// List all blobs rooted at the source folder.
PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL,
AZURE_UNBOUNDED_DEPTH, priorLastKey);
// Rename all the files in the folder.
for (FileMetadata file : listing.getFiles()) {
// Rename all materialized entries under the folder to point to the
// final destination.
if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
String srcName = file.getKey();
String suffix = srcName.substring(srcKey.length());
String dstName = dstKey + suffix;
store.rename(srcName, dstName);
}
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
// Rename the top level empty blob for the folder.
if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
store.rename(srcKey, dstKey);
}
}
// Update both source and destination parent folder last modified time.
Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent();
if (srcParent != null && srcParent.getParent() != null) { // not root
String srcParentKey = pathToKey(srcParent);
// ensure the srcParent is a materialized folder
FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey);
if (srcParentMetadata.isDir()
&& srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
store.storeEmptyFolder(srcParentKey,
createPermissionStatus(FsPermission.getDefault()));
}
store.updateFolderLastModifiedTime(srcParentKey);
}
Path destParent = makeAbsolute(keyToPath(dstKey)).getParent();
if (destParent != null && destParent.getParent() != null) { // not root
String dstParentKey = pathToKey(destParent);
// ensure the dstParent is a materialized folder
FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey);
if (dstParentMetadata.isDir()
&& dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
store.storeEmptyFolder(dstParentKey,
createPermissionStatus(FsPermission.getDefault()));
}
store.updateFolderLastModifiedTime(dstParentKey);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Renamed " + src + " to " + dst + " successfully.");
}
return true;
}
/**
* Return an array containing hostnames, offset and size of portions of the
* given file. For WASB we'll just lie and give fake hosts to make sure we get
* many splits in MR jobs.
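* For instance, with the default 512 MB block size a 1 GB file yields two
* BlockLocations, both naming the configured impersonated host ("localhost"
* unless fs.azure.block.location.impersonatedhost says otherwise).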
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len) throws IOException {
if (file == null) {
return null;
}
if ((start < 0) || (len < 0)) {
throw new IllegalArgumentException("Invalid start or len parameter");
}
if (file.getLen() < start) {
return new BlockLocation[0];
}
final String blobLocationHost = getConf().get(
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost };
long blockSize = file.getBlockSize();
if (blockSize <= 0) {
throw new IllegalArgumentException(
"The block size for the given file is not a positive number: "
+ blockSize);
}
int numberOfLocations = (int) (len / blockSize)
+ ((len % blockSize == 0) ? 0 : 1);
BlockLocation[] locations = new BlockLocation[numberOfLocations];
for (int i = 0; i < locations.length; i++) {
long currentOffset = start + (i * blockSize);
long currentLength = Math.min(blockSize, start + len - currentOffset);
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
}
return locations;
}
/**
* Set the working directory to the given directory.
*/
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = makeAbsolute(newDir);
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public void setPermission(Path p, FsPermission permission) throws IOException {
Path absolutePath = makeAbsolute(p);
String key = pathToKey(absolutePath);
FileMetadata metadata = store.retrieveMetadata(key);
if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + p);
}
permission = applyUMask(permission,
metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
: UMaskApplyMode.ChangeExistingFile);
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, createPermissionStatus(permission));
} else if (!metadata.getPermissionStatus().getPermission()
.equals(permission)) {
store.changePermissionStatus(key, new PermissionStatus(metadata
.getPermissionStatus().getUserName(), metadata.getPermissionStatus()
.getGroupName(), permission));
}
}
@Override
public void setOwner(Path p, String username, String groupname)
throws IOException {
Path absolutePath = makeAbsolute(p);
String key = pathToKey(absolutePath);
FileMetadata metadata = store.retrieveMetadata(key);
if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + p);
}
PermissionStatus newPermissionStatus = new PermissionStatus(
username == null ? metadata.getPermissionStatus().getUserName()
: username, groupname == null ? metadata.getPermissionStatus()
.getGroupName() : groupname, metadata.getPermissionStatus()
.getPermission());
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, newPermissionStatus);
} else {
store.changePermissionStatus(key, newPermissionStatus);
}
}
@Override
public synchronized void close() throws IOException {
if (isClosed) {
return;
}
// Call the base close() to close any resources there.
super.close();
// Close the store
store.close();
// Notify the metrics system that this file system is closed, which may
// trigger one final metrics push to get the accurate final file system
// metrics out.
long startTime = System.currentTimeMillis();
AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
AzureFileSystemMetricsSystem.fileSystemClosed();
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting metrics when file system closed took "
+ (System.currentTimeMillis() - startTime) + " ms.");
}
isClosed = true;
}
/**
* A handler that defines what to do with blobs whose upload was interrupted.
*/
private abstract class DanglingFileHandler {
abstract void handleFile(FileMetadata file, FileMetadata tempFile)
throws IOException;
}
/**
* Handler implementation for just deleting dangling files and cleaning them
* up.
*/
private class DanglingFileDeleter extends DanglingFileHandler {
@Override
void handleFile(FileMetadata file, FileMetadata tempFile)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting dangling file " + file.getKey());
}
store.delete(file.getKey());
store.delete(tempFile.getKey());
}
}
/**
* Handler implementation for just moving dangling files to recovery location
* (/lost+found).
*/
private class DanglingFileRecoverer extends DanglingFileHandler {
private final Path destination;
DanglingFileRecoverer(Path destination) {
this.destination = destination;
}
@Override
void handleFile(FileMetadata file, FileMetadata tempFile)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering " + file.getKey());
}
// Move to the final destination
String finalDestinationKey = pathToKey(new Path(destination,
file.getKey()));
store.rename(tempFile.getKey(), finalDestinationKey);
if (!finalDestinationKey.equals(file.getKey())) {
// Delete the empty link file now that we've restored it.
store.delete(file.getKey());
}
}
}
/**
* Check if a path has colons in its name
*/
private boolean containsColon(Path p) {
return p.toUri().getPath().contains(":");
}
/**
* Implements recover and delete (-move and -delete) behaviors for handling
* dangling files (blobs whose upload was interrupted).
*
* @param root
* The root path to check from.
* @param handler
* The handler that deals with dangling files.
*/
private void handleFilesWithDanglingTempData(Path root,
DanglingFileHandler handler) throws IOException {
// Calculate the cut-off for when to consider a blob to be dangling.
long cutoffForDangling = new Date().getTime()
- getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
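// For example, with the default 3600-second expiry, any temp blob last
// modified more than an hour ago falls at or before this cutoff and is
// considered dangling.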
// Go over all the blobs under the given root and look for blobs to
// recover.
String priorLastKey = null;
do {
PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
AZURE_UNBOUNDED_DEPTH, priorLastKey);
for (FileMetadata file : listing.getFiles()) {
if (!file.isDir()) { // We don't recover directory blobs
// See if this blob has a link in it (meaning it's a place-holder
// blob for when the upload to the temp blob is complete).
String link = store.getLinkInFileMetadata(file.getKey());
if (link != null) {
// It has a link, see if the temp blob it is pointing to is
// existent and old enough to be considered dangling.
FileMetadata linkMetadata = store.retrieveMetadata(link);
if (linkMetadata != null
&& linkMetadata.getLastModified() <= cutoffForDangling) {
// Found one!
handler.handleFile(file, linkMetadata);
}
}
}
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
}
/**
* Looks under the given root path for any blobs that are left "dangling",
* meaning that they are place-holder blobs that we created while uploading
* the data to a temporary blob, but for some reason we crashed in the middle
* of the upload and left them there. If any are found, we move them to the
* destination given.
*
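* A usage sketch (paths illustrative):
*
* <pre>
* {@code
* NativeAzureFileSystem fs = ...; // an initialized instance
* fs.recoverFilesWithDanglingTempData(new Path("/"), new Path("/lost+found"));
* }
* </pre>
*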
* @param root
* The root path to consider.
* @param destination
* The destination path to move any recovered files to.
* @throws IOException
*/
public void recoverFilesWithDanglingTempData(Path root, Path destination)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering files with dangling temp data in " + root);
}
handleFilesWithDanglingTempData(root,
new DanglingFileRecoverer(destination));
}
/**
* Looks under the given root path for any blobs that are left "dangling",
* meaning that they are place-holder blobs that we created while uploading
* the data to a temporary blob, but for some reason we crashed in the middle
* of the upload and left them there. If any are found, we delete them.
*
* @param root
* The root path to consider.
* @throws IOException
*/
public void deleteFilesWithDanglingTempData(Path root) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting files with dangling temp data in " + root);
}
handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
}
@Override
protected void finalize() throws Throwable {
LOG.debug("finalize() called.");
close();
super.finalize();
}
/**
* Encode the key with a random prefix for load balancing in Azure storage.
* Upload data to a random temporary file then do storage side renaming to
* recover the original key.
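* For example (GUID illustrative), "user/alice/data.txt" may encode to
* "_$azuretmpfolder$/b4c9...data.txt"; the random prefix spreads keys
* across the storage server's partitions while keeping them under one
* temporary folder.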
*
* @param aKey
* The original blob store key.
* @return Encoded version of the original key.
*/
private static String encodeKey(String aKey) {
// Get the tail end of the key name.
//
String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
aKey.length());
// Construct the randomized prefix of the file name. The prefix ensures the
// file always drops into the same folder but with a varying tail key name.
String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
+ UUID.randomUUID().toString();
// Concatenate the randomized prefix with the tail of the key name.
String randomizedKey = filePrefix + fileName;
// Return to the caller with the randomized key.
return randomizedKey;
}
}