HDFS-988. saveNamespace race can corrupt the edits log. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134951 13f79535-47bb-0310-9956-ffa450edef68
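
The pattern applied throughout this patch: public mutators acquire the write lock themselves and log the edit after releasing it, while the unprotected* workers stop taking locks and instead assert that the caller already holds the write lock. Every namespace mutation therefore runs under the FSNamesystem/FSDirectory write locks, closing the window in which saveNamespace could roll the edits file mid-operation, and edit-log replay can drive the same workers under the loader's locks. A minimal sketch of the convention, with illustrative names rather than the actual HDFS classes:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedDirectory {
  private final ReentrantReadWriteLock dirLock = new ReentrantReadWriteLock(true); // fair

  boolean hasWriteLock() {
    return dirLock.isWriteLockedByCurrentThread();
  }

  // Public entry point: takes the lock, defers logging until after release.
  boolean renameTo(String src, String dst) {
    dirLock.writeLock().lock();
    try {
      if (!unprotectedRenameTo(src, dst)) {
        return false;
      }
    } finally {
      dirLock.writeLock().unlock();
    }
    // logRename(src, dst) would go here, outside the lock
    return true;
  }

  // Worker: never locks; callers (including edit-log replay) must hold the lock.
  boolean unprotectedRenameTo(String src, String dst) {
    assert hasWriteLock();
    // ... mutate the in-memory namespace ...
    return true;
  }
}

The asserts cost nothing unless the JVM runs with -ea, so tests can verify the locking contract without adding overhead in production.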
diff --git a/CHANGES.txt b/CHANGES.txt
index 2979bea..b462dc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1205,6 +1205,8 @@
HDFS-2039. TestNameNodeMetrics uses a bad test root path, preventing it
from running inside Eclipse. (todd)
+ HDFS-988. saveNamespace race can corrupt the edits log. (eli)
+
Release 0.21.1 - Unreleased
HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli)
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 860cd9f..b6e5cc2 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1410,9 +1410,8 @@
}
/**
- * flushes out to all replicas of the block.
- * The data is in the buffers of the DNs
- * but not neccessary on the DN's OS buffers.
+ * Flushes out to all replicas of the block. The data is in the buffers
+ * of the DNs but not necessarily in the DNs' OS buffers.
*
* It is a synchronous operation. When it returns,
* it guarantees that flushed data become visible to new readers.
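
The contract described above is the hflush semantics exposed through the Syncable interface. A minimal usage sketch, assuming a configured HDFS client (the path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream out = fs.create(new Path("/tmp/hflush-example"));
    out.write("record\n".getBytes("UTF-8"));
    // Synchronous: when hflush() returns, the data is visible to new
    // readers, though it may still sit in DN buffers rather than on disk.
    out.hflush();
    out.close();
  }
}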
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
index 57d061c..bc9babb 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -570,6 +570,7 @@
* dumps the contents of recentInvalidateSets
*/
private void dumpRecentInvalidateSets(PrintWriter out) {
+ assert namesystem.hasWriteLock();
int size = recentInvalidateSets.values().size();
out.println("Metasave: Blocks " + pendingDeletionBlocksCount
+ " waiting deletion from " + size + " datanodes.");
@@ -1392,7 +1393,7 @@
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
- assert (block != null && namesystem.hasWriteLock());
+ assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
@@ -1571,6 +1572,7 @@
*/
void processOverReplicatedBlock(Block block, short replication,
DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+ assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
@@ -1596,6 +1598,7 @@
}
void addToExcessReplicate(DatanodeInfo dn, Block block) {
+ assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
if (excessBlocks == null) {
excessBlocks = new TreeSet<Block>();
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 38b285a..08b10cb 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -53,7 +53,6 @@
import org.apache.hadoop.hdfs.util.ByteArray;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.UserGroupInformation;
/*************************************************
* FSDirectory stores the filesystem directory state.
@@ -74,29 +73,33 @@
private final int maxDirItems;
private final int lsLimit; // max list limit
- // lock to protect BlockMap.
- private ReentrantReadWriteLock bLock;
+ // lock to protect the directory and BlockMap
+ private ReentrantReadWriteLock dirLock;
private Condition cond;
// utility methods to acquire and release read lock and write lock
void readLock() {
- this.bLock.readLock().lock();
+ this.dirLock.readLock().lock();
}
void readUnlock() {
- this.bLock.readLock().unlock();
+ this.dirLock.readLock().unlock();
}
void writeLock() {
- this.bLock.writeLock().lock();
+ this.dirLock.writeLock().lock();
}
void writeUnlock() {
- this.bLock.writeLock().unlock();
+ this.dirLock.writeLock().unlock();
}
boolean hasWriteLock() {
- return this.bLock.isWriteLockedByCurrentThread();
+ return this.dirLock.isWriteLockedByCurrentThread();
+ }
+
+ boolean hasReadLock() {
+ return this.dirLock.getReadHoldCount() > 0;
}
/**
@@ -111,8 +114,8 @@
}
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
- this.bLock = new ReentrantReadWriteLock(true); // fair
- this.cond = bLock.writeLock().newCondition();
+ this.dirLock = new ReentrantReadWriteLock(true); // fair
+ this.cond = dirLock.writeLock().newCondition();
fsImage.setFSNamesystem(ns);
rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
ns.createFsOwnerPermissions(new FsPermission((short)0755)),
@@ -275,6 +278,7 @@
throws UnresolvedLinkException {
INode newNode;
long diskspace = UNKNOWN_DISK_SPACE;
+ assert hasWriteLock();
if (blocks == null)
newNode = new INodeDirectory(permissions, modificationTime);
else {
@@ -463,8 +467,13 @@
}
waitForReady();
long now = now();
- if (!unprotectedRenameTo(src, dst, now))
- return false;
+ writeLock();
+ try {
+ if (!unprotectedRenameTo(src, dst, now))
+ return false;
+ } finally {
+ writeUnlock();
+ }
fsImage.getEditLog().logRename(src, dst, now);
return true;
}
@@ -482,8 +491,13 @@
}
waitForReady();
long now = now();
- if (unprotectedRenameTo(src, dst, now, options)) {
- incrDeletedFileCount(1);
+ writeLock();
+ try {
+ if (unprotectedRenameTo(src, dst, now, options)) {
+ incrDeletedFileCount(1);
+ }
+ } finally {
+ writeUnlock();
}
fsImage.getEditLog().logRename(src, dst, now, options);
}
@@ -502,108 +516,104 @@
boolean unprotectedRenameTo(String src, String dst, long timestamp)
throws QuotaExceededException, UnresolvedLinkException,
FileAlreadyExistsException {
- writeLock();
+ assert hasWriteLock();
+ INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+ INode srcInode = srcInodes[srcInodes.length-1];
+
+ // check the validity of the source
+ if (srcInode == null) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + "failed to rename " + src + " to " + dst
+ + " because source does not exist");
+ return false;
+ }
+ if (srcInodes.length == 1) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst+ " because source is the root");
+ return false;
+ }
+ if (isDir(dst)) {
+ dst += Path.SEPARATOR + new Path(src).getName();
+ }
+
+ // check the validity of the destination
+ if (dst.equals(src)) {
+ return true;
+ }
+ if (srcInode.isLink() &&
+ dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+ throw new FileAlreadyExistsException(
+ "Cannot rename symlink "+src+" to its target "+dst);
+ }
+
+ // dst cannot be directory or a file under src
+ if (dst.startsWith(src) &&
+ dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + "failed to rename " + src + " to " + dst
+ + " because destination starts with src");
+ return false;
+ }
+
+ byte[][] dstComponents = INode.getPathComponents(dst);
+ INode[] dstInodes = new INode[dstComponents.length];
+ rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+ if (dstInodes[dstInodes.length-1] != null) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst+
+ " because destination exists");
+ return false;
+ }
+ if (dstInodes[dstInodes.length-2] == null) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst+
+ " because destination's parent does not exist");
+ return false;
+ }
+
+ // Ensure dst has quota to accommodate rename
+ verifyQuotaForRename(srcInodes,dstInodes);
+
+ INode dstChild = null;
+ INode srcChild = null;
+ String srcChildName = null;
try {
- INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
- INode srcInode = srcInodes[srcInodes.length-1];
-
- // check the validation of the source
- if (srcInode == null) {
+ // remove src
+ srcChild = removeChild(srcInodes, srcInodes.length-1);
+ if (srcChild == null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ "failed to rename " + src + " to " + dst
- + " because source does not exist");
- return false;
- }
- if (srcInodes.length == 1) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst+ " because source is the root");
+ + " because the source can not be removed");
return false;
}
- if (isDir(dst)) {
- dst += Path.SEPARATOR + new Path(src).getName();
- }
+ srcChildName = srcChild.getLocalName();
+ srcChild.setLocalName(dstComponents[dstInodes.length-1]);
- // check the validity of the destination
- if (dst.equals(src)) {
+ // add src to the destination
+ dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+ srcChild, UNKNOWN_DISK_SPACE, false);
+ if (dstChild != null) {
+ srcChild = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+ + src + " is renamed to " + dst);
+ }
+ // update modification time of dst and the parent of src
+ srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+ dstInodes[dstInodes.length-2].setModificationTime(timestamp);
return true;
}
- if (srcInode.isLink() &&
- dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
- throw new FileAlreadyExistsException(
- "Cannot rename symlink "+src+" to its target "+dst);
- }
-
- // dst cannot be directory or a file under src
- if (dst.startsWith(src) &&
- dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + "failed to rename " + src + " to " + dst
- + " because destination starts with src");
- return false;
- }
-
- byte[][] dstComponents = INode.getPathComponents(dst);
- INode[] dstInodes = new INode[dstComponents.length];
- rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
- if (dstInodes[dstInodes.length-1] != null) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst+
- " because destination exists");
- return false;
- }
- if (dstInodes[dstInodes.length-2] == null) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst+
- " because destination's parent does not exist");
- return false;
- }
-
- // Ensure dst has quota to accommodate rename
- verifyQuotaForRename(srcInodes,dstInodes);
-
- INode dstChild = null;
- INode srcChild = null;
- String srcChildName = null;
- try {
- // remove src
- srcChild = removeChild(srcInodes, srcInodes.length-1);
- if (srcChild == null) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + "failed to rename " + src + " to " + dst
- + " because the source can not be removed");
- return false;
- }
- srcChildName = srcChild.getLocalName();
- srcChild.setLocalName(dstComponents[dstInodes.length-1]);
-
- // add src to the destination
- dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
- srcChild, UNKNOWN_DISK_SPACE, false);
- if (dstChild != null) {
- srcChild = null;
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
- + src + " is renamed to " + dst);
- }
- // update modification time of dst and the parent of src
- srcInodes[srcInodes.length-2].setModificationTime(timestamp);
- dstInodes[dstInodes.length-2].setModificationTime(timestamp);
- return true;
- }
- } finally {
- if (dstChild == null && srcChild != null) {
- // put it back
- srcChild.setLocalName(srcChildName);
- addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild,
- UNKNOWN_DISK_SPACE, false);
- }
- }
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst);
- return false;
} finally {
- writeUnlock();
+ if (dstChild == null && srcChild != null) {
+ // put it back
+ srcChild.setLocalName(srcChildName);
+ addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild,
+ UNKNOWN_DISK_SPACE, false);
+ }
}
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst);
+ return false;
}
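
The worker above detaches the source inode, renames it, and tries to attach it at the destination; the finally block re-attaches it under its old name if the attach never happened. A stripped-down sketch of that restore-on-failure shape, using a plain map in place of the inode tree:

import java.util.HashMap;
import java.util.Map;

class RenameRestoreSketch {
  private final Map<String, Object> tree = new HashMap<String, Object>();

  boolean rename(String src, String dst) {
    Object srcChild = tree.remove(src);   // remove src
    if (srcChild == null) {
      return false;                       // source cannot be removed
    }
    Object dstChild = null;
    try {
      if (!tree.containsKey(dst)) {
        dstChild = srcChild;
        tree.put(dst, dstChild);          // add src to the destination
        return true;
      }
    } finally {
      if (dstChild == null) {
        tree.put(src, srcChild);          // put it back
      }
    }
    return false;
  }
}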
/**
@@ -620,6 +630,7 @@
Options.Rename... options) throws FileAlreadyExistsException,
FileNotFoundException, ParentNotDirectoryException,
QuotaExceededException, UnresolvedLinkException, IOException {
+ assert hasWriteLock();
boolean overwrite = false;
if (null != options) {
for (Rename option : options) {
@@ -629,157 +640,152 @@
}
}
String error = null;
- writeLock();
- try {
- final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
- final INode srcInode = srcInodes[srcInodes.length - 1];
- // validate source
- if (srcInode == null) {
- error = "rename source " + src + " is not found.";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new FileNotFoundException(error);
- }
- if (srcInodes.length == 1) {
- error = "rename source cannot be the root";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
-
- // validate the destination
- if (dst.equals(src)) {
- throw new FileAlreadyExistsException(
- "The source "+src+" and destination "+dst+" are the same");
- }
- if (srcInode.isLink() &&
- dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
- throw new FileAlreadyExistsException(
- "Cannot rename symlink "+src+" to its target "+dst);
- }
- // dst cannot be a directory or a file under src
- if (dst.startsWith(src) &&
- dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
- error = "Rename destination " + dst
- + " is a directory or file under source " + src;
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
- final byte[][] dstComponents = INode.getPathComponents(dst);
- final INode[] dstInodes = new INode[dstComponents.length];
- rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
- INode dstInode = dstInodes[dstInodes.length - 1];
- if (dstInodes.length == 1) {
- error = "rename destination cannot be the root";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
- if (dstInode != null) { // Destination exists
- // It's OK to rename a file to a symlink and vice versa
- if (dstInode.isDirectory() != srcInode.isDirectory()) {
- error = "Source " + src + " and destination " + dst
- + " must both be directories";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
- if (!overwrite) { // If destination exists, overwrite flag must be true
- error = "rename destination " + dst + " already exists";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new FileAlreadyExistsException(error);
- }
- List<INode> children = dstInode.isDirectory() ?
- ((INodeDirectory) dstInode).getChildrenRaw() : null;
- if (children != null && children.size() != 0) {
- error = "rename cannot overwrite non empty destination directory "
- + dst;
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
- }
- if (dstInodes[dstInodes.length - 2] == null) {
- error = "rename destination parent " + dst + " not found.";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new FileNotFoundException(error);
- }
- if (!dstInodes[dstInodes.length - 2].isDirectory()) {
- error = "rename destination parent " + dst + " is a file.";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new ParentNotDirectoryException(error);
- }
-
- // Ensure dst has quota to accommodate rename
- verifyQuotaForRename(srcInodes, dstInodes);
- INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
- if (removedSrc == null) {
- error = "Failed to rename " + src + " to " + dst
- + " because the source can not be removed";
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + error);
- throw new IOException(error);
- }
- final String srcChildName = removedSrc.getLocalName();
- String dstChildName = null;
- INode removedDst = null;
- try {
- if (dstInode != null) { // dst exists remove it
- removedDst = removeChild(dstInodes, dstInodes.length - 1);
- dstChildName = removedDst.getLocalName();
- }
-
- INode dstChild = null;
- removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
- // add src as dst to complete rename
- dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
- removedSrc, UNKNOWN_DISK_SPACE, false);
-
- int filesDeleted = 0;
- if (dstChild != null) {
- removedSrc = null;
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "DIR* FSDirectory.unprotectedRenameTo: " + src
- + " is renamed to " + dst);
- }
- srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
- dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
-
- // Collect the blocks and remove the lease for previous dst
- if (removedDst != null) {
- INode rmdst = removedDst;
- removedDst = null;
- List<Block> collectedBlocks = new ArrayList<Block>();
- filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
- getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
- }
- return filesDeleted >0;
- }
- } finally {
- if (removedSrc != null) {
- // Rename failed - restore src
- removedSrc.setLocalName(srcChildName);
- addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc,
- UNKNOWN_DISK_SPACE, false);
- }
- if (removedDst != null) {
- // Rename failed - restore dst
- removedDst.setLocalName(dstChildName);
- addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst,
- UNKNOWN_DISK_SPACE, false);
- }
- }
+ final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+ final INode srcInode = srcInodes[srcInodes.length - 1];
+ // validate source
+ if (srcInode == null) {
+ error = "rename source " + src + " is not found.";
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- + "failed to rename " + src + " to " + dst);
- throw new IOException("rename from " + src + " to " + dst + " failed.");
- } finally {
- writeUnlock();
+ + error);
+ throw new FileNotFoundException(error);
}
+ if (srcInodes.length == 1) {
+ error = "rename source cannot be the root";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+
+ // validate the destination
+ if (dst.equals(src)) {
+ throw new FileAlreadyExistsException(
+ "The source "+src+" and destination "+dst+" are the same");
+ }
+ if (srcInode.isLink() &&
+ dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+ throw new FileAlreadyExistsException(
+ "Cannot rename symlink "+src+" to its target "+dst);
+ }
+ // dst cannot be a directory or a file under src
+ if (dst.startsWith(src) &&
+ dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+ error = "Rename destination " + dst
+ + " is a directory or file under source " + src;
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ final byte[][] dstComponents = INode.getPathComponents(dst);
+ final INode[] dstInodes = new INode[dstComponents.length];
+ rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+ INode dstInode = dstInodes[dstInodes.length - 1];
+ if (dstInodes.length == 1) {
+ error = "rename destination cannot be the root";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ if (dstInode != null) { // Destination exists
+ // It's OK to rename a file to a symlink and vice versa
+ if (dstInode.isDirectory() != srcInode.isDirectory()) {
+ error = "Source " + src + " and destination " + dst
+ + " must both be directories";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ if (!overwrite) { // If destination exists, overwrite flag must be true
+ error = "rename destination " + dst + " already exists";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new FileAlreadyExistsException(error);
+ }
+ List<INode> children = dstInode.isDirectory() ?
+ ((INodeDirectory) dstInode).getChildrenRaw() : null;
+ if (children != null && children.size() != 0) {
+ error = "rename cannot overwrite non empty destination directory "
+ + dst;
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ }
+ if (dstInodes[dstInodes.length - 2] == null) {
+ error = "rename destination parent " + dst + " not found.";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new FileNotFoundException(error);
+ }
+ if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+ error = "rename destination parent " + dst + " is a file.";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new ParentNotDirectoryException(error);
+ }
+
+ // Ensure dst has quota to accommodate rename
+ verifyQuotaForRename(srcInodes, dstInodes);
+ INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+ if (removedSrc == null) {
+ error = "Failed to rename " + src + " to " + dst
+ + " because the source can not be removed";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ final String srcChildName = removedSrc.getLocalName();
+ String dstChildName = null;
+ INode removedDst = null;
+ try {
+ if (dstInode != null) { // dst exists, remove it
+ removedDst = removeChild(dstInodes, dstInodes.length - 1);
+ dstChildName = removedDst.getLocalName();
+ }
+
+ INode dstChild = null;
+ removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+ // add src as dst to complete rename
+ dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+ removedSrc, UNKNOWN_DISK_SPACE, false);
+
+ int filesDeleted = 0;
+ if (dstChild != null) {
+ removedSrc = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "DIR* FSDirectory.unprotectedRenameTo: " + src
+ + " is renamed to " + dst);
+ }
+ srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+ dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+ // Collect the blocks and remove the lease for previous dst
+ if (removedDst != null) {
+ INode rmdst = removedDst;
+ removedDst = null;
+ List<Block> collectedBlocks = new ArrayList<Block>();
+ filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+ getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+ }
+ return filesDeleted > 0;
+ }
+ } finally {
+ if (removedSrc != null) {
+ // Rename failed - restore src
+ removedSrc.setLocalName(srcChildName);
+ addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc,
+ UNKNOWN_DISK_SPACE, false);
+ }
+ if (removedDst != null) {
+ // Rename failed - restore dst
+ removedDst.setLocalName(dstChildName);
+ addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst,
+ UNKNOWN_DISK_SPACE, false);
+ }
+ }
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + "failed to rename " + src + " to " + dst);
+ throw new IOException("rename from " + src + " to " + dst + " failed.");
}
/**
@@ -794,10 +800,16 @@
Block[] setReplication(String src, short replication, int[] oldReplication)
throws QuotaExceededException, UnresolvedLinkException {
waitForReady();
- Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
- if (fileBlocks != null) // log replication change
- fsImage.getEditLog().logSetReplication(src, replication);
- return fileBlocks;
+ Block[] fileBlocks = null;
+ writeLock();
+ try {
+ fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
+ if (fileBlocks != null) // log replication change
+ fsImage.getEditLog().logSetReplication(src, replication);
+ return fileBlocks;
+ } finally {
+ writeUnlock();
+ }
}
Block[] unprotectedSetReplication(String src,
@@ -805,37 +817,31 @@
int[] oldReplication
) throws QuotaExceededException,
UnresolvedLinkException {
+ assert hasWriteLock();
if (oldReplication == null) {
oldReplication = new int[1];
}
oldReplication[0] = -1;
- Block[] fileBlocks = null;
- writeLock();
- try {
- INode[] inodes = rootDir.getExistingPathINodes(src, true);
- INode inode = inodes[inodes.length - 1];
- if (inode == null) {
- return null;
- }
- assert !inode.isLink();
- if (inode.isDirectory()) {
- return null;
- }
- INodeFile fileNode = (INodeFile)inode;
- oldReplication[0] = fileNode.getReplication();
-
- // check disk quota
- long dsDelta = (replication - oldReplication[0]) *
- (fileNode.diskspaceConsumed()/oldReplication[0]);
- updateCount(inodes, inodes.length-1, 0, dsDelta, true);
-
- fileNode.setReplication(replication);
- fileBlocks = fileNode.getBlocks();
- } finally {
- writeUnlock();
+ INode[] inodes = rootDir.getExistingPathINodes(src, true);
+ INode inode = inodes[inodes.length - 1];
+ if (inode == null) {
+ return null;
}
- return fileBlocks;
+ assert !inode.isLink();
+ if (inode.isDirectory()) {
+ return null;
+ }
+ INodeFile fileNode = (INodeFile)inode;
+ oldReplication[0] = fileNode.getReplication();
+
+ // check disk quota
+ long dsDelta = (replication - oldReplication[0]) *
+ (fileNode.diskspaceConsumed()/oldReplication[0]);
+ updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+
+ fileNode.setReplication(replication);
+ return fileNode.getBlocks();
}
/**
@@ -876,55 +882,57 @@
}
}
- void setPermission(String src, FsPermission permission
- ) throws FileNotFoundException, UnresolvedLinkException {
- unprotectedSetPermission(src, permission);
+ void setPermission(String src, FsPermission permission)
+ throws FileNotFoundException, UnresolvedLinkException {
+ writeLock();
+ try {
+ unprotectedSetPermission(src, permission);
+ } finally {
+ writeUnlock();
+ }
fsImage.getEditLog().logSetPermissions(src, permission);
}
void unprotectedSetPermission(String src, FsPermission permissions)
- throws FileNotFoundException, UnresolvedLinkException {
+ throws FileNotFoundException, UnresolvedLinkException {
+ assert hasWriteLock();
+ INode inode = rootDir.getNode(src, true);
+ if (inode == null) {
+ throw new FileNotFoundException("File does not exist: " + src);
+ }
+ inode.setPermission(permissions);
+ }
+
+ void setOwner(String src, String username, String groupname)
+ throws FileNotFoundException, UnresolvedLinkException {
writeLock();
try {
- INode inode = rootDir.getNode(src, true);
- if (inode == null) {
- throw new FileNotFoundException("File does not exist: " + src);
- }
- inode.setPermission(permissions);
+ unprotectedSetOwner(src, username, groupname);
} finally {
writeUnlock();
}
- }
-
- void setOwner(String src, String username, String groupname
- ) throws FileNotFoundException, UnresolvedLinkException {
- unprotectedSetOwner(src, username, groupname);
fsImage.getEditLog().logSetOwner(src, username, groupname);
}
void unprotectedSetOwner(String src, String username, String groupname)
- throws FileNotFoundException, UnresolvedLinkException {
- writeLock();
- try {
- INode inode = rootDir.getNode(src, true);
- if (inode == null) {
- throw new FileNotFoundException("File does not exist: " + src);
- }
- if (username != null) {
- inode.setUser(username);
- }
- if (groupname != null) {
- inode.setGroup(groupname);
- }
- } finally {
- writeUnlock();
+ throws FileNotFoundException, UnresolvedLinkException {
+ assert hasWriteLock();
+ INode inode = rootDir.getNode(src, true);
+ if (inode == null) {
+ throw new FileNotFoundException("File does not exist: " + src);
+ }
+ if (username != null) {
+ inode.setUser(username);
+ }
+ if (groupname != null) {
+ inode.setGroup(groupname);
}
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
*/
- public void concatInternal(String target, String [] srcs)
+ public void concat(String target, String [] srcs)
throws UnresolvedLinkException {
writeLock();
try {
@@ -950,6 +958,7 @@
*/
public void unprotectedConcat(String target, String [] srcs, long timestamp)
throws UnresolvedLinkException {
+ assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
}
@@ -999,7 +1008,13 @@
}
waitForReady();
long now = now();
- int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+ int filesRemoved;
+ writeLock();
+ try {
+ filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+ } finally {
+ writeUnlock();
+ }
if (filesRemoved <= 0) {
return false;
}
@@ -1052,6 +1067,7 @@
*/
void unprotectedDelete(String src, long mtime)
throws UnresolvedLinkException {
+ assert hasWriteLock();
List<Block> collectedBlocks = new ArrayList<Block>();
int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
if (filesRemoved > 0) {
@@ -1069,43 +1085,39 @@
*/
int unprotectedDelete(String src, List<Block> collectedBlocks,
long mtime) throws UnresolvedLinkException {
+ assert hasWriteLock();
src = normalizePath(src);
- writeLock();
- try {
- INode[] inodes = rootDir.getExistingPathINodes(src, false);
- INode targetNode = inodes[inodes.length-1];
+ INode[] inodes = rootDir.getExistingPathINodes(src, false);
+ INode targetNode = inodes[inodes.length-1];
- if (targetNode == null) { // non-existent src
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
- +"failed to remove "+src+" because it does not exist");
- }
- return 0;
- }
- if (inodes.length == 1) { // src is the root
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
- "failed to remove " + src +
- " because the root is not allowed to be deleted");
- return 0;
- }
- int pos = inodes.length - 1;
- // Remove the node from the namespace
- targetNode = removeChild(inodes, pos);
- if (targetNode == null) {
- return 0;
- }
- // set the parent's modification time
- inodes[pos-1].setModificationTime(mtime);
- int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
+ if (targetNode == null) { // non-existent src
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
- +src+" is removed");
+ +"failed to remove "+src+" because it does not exist");
}
- return filesRemoved;
- } finally {
- writeUnlock();
+ return 0;
}
+ if (inodes.length == 1) { // src is the root
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
+ "failed to remove " + src +
+ " because the root is not allowed to be deleted");
+ return 0;
+ }
+ int pos = inodes.length - 1;
+ // Remove the node from the namespace
+ targetNode = removeChild(inodes, pos);
+ if (targetNode == null) {
+ return 0;
+ }
+ // set the parent's modification time
+ inodes[pos-1].setModificationTime(mtime);
+ int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+ +src+" is removed");
+ }
+ return filesRemoved;
}
/**
@@ -1350,6 +1362,7 @@
private void updateCount(INode[] inodes, int numOfINodes,
long nsDelta, long dsDelta, boolean checkQuota)
throws QuotaExceededException {
+ assert hasWriteLock();
if (!ready) {
//still initializing. do not check or update quotas.
return;
@@ -1374,6 +1387,7 @@
*/
private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes,
long nsDelta, long dsDelta) {
+ assert hasWriteLock();
try {
updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
} catch (QuotaExceededException e) {
@@ -1391,6 +1405,7 @@
*/
void unprotectedUpdateCount(INode[] inodes, int numOfINodes,
long nsDelta, long dsDelta) {
+ assert hasWriteLock();
for(int i=0; i < numOfINodes; i++) {
if (inodes[i].isQuotaSet()) { // a directory with quota
INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
@@ -1501,17 +1516,14 @@
INode unprotectedMkdir(String src, PermissionStatus permissions,
long timestamp) throws QuotaExceededException,
UnresolvedLinkException {
+ assert hasWriteLock();
byte[][] components = INode.getPathComponents(src);
INode[] inodes = new INode[components.length];
- writeLock();
- try {
- rootDir.getExistingPathINodes(components, inodes, false);
- unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
- permissions, false, timestamp);
- return inodes[inodes.length-1];
- } finally {
- writeUnlock();
- }
+
+ rootDir.getExistingPathINodes(components, inodes, false);
+ unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
+ permissions, false, timestamp);
+ return inodes[inodes.length-1];
}
/** create a directory at index pos.
@@ -1521,6 +1533,7 @@
private void unprotectedMkdir(INode[] inodes, int pos,
byte[] name, PermissionStatus permission, boolean inheritPermission,
long timestamp) throws QuotaExceededException {
+ assert hasWriteLock();
inodes[pos] = addChild(inodes, pos,
new INodeDirectory(name, permission, timestamp),
-1, inheritPermission );
@@ -1851,7 +1864,8 @@
*/
INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException,
- UnresolvedLinkException {
+ UnresolvedLinkException {
+ assert hasWriteLock();
// sanity check
if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET &&
nsQuota < FSConstants.QUOTA_RESET) ||
@@ -1864,50 +1878,45 @@
String srcs = normalizePath(src);
- writeLock();
- try {
- INode[] inodes = rootDir.getExistingPathINodes(src, true);
- INode targetNode = inodes[inodes.length-1];
- if (targetNode == null) {
- throw new FileNotFoundException("Directory does not exist: " + srcs);
- } else if (!targetNode.isDirectory()) {
- throw new FileNotFoundException("Cannot set quota on a file: " + srcs);
- } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
- throw new IllegalArgumentException("Cannot clear namespace quota on root.");
- } else { // a directory inode
- INodeDirectory dirNode = (INodeDirectory)targetNode;
- long oldNsQuota = dirNode.getNsQuota();
- long oldDsQuota = dirNode.getDsQuota();
- if (nsQuota == FSConstants.QUOTA_DONT_SET) {
- nsQuota = oldNsQuota;
- }
- if (dsQuota == FSConstants.QUOTA_DONT_SET) {
- dsQuota = oldDsQuota;
- }
+ INode[] inodes = rootDir.getExistingPathINodes(src, true);
+ INode targetNode = inodes[inodes.length-1];
+ if (targetNode == null) {
+ throw new FileNotFoundException("Directory does not exist: " + srcs);
+ } else if (!targetNode.isDirectory()) {
+ throw new FileNotFoundException("Cannot set quota on a file: " + srcs);
+ } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
+ throw new IllegalArgumentException("Cannot clear namespace quota on root.");
+ } else { // a directory inode
+ INodeDirectory dirNode = (INodeDirectory)targetNode;
+ long oldNsQuota = dirNode.getNsQuota();
+ long oldDsQuota = dirNode.getDsQuota();
+ if (nsQuota == FSConstants.QUOTA_DONT_SET) {
+ nsQuota = oldNsQuota;
+ }
+ if (dsQuota == FSConstants.QUOTA_DONT_SET) {
+ dsQuota = oldDsQuota;
+ }
- if (dirNode instanceof INodeDirectoryWithQuota) {
- // a directory with quota; so set the quota to the new value
- ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
- if (!dirNode.isQuotaSet()) {
- // will not come here for root because root's nsQuota is always set
- INodeDirectory newNode = new INodeDirectory(dirNode);
- INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
- dirNode = newNode;
- parent.replaceChild(newNode);
- }
- } else {
- // a non-quota directory; so replace it with a directory with quota
- INodeDirectoryWithQuota newNode =
- new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
- // non-root directory node; parent != null
+ if (dirNode instanceof INodeDirectoryWithQuota) {
+ // a directory with quota; so set the quota to the new value
+ ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
+ if (!dirNode.isQuotaSet()) {
+ // will not come here for root because root's nsQuota is always set
+ INodeDirectory newNode = new INodeDirectory(dirNode);
INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
dirNode = newNode;
parent.replaceChild(newNode);
}
- return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
+ } else {
+ // a non-quota directory; so replace it with a directory with quota
+ INodeDirectoryWithQuota newNode =
+ new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+ // non-root directory node; parent != null
+ INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
+ dirNode = newNode;
+ parent.replaceChild(newNode);
}
- } finally {
- writeUnlock();
+ return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
}
}
@@ -1941,27 +1950,31 @@
}
/**
- * Sets the access time on the file. Logs it in the transaction log
+ * Sets the access time on the file. Logs it in the transaction log.
*/
void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) {
- if (unprotectedSetTimes(src, inode, mtime, atime, force)) {
+ boolean status = false;
+ writeLock();
+ try {
+ status = unprotectedSetTimes(src, inode, mtime, atime, force);
+ } finally {
+ writeUnlock();
+ }
+ if (status) {
fsImage.getEditLog().logTimes(src, mtime, atime);
}
}
boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force)
- throws UnresolvedLinkException {
- writeLock();
- try {
- INodeFile inode = getFileINode(src);
- return unprotectedSetTimes(src, inode, mtime, atime, force);
- } finally {
- writeUnlock();
- }
+ throws UnresolvedLinkException {
+ assert hasWriteLock();
+ INodeFile inode = getFileINode(src);
+ return unprotectedSetTimes(src, inode, mtime, atime, force);
}
private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
long atime, boolean force) {
+ assert hasWriteLock();
boolean status = false;
if (mtime != -1) {
inode.setModificationTimeForce(mtime);
@@ -2040,6 +2053,7 @@
*/
private HdfsLocatedFileStatus createLocatedFileStatus(
byte[] path, INode node) throws IOException {
+ assert hasReadLock();
long size = 0; // length is zero for directories
short replication = 0;
long blocksize = 0;
@@ -2088,8 +2102,14 @@
}
}
final String userName = dirPerms.getUserName();
- INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
- new PermissionStatus(userName, null, FsPermission.getDefault()));
+ INodeSymlink newNode = null;
+ writeLock();
+ try {
+ newNode = unprotectedSymlink(path, target, modTime, modTime,
+ new PermissionStatus(userName, null, FsPermission.getDefault()));
+ } finally {
+ writeUnlock();
+ }
if (newNode == null) {
NameNode.stateChangeLog.info("DIR* FSDirectory.addSymlink: "
+"failed to add "+path
@@ -2111,14 +2131,10 @@
INodeSymlink unprotectedSymlink(String path, String target, long modTime,
long atime, PermissionStatus perm)
throws UnresolvedLinkException {
+ assert hasWriteLock();
INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
try {
- writeLock();
- try {
- newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
- } finally {
- writeUnlock();
- }
+ newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
} catch (UnresolvedLinkException e) {
/* All UnresolvedLinkExceptions should have been resolved by now, but we
* should re-throw them in case that changes so they are not swallowed
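
The new hasReadLock() checks rely on ReentrantReadWriteLock.getReadHoldCount(), which counts the reentrant read holds of the calling thread only, making the check per-thread just like isWriteLockedByCurrentThread(). A self-contained sketch of the predicates added in this patch:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockHoldCheck {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair

  boolean hasReadLock() {
    // Counts this thread's holds; getReadLockCount() would count all threads.
    return lock.getReadHoldCount() > 0;
  }

  boolean hasWriteLock() {
    return lock.isWriteLockedByCurrentThread();
  }

  boolean hasReadOrWriteLock() {
    return hasReadLock() || hasWriteLock();
  }
}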
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 0c344bd..8d1ad10 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -129,6 +129,9 @@
numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0;
+ fsNamesys.writeLock();
+ fsDir.writeLock();
+
// Keep track of the file offsets of the last several opcodes.
// This is handy when manually recovering corrupted edits files.
PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
@@ -248,7 +251,7 @@
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
- fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+ fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_DELETE: {
@@ -339,7 +342,7 @@
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
- fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+ fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_GET_DELEGATION_TOKEN: {
@@ -423,6 +426,9 @@
String errorMessage = sb.toString();
FSImage.LOG.error(errorMessage);
throw new IOException(errorMessage, t);
+ } finally {
+ fsDir.writeUnlock();
+ fsNamesys.writeUnlock();
}
if (FSImage.LOG.isDebugEnabled()) {
FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose
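
Edit-log replay now holds both write locks for the whole load: the namesystem lock is taken first, then the directory lock, and the finally block releases them in reverse order. The public mutators follow the same order (FSNamesystem lock, then FSDirectory lock), which is what keeps the two-lock scheme deadlock-free. A sketch of the discipline with illustrative field names:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReplayLocking {
  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
  private final ReentrantReadWriteLock dirLock = new ReentrantReadWriteLock(true);

  void loadEdits() {
    fsLock.writeLock().lock();       // always acquire in the same order...
    dirLock.writeLock().lock();
    try {
      // apply each logged opcode to the in-memory namespace
    } finally {
      dirLock.writeLock().unlock();  // ...and release in reverse order
      fsLock.writeLock().unlock();
    }
  }
}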
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4f1b645..19b5002 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -491,6 +491,14 @@
return this.fsLock.isWriteLockedByCurrentThread();
}
+ boolean hasReadLock() {
+ return this.fsLock.getReadHoldCount() > 0;
+ }
+
+ boolean hasReadOrWriteLock() {
+ return hasReadLock() || hasWriteLock();
+ }
+
/**
* dirs is a list of directories where the filesystem directory state
* is stored
@@ -601,11 +609,14 @@
}
NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
- getClusterId(),
- getBlockPoolId(),
- dir.fsImage.getStorage().getCTime(),
- getDistributedUpgradeVersion());
+ readLock();
+ try {
+ return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+ getClusterId(), getBlockPoolId(),
+ dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -654,32 +665,32 @@
void metaSave(String filename) throws IOException {
writeLock();
try {
- checkSuperuserPrivilege();
- File file = new File(System.getProperty("hadoop.log.dir"), filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
- true)));
-
- long totalInodes = this.dir.totalInodes();
- long totalBlocks = this.getBlocksTotal();
-
- ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
- this.DFSNodesStatus(live, dead);
-
- String str = totalInodes + " files and directories, " + totalBlocks
- + " blocks = " + (totalInodes + totalBlocks) + " total";
- out.println(str);
- out.println("Live Datanodes: "+live.size());
- out.println("Dead Datanodes: "+dead.size());
- blockManager.metaSave(out);
-
- //
- // Dump all datanodes
- //
- datanodeDump(out);
-
- out.flush();
- out.close();
+ checkSuperuserPrivilege();
+ File file = new File(System.getProperty("hadoop.log.dir"), filename);
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+ true)));
+
+ long totalInodes = this.dir.totalInodes();
+ long totalBlocks = this.getBlocksTotal();
+
+ ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+ ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+ this.DFSNodesStatus(live, dead);
+
+ String str = totalInodes + " files and directories, " + totalBlocks
+ + " blocks = " + (totalInodes + totalBlocks) + " total";
+ out.println(str);
+ out.println("Live Datanodes: "+live.size());
+ out.println("Dead Datanodes: "+dead.size());
+ blockManager.metaSave(out);
+
+ //
+ // Dump all datanodes
+ //
+ datanodeDump(out);
+
+ out.flush();
+ out.close();
} finally {
writeUnlock();
}
@@ -717,46 +728,46 @@
throws IOException {
readLock();
try {
- checkSuperuserPrivilege();
-
- DatanodeDescriptor node = getDatanode(datanode);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
- + "Asking for blocks from an unrecorded node " + datanode.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
- datanode.getName() + ", but there is no info for it");
- }
-
- int numBlocks = node.numBlocks();
- if(numBlocks == 0) {
- return new BlocksWithLocations(new BlockWithLocations[0]);
- }
- Iterator<BlockInfo> iter = node.getBlockIterator();
- int startBlock = r.nextInt(numBlocks); // starting from a random block
- // skip blocks
- for(int i=0; i<startBlock; i++) {
- iter.next();
- }
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
- long totalSize = 0;
- BlockInfo curBlock;
- while(totalSize<size && iter.hasNext()) {
- curBlock = iter.next();
- if(!curBlock.isComplete()) continue;
- totalSize += addBlock(curBlock, results);
- }
- if(totalSize<size) {
- iter = node.getBlockIterator(); // start from the beginning
- for(int i=0; i<startBlock&&totalSize<size; i++) {
+ checkSuperuserPrivilege();
+
+ DatanodeDescriptor node = getDatanode(datanode);
+ if (node == null) {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ + "Asking for blocks from an unrecorded node " + datanode.getName());
+ throw new IllegalArgumentException(
+ "Unexpected exception. Got getBlocks message for datanode " +
+ datanode.getName() + ", but there is no info for it");
+ }
+
+ int numBlocks = node.numBlocks();
+ if(numBlocks == 0) {
+ return new BlocksWithLocations(new BlockWithLocations[0]);
+ }
+ Iterator<BlockInfo> iter = node.getBlockIterator();
+ int startBlock = r.nextInt(numBlocks); // starting from a random block
+ // skip blocks
+ for(int i=0; i<startBlock; i++) {
+ iter.next();
+ }
+ List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+ long totalSize = 0;
+ BlockInfo curBlock;
+ while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
- }
-
- return new BlocksWithLocations(
- results.toArray(new BlockWithLocations[results.size()]));
+ if(totalSize<size) {
+ iter = node.getBlockIterator(); // start from the beginning
+ for(int i=0; i<startBlock&&totalSize<size; i++) {
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
+ }
+ }
+
+ return new BlocksWithLocations(
+ results.toArray(new BlockWithLocations[results.size()]));
} finally {
readUnlock();
}
@@ -777,6 +788,7 @@
* return the length of the added block; 0 if the block is not added
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
+ assert hasReadOrWriteLock();
ArrayList<String> machineSet = blockManager.getValidLocations(block);
if(machineSet.size() == 0) {
return 0;
@@ -799,22 +811,26 @@
public void setPermission(String src, FsPermission permission)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set permission for " + src, safeMode);
- checkOwner(src);
- dir.setPermission(src, permission);
- getEditLog().logSync();
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "setPermission", src, null, stat);
- }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set permission for " + src, safeMode);
+ }
+ checkOwner(src);
+ dir.setPermission(src, permission);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(src, false);
+ }
} finally {
writeUnlock();
}
+ getEditLog().logSync();
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "setPermission", src, null, resultingStat);
+ }
}
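
setPermission above, and setOwner, concat, and setTimes below, now share one shape: mutate and capture the resulting file status inside the write lock, then call logSync() and write the audit event only after releasing it, so a slow edit-log sync no longer blocks other namespace operations. A compilable sketch of the shape; the helper bodies are stubs, not the real FSNamesystem methods:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class SyncOutsideLock {
  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);

  void setPermission(String src, short permission) {
    String resultingStat = null;
    fsLock.writeLock().lock();
    try {
      // check safe mode and ownership, apply the change, queue the edit
      resultingStat = "stat:" + src;  // capture while the namespace is consistent
    } finally {
      fsLock.writeLock().unlock();
    }
    logSync();                                    // flush edits, lock not held
    audit("setPermission", src, resultingStat);   // audit with the captured status
  }

  private void logSync() { /* flush queued edit-log records to stable storage */ }
  private void audit(String op, String src, String stat) { /* write audit entry */ }
}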
/**
@@ -824,31 +840,35 @@
public void setOwner(String src, String username, String group)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode);
- FSPermissionChecker pc = checkOwner(src);
- if (!pc.isSuper) {
- if (username != null && !pc.user.equals(username)) {
- throw new AccessControlException("Non-super user cannot change owner.");
}
- if (group != null && !pc.containsGroup(group)) {
- throw new AccessControlException("User does not belong to " + group
+ FSPermissionChecker pc = checkOwner(src);
+ if (!pc.isSuper) {
+ if (username != null && !pc.user.equals(username)) {
+ throw new AccessControlException("Non-super user cannot change owner.");
+ }
+ if (group != null && !pc.containsGroup(group)) {
+ throw new AccessControlException("User does not belong to " + group
+ " .");
+ }
}
- }
- dir.setOwner(src, username, group);
- getEditLog().logSync();
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "setOwner", src, null, stat);
- }
+ dir.setOwner(src, username, group);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(src, false);
+ }
} finally {
writeUnlock();
}
+ getEditLog().logSync();
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "setOwner", src, null, resultingStat);
+ }
}
/**
@@ -876,7 +896,7 @@
/**
* Get block locations within the specified range.
* @see ClientProtocol#getBlockLocations(String, long, long)
- * @throws FileNotFoundException
+ * @throws FileNotFoundException, UnresolvedLinkException, IOException
*/
LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
@@ -893,7 +913,7 @@
throw new HadoopIllegalArgumentException(
"Negative length is not supported. File: " + src);
}
- final LocatedBlocks ret = getBlockLocationsInternal(src,
+ final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
offset, length, doAccessTime, needBlockToken);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -903,7 +923,11 @@
return ret;
}
- private LocatedBlocks getBlockLocationsInternal(String src,
+ /*
+ * Get block locations within the specified range, updating the
+ * access times if necessary.
+ */
+ private LocatedBlocks getBlockLocationsUpdateTimes(String src,
long offset,
long length,
boolean doAccessTime,
@@ -954,8 +978,7 @@
LocatedBlocks getBlockLocationsInternal(INodeFile inode,
long offset, long length, boolean needBlockToken)
throws IOException {
- readLock();
- try {
+ assert hasReadOrWriteLock();
final BlockInfo[] blocks = inode.getBlocks();
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -987,9 +1010,6 @@
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
lastBlock, last.isComplete());
}
- } finally {
- readUnlock();
- }
}
/** Create a LocatedBlock. */
@@ -1021,146 +1041,152 @@
* @throws IOException
*/
public void concat(String target, String [] srcs)
- throws IOException, UnresolvedLinkException {
+ throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
- // check safe mode
- if (isInSafeMode()) {
- throw new SafeModeException("concat: cannot concat " + target, safeMode);
- }
// verify args
if(target.isEmpty()) {
- throw new IllegalArgumentException("concat: trg file name is empty");
+ throw new IllegalArgumentException("Target file name is empty");
}
if(srcs == null || srcs.length == 0) {
- throw new IllegalArgumentException("concat: srcs list is empty or null");
+ throw new IllegalArgumentException("No sources given");
}
- // currently we require all the files to be in the same dir
+ // We require all files be in the same directory
String trgParent =
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
- for(String s : srcs) {
+ for (String s : srcs) {
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
- if(! srcParent.equals(trgParent)) {
- throw new IllegalArgumentException
- ("concat: srcs and target shoould be in same dir");
+ if (!srcParent.equals(trgParent)) {
+ throw new IllegalArgumentException(
+ "Sources and target are not in the same directory");
}
}
-
+
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- // write permission for the target
- if (isPermissionEnabled) {
- checkPathAccess(target, FsAction.WRITE);
-
- // and srcs
- for(String aSrc: srcs) {
- checkPathAccess(aSrc, FsAction.READ); // read the file
- checkParentAccess(aSrc, FsAction.WRITE); // for delete
- }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot concat " + target, safeMode);
}
-
-
- // to make sure no two files are the same
- Set<INode> si = new HashSet<INode>();
-
- // we put the following prerequisite for the operation
- // replication and blocks sizes should be the same for ALL the blocks
- // check the target
- INode inode = dir.getFileINode(target);
-
- if(inode == null) {
- throw new IllegalArgumentException("concat: trg file doesn't exist");
+ concatInternal(target, srcs);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(target, false);
}
- if(inode.isUnderConstruction()) {
- throw new IllegalArgumentException("concat: trg file is uner construction");
- }
-
- INodeFile trgInode = (INodeFile) inode;
-
- // per design trg shouldn't be empty and all the blocks same size
- if(trgInode.blocks.length == 0) {
- throw new IllegalArgumentException("concat: "+ target + " file is empty");
- }
-
- long blockSize = trgInode.getPreferredBlockSize();
-
- // check the end block to be full
- if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
- throw new IllegalArgumentException(target + " blocks size should be the same");
- }
-
- si.add(trgInode);
- short repl = trgInode.getReplication();
-
- // now check the srcs
- boolean endSrc = false; // final src file doesn't have to have full end block
- for(int i=0; i<srcs.length; i++) {
- String src = srcs[i];
- if(i==srcs.length-1)
- endSrc=true;
-
- INodeFile srcInode = dir.getFileINode(src);
-
- if(src.isEmpty()
- || srcInode == null
- || srcInode.isUnderConstruction()
- || srcInode.blocks.length == 0) {
- throw new IllegalArgumentException("concat: file " + src +
- " is invalid or empty or underConstruction");
- }
-
- // check replication and blocks size
- if(repl != srcInode.getReplication()) {
- throw new IllegalArgumentException(src + " and " + target + " " +
- "should have same replication: "
- + repl + " vs. " + srcInode.getReplication());
- }
-
- //boolean endBlock=false;
- // verify that all the blocks are of the same length as target
- // should be enough to check the end blocks
- int idx = srcInode.blocks.length-1;
- if(endSrc)
- idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
- if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
- throw new IllegalArgumentException("concat: blocks sizes of " +
- src + " and " + target + " should all be the same");
- }
-
- si.add(srcInode);
- }
-
- // make sure no two files are the same
- if(si.size() < srcs.length+1) { // trg + srcs
- // it means at least two files are the same
- throw new IllegalArgumentException("at least two files are the same");
- }
-
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
- Arrays.toString(srcs) + " to " + target);
- }
-
- dir.concatInternal(target,srcs);
} finally {
writeUnlock();
}
getEditLog().logSync();
-
-
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(target, false);
logAuditEvent(UserGroupInformation.getLoginUser(),
Server.getRemoteIp(),
- "concat", Arrays.toString(srcs), target, stat);
+ "concat", Arrays.toString(srcs), target, resultingStat);
}
-
}
+ /** See {@link #concat(String, String[])} */
+ public void concatInternal(String target, String [] srcs)
+ throws IOException, UnresolvedLinkException {
+ assert hasWriteLock();
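+ // The public concat() above takes the write lock and syncs the
+ // edit log after releasing it; this variant assumes the lock is
+ // already held.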
+
+ // write permission for the target
+ if (isPermissionEnabled) {
+ checkPathAccess(target, FsAction.WRITE);
+
+ // and srcs
+ for(String aSrc: srcs) {
+ checkPathAccess(aSrc, FsAction.READ); // read the file
+ checkParentAccess(aSrc, FsAction.WRITE); // for delete
+ }
+ }
+
+ // to make sure no two files are the same
+ Set<INode> si = new HashSet<INode>();
+
+ // The operation has the following prerequisite: the replication
+ // factor and block sizes must be the same for ALL the blocks.
+ // check the target
+ INode inode = dir.getFileINode(target);
+
+ if(inode == null) {
+ throw new IllegalArgumentException("concat: trg file doesn't exist");
+ }
+ if(inode.isUnderConstruction()) {
+ throw new IllegalArgumentException("concat: trg file is uner construction");
+ }
+
+ INodeFile trgInode = (INodeFile) inode;
+
+ // By design trg must not be empty and all of its blocks the same size
+ if(trgInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: "+ target + " file is empty");
+ }
+
+ long blockSize = trgInode.getPreferredBlockSize();
+
+ // check that the end block is full
+ if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+ throw new IllegalArgumentException(target + " blocks size should be the same");
+ }
+
+ si.add(trgInode);
+ short repl = trgInode.getReplication();
+
+ // now check the srcs
+ boolean endSrc = false; // the final src file doesn't have to have a full end block
+ for(int i=0; i<srcs.length; i++) {
+ String src = srcs[i];
+ if(i==srcs.length-1)
+ endSrc=true;
+
+ INodeFile srcInode = dir.getFileINode(src);
+
+ if(src.isEmpty()
+ || srcInode == null
+ || srcInode.isUnderConstruction()
+ || srcInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: file " + src +
+ " is invalid or empty or underConstruction");
+ }
+
+ // check replication and blocks size
+ if(repl != srcInode.getReplication()) {
+ throw new IllegalArgumentException(src + " and " + target + " " +
+ "should have same replication: "
+ + repl + " vs. " + srcInode.getReplication());
+ }
+
+ // Verify that all the blocks are of the same length as the target's;
+ // it is sufficient to check the end blocks.
+ int idx = srcInode.blocks.length-1;
+ if(endSrc)
+ idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+ if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+ throw new IllegalArgumentException("concat: blocks sizes of " +
+ src + " and " + target + " should all be the same");
+ }
+
+ si.add(srcInode);
+ }
+
+ // make sure no two files are the same
+ if(si.size() < srcs.length+1) { // trg + srcs
+ // it means at least two files are the same
+ throw new IllegalArgumentException("at least two files are the same");
+ }
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+ }
+
+ dir.concat(target, srcs);
+ }
+
/**
* stores the modification and access time for this inode.
* The access time is precise upto an hour. The transaction, if needed, is
@@ -1174,23 +1200,22 @@
}
writeLock();
try {
- //
- // The caller needs to have write access to set access & modification times.
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
- }
- INodeFile inode = dir.getFileINode(src);
- if (inode != null) {
- dir.setTimes(src, inode, mtime, atime, true);
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "setTimes", src, null, stat);
+ // Write access is required to set access and modification times
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
}
- } else {
- throw new FileNotFoundException("File " + src + " does not exist.");
- }
+ INodeFile inode = dir.getFileINode(src);
+ if (inode != null) {
+ dir.setTimes(src, inode, mtime, atime, true);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ final HdfsFileStatus stat = dir.getFileInfo(src, false);
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "setTimes", src, null, stat);
+ }
+ } else {
+ throw new FileNotFoundException("File " + src + " does not exist.");
+ }
} finally {
writeUnlock();
}
@@ -1202,21 +1227,24 @@
public void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (!createParent) {
- verifyParentDir(link);
- }
- createSymlinkInternal(target, link, dirPerms, createParent);
+ if (!createParent) {
+ verifyParentDir(link);
+ }
+ createSymlinkInternal(target, link, dirPerms, createParent);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(link, false);
+ }
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(link, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "createSymlink", link, target, stat);
+ "createSymlink", link, target, resultingStat);
}
}
@@ -1226,13 +1254,11 @@
private void createSymlinkInternal(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
- writeLock();
- try {
+ assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" +
target + " link=" + link);
}
-
if (isInSafeMode()) {
throw new SafeModeException("Cannot create symlink " + link, safeMode);
}
@@ -1251,9 +1277,6 @@
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
- } finally {
- writeUnlock();
- }
}
/**
@@ -1271,7 +1294,16 @@
*/
public boolean setReplication(String src, short replication)
throws IOException, UnresolvedLinkException {
- boolean status = setReplicationInternal(src, replication);
+ boolean status = false;
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ }
+ status = setReplicationInternal(src, replication);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1284,10 +1316,7 @@
private boolean setReplicationInternal(String src,
short replication) throws AccessControlException, QuotaExceededException,
SafeModeException, UnresolvedLinkException, IOException {
- writeLock();
- try {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ assert hasWriteLock();
blockManager.verifyReplication(src, replication, null);
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
@@ -1317,17 +1346,19 @@
+ ". New replication is " + replication);
}
return true;
- } finally {
- writeUnlock();
- }
}
long getPreferredBlockSize(String filename)
- throws IOException, UnresolvedLinkException {
- if (isPermissionEnabled) {
- checkTraverse(filename);
+ throws IOException, UnresolvedLinkException {
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ checkTraverse(filename);
+ }
+ return dir.getPreferredBlockSize(filename);
+ } finally {
+ readUnlock();
}
- return dir.getPreferredBlockSize(filename);
}
/*
@@ -1335,6 +1366,7 @@
*/
private void verifyParentDir(String src) throws FileNotFoundException,
ParentNotDirectoryException, UnresolvedLinkException {
+ assert hasReadOrWriteLock();
Path parent = new Path(src).getParent();
if (parent != null) {
INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
@@ -1360,8 +1392,13 @@
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
- startFileInternal(src, permissions, holder, clientMachine, flag,
- createParent, replication, blockSize);
+ writeLock();
+ try {
+ startFileInternal(src, permissions, holder, clientMachine, flag,
+ createParent, replication, blockSize);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -1393,6 +1430,7 @@
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
+ assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
@@ -1401,10 +1439,9 @@
+ ", replication=" + replication
+ ", createFlag=" + flag.toString());
}
- writeLock();
- try {
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot create file" + src, safeMode);
+ }
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
@@ -1512,9 +1549,6 @@
+ie.getMessage());
throw ie;
}
- } finally {
- writeUnlock();
- }
return null;
}
@@ -1529,35 +1563,41 @@
* @return true if the file is already closed
* @throws IOException
*/
- synchronized boolean recoverLease(String src, String holder, String clientMachine)
- throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException(
- "Cannot recover the lease of " + src, safeMode);
+ boolean recoverLease(String src, String holder, String clientMachine)
+ throws IOException {
+ writeLock();
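+ // Previously a synchronized method; now takes the FSNamesystem
+ // write lock explicitly, matching the other namespace operations.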
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot recover the lease of " + src, safeMode);
+ }
+ if (!DFSUtil.isValidName(src)) {
+ throw new IOException("Invalid file name: " + src);
+ }
+
+ INode inode = dir.getFileINode(src);
+ if (inode == null) {
+ throw new FileNotFoundException("File not found " + src);
+ }
+
+ if (!inode.isUnderConstruction()) {
+ return true;
+ }
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+
+ recoverLeaseInternal(inode, src, holder, clientMachine, true);
+ } finally {
+ writeUnlock();
}
- if (!DFSUtil.isValidName(src)) {
- throw new IOException("Invalid file name: " + src);
- }
-
- INode inode = dir.getFileINode(src);
- if (inode == null) {
- throw new FileNotFoundException("File not found " + src);
- }
-
- if (!inode.isUnderConstruction()) {
- return true;
- }
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
- }
-
- recoverLeaseInternal(inode, src, holder, clientMachine, true);
return false;
}
private void recoverLeaseInternal(INode fileInode,
String src, String holder, String clientMachine, boolean force)
- throws IOException {
+ throws IOException {
+ assert hasWriteLock();
if (fileInode != null && fileInode.isUnderConstruction()) {
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
//
@@ -1643,12 +1683,16 @@
throw new UnsupportedOperationException("Append to hdfs not supported." +
" Please refer to dfs.support.append configuration parameter.");
}
- LocatedBlock lb =
- startFileInternal(src, null, holder, clientMachine,
+ LocatedBlock lb = null;
+ writeLock();
+ try {
+ lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, (short)blockManager.maxReplication, (long)0);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
-
if (lb != null) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1657,7 +1701,6 @@
+" block size " + lb.getBlock().getNumBytes());
}
}
-
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
@@ -1749,6 +1792,9 @@
// Allocate a new block and record it in the INode.
writeLock();
try {
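+ // Check safe mode under the write lock so the check cannot race
+ // with a concurrent transition into safe mode.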
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
+ }
INode[] pathINodes = dir.getExistingPathINodes(src);
int inodesLen = pathINodes.length;
checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1768,7 +1814,7 @@
} finally {
writeUnlock();
}
-
+
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
if (isBlockTokenEnabled) {
@@ -1788,6 +1834,7 @@
final DatanodeDescriptor clientnode;
final long preferredblocksize;
+ final List<DatanodeDescriptor> chosen;
readLock();
try {
//check safe mode
@@ -1800,19 +1847,19 @@
final INodeFileUnderConstruction file = checkLease(src, clientName);
clientnode = file.getClientNode();
preferredblocksize = file.getPreferredBlockSize();
+
+ //find datanode descriptors
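+ // (resolved under the read lock since getDatanode() asserts that
+ // the namesystem lock is held)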
+ chosen = new ArrayList<DatanodeDescriptor>();
+ for(DatanodeInfo d : existings) {
+ final DatanodeDescriptor descriptor = getDatanode(d);
+ if (descriptor != null) {
+ chosen.add(descriptor);
+ }
+ }
} finally {
readUnlock();
}
- //find datanode descriptors
- final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
- for(DatanodeInfo d : existings) {
- final DatanodeDescriptor descriptor = getDatanode(d);
- if (descriptor != null) {
- chosen.add(descriptor);
- }
- }
-
// choose new datanodes.
final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
src, numAdditionalNodes, clientnode, chosen, true,
@@ -1833,21 +1880,24 @@
UnresolvedLinkException, IOException {
writeLock();
try {
- //
- // Remove the block from the pending creates list
- //
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b+"of file "+src);
- }
- INodeFileUnderConstruction file = checkLease(src, holder);
- dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- + b
- + " is removed from pendingCreates");
- }
- return true;
+ //
+ // Remove the block from the pending creates list
+ //
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ +b+"of file "+src);
+ }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot abandon block " + b +
+ " for fle" + src, safeMode);
+ }
+ INodeFileUnderConstruction file = checkLease(src, holder);
+ dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ + b + " is removed from pendingCreates");
+ }
+ return true;
} finally {
writeUnlock();
}
@@ -1855,7 +1905,8 @@
// make sure that we still have the lease on this file.
private INodeFileUnderConstruction checkLease(String src, String holder)
- throws LeaseExpiredException, UnresolvedLinkException {
+ throws LeaseExpiredException, UnresolvedLinkException {
+ assert hasReadOrWriteLock();
INodeFile file = dir.getFileINode(src);
checkLease(src, holder, file);
return (INodeFileUnderConstruction)file;
@@ -1863,7 +1914,7 @@
private void checkLease(String src, String holder, INode file)
throws LeaseExpiredException {
-
+ assert hasReadOrWriteLock();
if (file == null || file.isDirectory()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +
@@ -1896,23 +1947,29 @@
public boolean completeFile(String src, String holder, ExtendedBlock last)
throws SafeModeException, UnresolvedLinkException, IOException {
checkBlock(last);
- boolean success = completeFileInternal(src, holder,
+ boolean success = false;
+ writeLock();
+ try {
+ success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
- return success ;
+ return success;
}
private boolean completeFileInternal(String src,
String holder, Block last) throws SafeModeException,
UnresolvedLinkException, IOException {
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
+ assert hasWriteLock();
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot complete file " + src, safeMode);
+ }
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
// commit the last block and complete it if it has minimum replicas
@@ -1927,9 +1984,6 @@
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
+ " is closed by " + holder);
return true;
- } finally {
- writeUnlock();
- }
}
/**
@@ -1958,6 +2012,7 @@
*/
private Block allocateBlock(String src, INode[] inodes,
DatanodeDescriptor targets[]) throws QuotaExceededException {
+ assert hasWriteLock();
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
@@ -1975,35 +2030,35 @@
* all blocks, otherwise check only penultimate block.
*/
boolean checkFileProgress(INodeFile v, boolean checkall) {
- writeLock();
+ readLock();
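+ // Downgraded from the write lock: this method only reads block
+ // state and does not modify the namespace.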
try {
- if (checkall) {
- //
- // check all blocks of the file.
- //
- for (BlockInfo block: v.getBlocks()) {
- if (!block.isComplete()) {
+ if (checkall) {
+ //
+ // check all blocks of the file.
+ //
+ for (BlockInfo block: v.getBlocks()) {
+ if (!block.isComplete()) {
+ LOG.info("BLOCK* NameSystem.checkFileProgress: "
+ + "block " + block + " has not reached minimal replication "
+ + blockManager.minReplication);
+ return false;
+ }
+ }
+ } else {
+ //
+ // check the penultimate block of this file
+ //
+ BlockInfo b = v.getPenultimateBlock();
+ if (b != null && !b.isComplete()) {
LOG.info("BLOCK* NameSystem.checkFileProgress: "
- + "block " + block + " has not reached minimal replication "
+ + "block " + b + " has not reached minimal replication "
+ blockManager.minReplication);
return false;
}
}
- } else {
- //
- // check the penultimate block of this file
- //
- BlockInfo b = v.getPenultimateBlock();
- if (b != null && !b.isComplete()) {
- LOG.info("BLOCK* NameSystem.checkFileProgress: "
- + "block " + b + " has not reached minimal replication "
- + blockManager.minReplication);
- return false;
- }
- }
- return true;
+ return true;
} finally {
- writeUnlock();
+ readUnlock();
}
}
@@ -2042,13 +2097,26 @@
@Deprecated
boolean renameTo(String src, String dst)
throws IOException, UnresolvedLinkException {
- boolean status = renameToInternal(src, dst);
+ boolean status = false;
+ HdfsFileStatus resultingStat = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+ " to " + dst);
+ }
+ writeLock();
+ try {
+ status = renameToInternal(src, dst);
+ if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(dst, false);
+ }
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(dst, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "rename", src, dst, stat);
+ "rename", src, dst, resultingStat);
}
return status;
}
@@ -2057,19 +2125,13 @@
@Deprecated
private boolean renameToInternal(String src, String dst)
throws IOException, UnresolvedLinkException {
-
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
- " to " + dst);
- }
- if (isInSafeMode())
+ assert hasWriteLock();
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
+ }
if (!DFSUtil.isValidName(dst)) {
throw new IOException("Invalid name: " + dst);
}
-
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
//but for now,
@@ -2081,40 +2143,44 @@
HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
if (dir.renameTo(src, dst)) {
- changeLease(src, dst, dinfo); // update lease with new filename
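+ // the caller holds the write lock, so use the unprotected variant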
+ unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
return true;
}
return false;
- } finally {
- writeUnlock();
- }
}
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
- renameToInternal(src, dst, options);
+ HdfsFileStatus resultingStat = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ + src + " to " + dst);
+ }
+ writeLock();
+ try {
+ renameToInternal(src, dst, options);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(dst, false);
+ }
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
StringBuilder cmd = new StringBuilder("rename options=");
for (Rename option : options) {
cmd.append(option.value()).append(" ");
}
- final HdfsFileStatus stat = dir.getFileInfo(dst, false);
logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
- cmd.toString(), src, dst, stat);
+ cmd.toString(), src, dst, resultingStat);
}
}
private void renameToInternal(String src, String dst,
Options.Rename... options) throws IOException {
- writeLock();
- try {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
- + src + " to " + dst);
- }
+ assert hasWriteLock();
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
@@ -2128,10 +2194,7 @@
HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
dir.renameTo(src, dst, options);
- changeLease(src, dst, dinfo); // update lease with new filename
- } finally {
- writeUnlock();
- }
+ unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
}
/**
@@ -2141,15 +2204,12 @@
* description of exceptions
*/
public boolean delete(String src, boolean recursive)
- throws AccessControlException, SafeModeException,
- UnresolvedLinkException, IOException {
- if ((!recursive) && (!dir.isDirEmpty(src))) {
- throw new IOException(src + " is non empty");
- }
+ throws AccessControlException, SafeModeException,
+ UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
- boolean status = deleteInternal(src, true);
+ boolean status = deleteInternal(src, recursive, true);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
@@ -2169,9 +2229,10 @@
*
* @see ClientProtocol#delete(String, boolean) for description of exceptions
*/
- private boolean deleteInternal(String src, boolean enforcePermission)
- throws AccessControlException, SafeModeException,
- UnresolvedLinkException, IOException{
+ private boolean deleteInternal(String src, boolean recursive,
+ boolean enforcePermission)
+ throws AccessControlException, SafeModeException, UnresolvedLinkException,
+ IOException {
boolean deleteNow = false;
ArrayList<Block> collectedBlocks = new ArrayList<Block>();
@@ -2180,6 +2241,9 @@
if (isInSafeMode()) {
throw new SafeModeException("Cannot delete " + src, safeMode);
}
+ if (!recursive && !dir.isDirEmpty(src)) {
+ throw new IOException(src + " is non empty");
+ }
if (enforcePermission && isPermissionEnabled) {
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
}
@@ -2194,10 +2258,16 @@
} finally {
writeUnlock();
}
- // Log directory deletion to editlog
+
getEditLog().logSync();
- if (!deleteNow) {
- removeBlocks(collectedBlocks); // Incremental deletion of blocks
+
+ writeLock();
+ try {
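+ // Re-acquire the write lock for the incremental block removal;
+ // removeBlocks() asserts hasWriteLock().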
+ if (!deleteNow) {
+ removeBlocks(collectedBlocks); // Incremental deletion of blocks
+ }
+ } finally {
+ writeUnlock();
}
collectedBlocks.clear();
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2209,24 +2279,21 @@
/** From the given list, incrementally remove the blocks from blockManager */
private void removeBlocks(List<Block> blocks) {
+ assert hasWriteLock();
int start = 0;
int end = 0;
while (start < blocks.size()) {
end = BLOCK_DELETION_INCREMENT + start;
end = end > blocks.size() ? blocks.size() : end;
- writeLock();
- try {
- for (int i=start; i<end; i++) {
- blockManager.removeBlock(blocks.get(i));
- }
- } finally {
- writeUnlock();
+ for (int i=start; i<end; i++) {
+ blockManager.removeBlock(blocks.get(i));
}
start = end;
}
}
void removePathAndBlocks(String src, List<Block> blocks) {
+ assert hasWriteLock();
leaseManager.removeLeaseWithPrefixPath(src);
if (blocks == null) {
return;
@@ -2249,13 +2316,18 @@
*/
HdfsFileStatus getFileInfo(String src, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
- if (!DFSUtil.isValidName(src)) {
- throw new InvalidPathException("Invalid file name: " + src);
+ readLock();
+ try {
+ if (!DFSUtil.isValidName(src)) {
+ throw new InvalidPathException("Invalid file name: " + src);
+ }
+ if (isPermissionEnabled) {
+ checkTraverse(src);
+ }
+ return dir.getFileInfo(src, resolveLink);
+ } finally {
+ readUnlock();
}
- if (isPermissionEnabled) {
- checkTraverse(src);
- }
- return dir.getFileInfo(src, resolveLink);
}
/**
@@ -2263,7 +2335,16 @@
*/
public boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
- boolean status = mkdirsInternal(src, permissions, createParent);
+ boolean status = false;
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+ }
+ writeLock();
+ try {
+ status = mkdirsInternal(src, permissions, createParent);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -2280,10 +2361,9 @@
private boolean mkdirsInternal(String src,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+ assert hasWriteLock();
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot create directory " + src, safeMode);
}
if (isPermissionEnabled) {
checkTraverse(src);
@@ -2293,15 +2373,12 @@
// a new directory is not created.
return true;
}
- if (isInSafeMode())
- throw new SafeModeException("Cannot create directory " + src, safeMode);
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
if (isPermissionEnabled) {
checkAncestorAccess(src, FsAction.WRITE);
}
-
if (!createParent) {
verifyParentDir(src);
}
@@ -2315,17 +2392,19 @@
throw new IOException("Failed to create directory: " + src);
}
return true;
- } finally {
- writeUnlock();
- }
}
ContentSummary getContentSummary(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException {
- if (isPermissionEnabled) {
- checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+ }
+ return dir.getContentSummary(src);
+ } finally {
+ readUnlock();
}
- return dir.getContentSummary(src);
}
/**
@@ -2335,12 +2414,18 @@
*/
void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set quota on " + path, safeMode);
- if (isPermissionEnabled) {
- checkSuperuserPrivilege();
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set quota on " + path, safeMode);
+ }
+ if (isPermissionEnabled) {
+ checkSuperuserPrivilege();
+ }
+ dir.setQuota(path, nsQuota, dsQuota);
+ } finally {
+ writeUnlock();
}
- dir.setQuota(path, nsQuota, dsQuota);
getEditLog().logSync();
}
@@ -2351,7 +2436,6 @@
*/
void fsync(String src, String clientName)
throws IOException, UnresolvedLinkException {
-
NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+ src + " for " + clientName);
writeLock();
@@ -2364,6 +2448,7 @@
} finally {
writeUnlock();
}
+ getEditLog().logSync();
}
/**
@@ -2383,9 +2468,8 @@
String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
IOException, UnresolvedLinkException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
-
assert !isInSafeMode();
-
+ assert hasWriteLock();
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
final String message = "DIR* NameSystem.internalReleaseLease: "
@@ -2507,6 +2591,7 @@
Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
+ assert hasWriteLock();
if(newHolder == null)
return lease;
logReassignLease(lease.getHolder(), src, newHolder);
@@ -2515,6 +2600,7 @@
Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
+ assert hasWriteLock();
pendingFile.setClientName(newHolder);
return leaseManager.reassignLease(lease, src, newHolder);
}
@@ -2523,7 +2609,7 @@
private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile)
throws IOException, UnresolvedLinkException {
-
+ assert hasWriteLock();
leaseManager.removeLease(pendingFile.getClientName(), src);
// The file is no longer pending.
@@ -2544,92 +2630,96 @@
String src = "";
writeLock();
try {
- LOG.info("commitBlockSynchronization(lastblock=" + lastblock
- + ", newgenerationstamp=" + newgenerationstamp
- + ", newlength=" + newlength
- + ", newtargets=" + Arrays.asList(newtargets)
- + ", closeFile=" + closeFile
- + ", deleteBlock=" + deleteblock
- + ")");
- final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot commitBlockSynchronization while in safe mode",
+ safeMode);
+ }
+ LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ + ", newgenerationstamp=" + newgenerationstamp
+ + ", newlength=" + newlength
+ + ", newtargets=" + Arrays.asList(newtargets)
+ + ", closeFile=" + closeFile
+ + ", deleteBlock=" + deleteblock
+ + ")");
+ final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
.getLocalBlock(lastblock));
- if (storedBlock == null) {
- throw new IOException("Block (=" + lastblock + ") not found");
- }
- INodeFile iFile = storedBlock.getINode();
- if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
- throw new IOException("Unexpected block (=" + lastblock
- + ") since the file (=" + iFile.getLocalName()
- + ") is not under construction");
- }
-
- long recoveryId =
- ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
- if(recoveryId != newgenerationstamp) {
- throw new IOException("The recovery id " + newgenerationstamp
- + " does not match current recovery id "
- + recoveryId + " for block " + lastblock);
- }
-
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-
- if (deleteblock) {
- pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
- blockManager.removeBlockFromMap(storedBlock);
- }
- else {
- // update last block
- storedBlock.setGenerationStamp(newgenerationstamp);
- storedBlock.setNumBytes(newlength);
-
- // find the DatanodeDescriptor objects
- // There should be no locations in the blockManager till now because the
- // file is underConstruction
- DatanodeDescriptor[] descriptors = null;
- if (newtargets.length > 0) {
- descriptors = new DatanodeDescriptor[newtargets.length];
- for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
- }
+ if (storedBlock == null) {
+ throw new IOException("Block (=" + lastblock + ") not found");
}
+ INodeFile iFile = storedBlock.getINode();
+ if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+ throw new IOException("Unexpected block (=" + lastblock
+ + ") since the file (=" + iFile.getLocalName()
+ + ") is not under construction");
+ }
+
+ long recoveryId =
+ ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+
+ if (deleteblock) {
+ pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
+ blockManager.removeBlockFromMap(storedBlock);
+ }
+ else {
+ // update last block
+ storedBlock.setGenerationStamp(newgenerationstamp);
+ storedBlock.setNumBytes(newlength);
+
+ // find the DatanodeDescriptor objects
+ // There should be no locations in the blockManager until now because
+ // the file is under construction
+ DatanodeDescriptor[] descriptors = null;
+ if (newtargets.length > 0) {
+ descriptors = new DatanodeDescriptor[newtargets.length];
+ for(int i = 0; i < newtargets.length; i++) {
+ descriptors[i] = getDatanode(newtargets[i]);
+ }
+ }
+ if (closeFile) {
+ // the file is getting closed. Insert block locations into blockManager.
+ // Otherwise fsck will report these blocks as MISSING, especially if the
+ // blocksReceived from Datanodes take a long time to arrive.
+ for (int i = 0; i < descriptors.length; i++) {
+ descriptors[i].addBlock(storedBlock);
+ }
+ }
+ // add pipeline locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(storedBlock, descriptors);
+ }
+
+ src = leaseManager.findPath(pendingFile);
if (closeFile) {
- // the file is getting closed. Insert block locations into blockManager.
- // Otherwise fsck will report these blocks as MISSING, especially if the
- // blocksReceived from Datanodes take a long time to arrive.
- for (int i = 0; i < descriptors.length; i++) {
- descriptors[i].addBlock(storedBlock);
- }
- }
- // add pipeline locations into the INodeUnderConstruction
- pendingFile.setLastBlock(storedBlock, descriptors);
- }
+ // commit the last block and complete it if it has minimum replicas
+ blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
- // If this commit does not want to close the file, persist
- // blocks only if append is supported and return
- src = leaseManager.findPath(pendingFile);
- if (!closeFile) {
- if (supportAppends) {
+ //remove lease, close file
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ } else if (supportAppends) {
+ // If this commit does not want to close the file, persist
+ // blocks only if append is supported
dir.persistBlocks(src, pendingFile);
- getEditLog().logSync();
}
- LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
- return;
- }
-
- // commit the last block and complete it if it has minimum replicas
- blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
-
- //remove lease, close file
- finalizeINodeFileUnderConstruction(src, pendingFile);
} finally {
writeUnlock();
}
getEditLog().logSync();
- LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ if (closeFile) {
+ LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ ", file=" + src
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+ } else {
+ LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+ }
}
@@ -2637,9 +2727,15 @@
* Renew the lease(s) held by the given client
*/
void renewLease(String holder) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
- leaseManager.renewLease(holder);
+ writeLock();
+ try {
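+ // The safe-mode check and the lease renewal happen atomically
+ // under the write lock.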
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+ }
+ leaseManager.renewLease(holder);
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -2657,20 +2753,26 @@
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
throws AccessControlException, UnresolvedLinkException, IOException {
- if (isPermissionEnabled) {
- if (dir.isDir(src)) {
- checkPathAccess(src, FsAction.READ_EXECUTE);
+ DirectoryListing dl;
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ if (dir.isDir(src)) {
+ checkPathAccess(src, FsAction.READ_EXECUTE);
+ } else {
+ checkTraverse(src);
+ }
}
- else {
- checkTraverse(src);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "listStatus", src, null, null);
}
+ dl = dir.getListing(src, startAfter, needLocation);
+ } finally {
+ readUnlock();
}
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "listStatus", src, null, null);
- }
- return dir.getListing(src, startAfter, needLocation);
+ return dl;
}
/////////////////////////////////////////////////////////
@@ -2701,10 +2803,20 @@
*
* @see org.apache.hadoop.hdfs.server.datanode.DataNode
*/
- public void registerDatanode(DatanodeRegistration nodeReg
- ) throws IOException {
+ public void registerDatanode(DatanodeRegistration nodeReg)
+ throws IOException {
writeLock();
try {
+ registerDatanodeInternal(nodeReg);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /** @see #registerDatanode(DatanodeRegistration) */
+ public void registerDatanodeInternal(DatanodeRegistration nodeReg)
+ throws IOException {
+ assert hasWriteLock();
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
@@ -2821,17 +2933,12 @@
// because it is done when the descriptor is created
}
- if (safeMode != null) {
- safeMode.checkMode();
- }
- return;
- } finally {
- writeUnlock();
- }
+ checkSafeMode();
}
/* Resolve a node's network location */
private void resolveNetworkLocation (DatanodeDescriptor node) {
+ assert hasWriteLock();
List<String> names = new ArrayList<String>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
// get the node's IP address
@@ -2908,7 +3015,23 @@
DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
- throws IOException {
+ throws IOException {
+ readLock();
+ try {
+ return handleHeartbeatInternal(nodeReg, capacity, dfsUsed,
+ remaining, blockPoolUsed, xceiverCount, xmitsInProgress,
+ failedVolumes);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
+ DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ int xceiverCount, int xmitsInProgress, int failedVolumes)
+ throws IOException {
+ assert hasReadLock();
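+ // The read lock suffices here; the mutable per-datanode state is
+ // guarded by the synchronized blocks on heartbeats and datanodeMap.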
DatanodeCommand cmd = null;
synchronized (heartbeats) {
synchronized (datanodeMap) {
@@ -3149,10 +3272,14 @@
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
- // blocks should not be replicated or removed if safe mode is on
+ // Blocks should not be replicated or removed if in safe mode.
+ // It's OK to check safe mode here w/o holding the lock; in the
+ // worst case extra replications will be scheduled, and these
+ // will get fixed up later.
if (isInSafeMode())
return workFound;
- synchronized(heartbeats) {
+
+ synchronized (heartbeats) {
blocksToProcess = (int)(heartbeats.size()
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
nodesToProcess = (int)Math.ceil((double)heartbeats.size()
@@ -3186,13 +3313,13 @@
throws IOException {
writeLock();
try {
- DatanodeDescriptor nodeInfo = getDatanode(nodeID);
- if (nodeInfo != null) {
- removeDatanode(nodeInfo);
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
- + nodeID.getName() + " does not exist");
- }
+ DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+ if (nodeInfo != null) {
+ removeDatanode(nodeInfo);
+ } else {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+ + nodeID.getName() + " does not exist");
+ }
} finally {
writeUnlock();
}
@@ -3203,6 +3330,7 @@
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
+ assert hasWriteLock();
synchronized (heartbeats) {
if (nodeInfo.isAlive) {
updateStats(nodeInfo, false);
@@ -3218,12 +3346,11 @@
unprotectedRemoveDatanode(nodeInfo);
clusterMap.remove(nodeInfo);
- if (safeMode != null) {
- safeMode.checkMode();
- }
+ checkSafeMode();
}
void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
+ assert hasWriteLock();
nodeDescr.resetBlocks();
blockManager.removeFromInvalidates(nodeDescr.getStorageID());
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3234,12 +3361,14 @@
}
void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
- /* To keep host2DataNodeMap consistent with datanodeMap,
- remove from host2DataNodeMap the datanodeDescriptor removed
- from datanodeMap before adding nodeDescr to host2DataNodeMap.
- */
- host2DataNodeMap.remove(
+ assert hasWriteLock();
+ // To keep host2DataNodeMap consistent with datanodeMap,
+ // remove from host2DataNodeMap the datanodeDescriptor removed
+ // from datanodeMap before adding nodeDescr to host2DataNodeMap.
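+ // datanodeMap is also read by threads holding only the read lock
+ // (e.g. heartbeat handling), so updates are additionally
+ // synchronized on the map itself.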
+ synchronized (datanodeMap) {
+ host2DataNodeMap.remove(
datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
+ }
host2DataNodeMap.add(nodeDescr);
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3256,8 +3385,11 @@
* @throws IOException
*/
void wipeDatanode(DatanodeID nodeID) throws IOException {
+ assert hasWriteLock();
String key = nodeID.getStorageID();
- host2DataNodeMap.remove(datanodeMap.remove(key));
+ synchronized (datanodeMap) {
+ host2DataNodeMap.remove(datanodeMap.remove(key));
+ }
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.wipeDatanode: "
@@ -3282,8 +3414,9 @@
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
+ // It's OK to check safe mode w/o taking the lock here; we re-check
+ // for safe mode after taking the lock, before removing a datanode.
if (isInSafeMode()) {
- // not to check dead nodes if in safemode
return;
}
boolean allAlive = false;
@@ -3308,6 +3441,9 @@
// acquire the fsnamesystem lock, and then remove the dead node.
if (foundDead) {
writeLock();
try {
+ if (isInSafeMode()) {
+ return;
+ }
synchronized(heartbeats) {
synchronized (datanodeMap) {
@@ -3343,21 +3479,21 @@
writeLock();
startTime = now(); //after acquiring write lock
try {
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- throw new IOException("ProcessReport from dead or unregistered node: "
- + nodeID.getName());
- }
- // To minimize startup time, we discard any second (or later) block reports
- // that we receive while still in startup phase.
- if (isInStartupSafeMode() && node.numBlocks() > 0) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
- + "discarded non-initial block report from " + nodeID.getName()
- + " because namenode still in startup phase");
- return;
- }
-
- blockManager.processReport(node, newReport);
+ DatanodeDescriptor node = getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ throw new IOException("ProcessReport from dead or unregistered node: "
+ + nodeID.getName());
+ }
+ // To minimize startup time, we discard any second (or later) block reports
+ // that we receive while still in startup phase.
+ if (isInStartupSafeMode() && node.numBlocks() > 0) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+ + "discarded non-initial block report from " + nodeID.getName()
+ + " because namenode still in startup phase");
+ return;
+ }
+
+ blockManager.processReport(node, newReport);
} finally {
endTime = now();
writeUnlock();
@@ -3389,6 +3525,7 @@
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
+ assert hasWriteLock();
// first form a rack to datanodes map and
INodeFile inode = blockManager.getINode(b);
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
@@ -3482,20 +3619,20 @@
) throws IOException {
writeLock();
try {
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
- + " is received from dead or unregistered node " + nodeID.getName());
- throw new IOException(
- "Got blockReceived message from unregistered or dead node " + block);
- }
-
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
- +block+" is received from " + nodeID.getName());
- }
-
- blockManager.addBlock(node, block, delHint);
+ DatanodeDescriptor node = getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+ + " is received from dead or unregistered node " + nodeID.getName());
+ throw new IOException(
+ "Got blockReceived message from unregistered or dead node " + block);
+ }
+
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+ +block+" is received from " + nodeID.getName());
+ }
+
+ blockManager.addBlock(node, block, delHint);
} finally {
writeUnlock();
}
@@ -3612,75 +3749,74 @@
}
private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
- DatanodeReportType type) {
- boolean listLiveNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.LIVE;
- boolean listDeadNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.DEAD;
-
- HashMap<String, String> mustList = new HashMap<String, String>();
-
+ DatanodeReportType type) {
readLock();
- try {
- if (listDeadNodes) {
- //first load all the nodes listed in include and exclude files.
- for (Iterator<String> it = hostsReader.getHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
+ try {
+ boolean listLiveNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.LIVE;
+ boolean listDeadNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.DEAD;
+
+ HashMap<String, String> mustList = new HashMap<String, String>();
+
+ if (listDeadNodes) {
+ //first load all the nodes listed in include and exclude files.
+ Iterator<String> it = hostsReader.getHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
+ it = hostsReader.getExcludedHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
}
- for (Iterator<String> it = hostsReader.getExcludedHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
- }
- }
-
- ArrayList<DatanodeDescriptor> nodes = null;
-
- synchronized (datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
- mustList.size());
+
+ ArrayList<DatanodeDescriptor> nodes = null;
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean isDead = isDatanodeDead(dn);
- if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+ synchronized (datanodeMap) {
+ nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
+ mustList.size());
+ Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn = it.next();
+ boolean isDead = isDatanodeDead(dn);
+ if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+ nodes.add(dn);
+ }
+ //Remove any form of this datanode from the include/exclude lists.
+ mustList.remove(dn.getName());
+ mustList.remove(dn.getHost());
+ mustList.remove(dn.getHostName());
+ }
+ }
+
+ if (listDeadNodes) {
+ Iterator<String> it = mustList.keySet().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn =
+ new DatanodeDescriptor(new DatanodeID(it.next()));
+ dn.setLastUpdate(0);
nodes.add(dn);
}
- //Remove any form of the this datanode in include/exclude lists.
- mustList.remove(dn.getName());
- mustList.remove(dn.getHost());
- mustList.remove(dn.getHostName());
}
- }
-
- if (listDeadNodes) {
- for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
- DatanodeDescriptor dn =
- new DatanodeDescriptor(new DatanodeID(it.next()));
- dn.setLastUpdate(0);
- nodes.add(dn);
- }
- }
-
- return nodes;
+ return nodes;
} finally {
readUnlock();
}
}
- public DatanodeInfo[] datanodeReport( DatanodeReportType type
- ) throws AccessControlException {
+ public DatanodeInfo[] datanodeReport( DatanodeReportType type)
+ throws AccessControlException {
readLock();
try {
- checkSuperuserPrivilege();
-
- ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
- DatanodeInfo[] arr = new DatanodeInfo[results.size()];
- for (int i=0; i<arr.length; i++) {
- arr[i] = new DatanodeInfo(results.get(i));
- }
- return arr;
+ checkSuperuserPrivilege();
+
+ ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+ DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+ for (int i=0; i<arr.length; i++) {
+ arr[i] = new DatanodeInfo(results.get(i));
+ }
+ return arr;
} finally {
readUnlock();
}
@@ -3695,17 +3831,17 @@
* @throws IOException if
*/
void saveNamespace() throws AccessControlException, IOException {
- writeLock();
+ readLock();
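+ // The read lock still excludes all namespace mutations (writers
+ // need the write lock) while allowing concurrent reads during
+ // the save.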
try {
- checkSuperuserPrivilege();
- if(!isInSafeMode()) {
- throw new IOException("Safe mode should be turned ON " +
- "in order to create namespace image.");
- }
- getFSImage().saveNamespace(true);
- LOG.info("New namespace image has been created.");
+ checkSuperuserPrivilege();
+ if (!isInSafeMode()) {
+ throw new IOException("Safe mode should be turned ON " +
+ "in order to create namespace image.");
+ }
+ getFSImage().saveNamespace(true);
+ LOG.info("New namespace image has been created.");
} finally {
- writeUnlock();
+ readUnlock();
}
}
@@ -3718,16 +3854,16 @@
boolean restoreFailedStorage(String arg) throws AccessControlException {
writeLock();
try {
- checkSuperuserPrivilege();
-
- // if it is disabled - enable it and vice versa.
- if(arg.equals("check"))
- return getFSImage().getStorage().getRestoreFailedStorage();
-
- boolean val = arg.equals("true"); // false if not
- getFSImage().getStorage().setRestoreFailedStorage(val);
-
- return val;
+ checkSuperuserPrivilege();
+
+ // if it is disabled - enable it and vice versa.
+ if(arg.equals("check"))
+ return getFSImage().getStorage().getRestoreFailedStorage();
+
+ boolean val = arg.equals("true"); // false if not
+ getFSImage().getStorage().setRestoreFailedStorage(val);
+
+ return val;
} finally {
writeUnlock();
}
@@ -3739,15 +3875,15 @@
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
- ArrayList<DatanodeDescriptor> results =
- getDatanodeListForReport(DatanodeReportType.ALL);
- for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if (isDatanodeDead(node))
- dead.add(node);
- else
- live.add(node);
- }
+ ArrayList<DatanodeDescriptor> results =
+ getDatanodeListForReport(DatanodeReportType.ALL);
+ for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ if (isDatanodeDead(node))
+ dead.add(node);
+ else
+ live.add(node);
+ }
} finally {
readUnlock();
}
@@ -3759,13 +3895,13 @@
private void datanodeDump(PrintWriter out) {
readLock();
try {
- synchronized (datanodeMap) {
- out.println("Metasave: Number of datanodes: " + datanodeMap.size());
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- out.println(node.dumpDatanode());
+ synchronized (datanodeMap) {
+ out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+ for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ out.println(node.dumpDatanode());
+ }
}
- }
} finally {
readUnlock();
}
@@ -3774,9 +3910,9 @@
/**
* Start decommissioning the specified datanode.
*/
- private void startDecommission (DatanodeDescriptor node)
+ private void startDecommission(DatanodeDescriptor node)
throws IOException {
-
+ assert hasWriteLock();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName() + " with " +
node.numBlocks() + " blocks.");
@@ -3795,8 +3931,9 @@
/**
* Stop decommissioning the specified datanodes.
*/
- public void stopDecommission (DatanodeDescriptor node)
+ public void stopDecommission(DatanodeDescriptor node)
throws IOException {
+ assert hasWriteLock();
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning node " + node.getName());
synchronized (heartbeats) {
@@ -3807,12 +3944,6 @@
}
}
- /**
- */
- public DatanodeInfo getDataNodeInfo(String name) {
- return datanodeMap.get(name);
- }
-
public Date getStartTime() {
return new Date(systemStart);
}
@@ -3881,6 +4012,7 @@
* decommission completed. Return true if decommission is complete.
*/
boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+ assert hasWriteLock();
//
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
@@ -3965,7 +4097,7 @@
* it will be disallowed from registering.
*/
private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
- assert (hasWriteLock());
+ assert hasWriteLock();
return inHostsList(nodeReg, ipAddr);
}
@@ -3974,6 +4106,7 @@
*/
private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr)
throws IOException {
+ assert hasWriteLock();
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg, ipAddr)) {
startDecommission(nodeReg);
@@ -3988,6 +4121,7 @@
* @throws IOException
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+ assert hasReadOrWriteLock();
UnregisteredNodeException e = null;
DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
if (node == null)
@@ -4478,12 +4612,22 @@
}
return isInSafeMode();
}
+
+ private void checkSafeMode() {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
+ if (safeMode != null) {
+ safeMode.checkMode();
+ }
+ }
/**
* Check whether the name node is in safe mode.
* @return true if safe mode is ON, false otherwise
*/
- synchronized boolean isInSafeMode() {
+ boolean isInSafeMode() {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return false;
return safeMode.isOn();
@@ -4492,18 +4636,23 @@
/**
* Check whether the name node is in startup mode.
*/
- synchronized boolean isInStartupSafeMode() {
+ boolean isInStartupSafeMode() {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return false;
- return safeMode.isOn() && !safeMode.isManual();
+ return !safeMode.isManual() && safeMode.isOn();
}
/**
* Check whether replication queues are populated.
*/
- synchronized boolean isPopulatingReplQueues() {
- return (!isInSafeMode() ||
- safeMode.isPopulatingReplQueues());
+ boolean isPopulatingReplQueues() {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
+ if (safeMode == null)
+ return true;
+ return safeMode.isPopulatingReplQueues();
}
/**
@@ -4511,6 +4660,8 @@
* @param replication current replication
*/
void incrementSafeBlockCount(int replication) {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return;
safeMode.incrementSafeBlockCount((short)replication);
@@ -4520,6 +4671,8 @@
* Decrement number of blocks that reached minimal replication.
*/
void decrementSafeBlockCount(Block b) {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
return;
safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
@@ -4529,6 +4682,8 @@
* Set the total number of blocks in the system.
*/
void setBlockTotal() {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return;
safeMode.setBlockTotal((int)getCompleteBlocksTotal());
@@ -4550,29 +4705,34 @@
long getCompleteBlocksTotal() {
// Calculate number of blocks under construction
long numUCBlocks = 0;
- for (Lease lease : leaseManager.getSortedLeases()) {
- for (String path : lease.getPaths()) {
- INode node;
- try {
- node = dir.getFileINode(path);
- } catch (UnresolvedLinkException e) {
- throw new AssertionError("Lease files should reside on this FS");
- }
- assert node != null : "Found a lease for nonexisting file.";
- assert node.isUnderConstruction() :
- "Found a lease for file that is not under construction.";
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
- BlockInfo[] blocks = cons.getBlocks();
- if(blocks == null)
- continue;
- for(BlockInfo b : blocks) {
- if(!b.isComplete())
- numUCBlocks++;
+ readLock();
+ try {
+ for (Lease lease : leaseManager.getSortedLeases()) {
+ for (String path : lease.getPaths()) {
+ INode node;
+ try {
+ node = dir.getFileINode(path);
+ } catch (UnresolvedLinkException e) {
+ throw new AssertionError("Lease files should reside on this FS");
+ }
+ assert node != null : "Found a lease for nonexistent file.";
+ assert node.isUnderConstruction() :
+ "Found a lease for file that is not under construction.";
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+ BlockInfo[] blocks = cons.getBlocks();
+ if(blocks == null)
+ continue;
+ for(BlockInfo b : blocks) {
+ if(!b.isComplete())
+ numUCBlocks++;
+ }
}
}
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
+ return getBlocksTotal() - numUCBlocks;
+ } finally {
+ readUnlock();
}
- LOG.info("Number of blocks under construction: " + numUCBlocks);
- return getBlocksTotal() - numUCBlocks;
}
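
getCompleteBlocksTotal now walks the leases under the namesystem read lock so
the count is consistent with concurrent namespace edits, returning from inside
the try so the finally still releases the lock on every path. A hedged sketch
of the same shape with a plain ReentrantReadWriteLock (class and field names
are stand-ins, not the FSNamesystem API):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CountSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final boolean[] complete = { true, false, true };

      long incompleteBlocks() {
        lock.readLock().lock();
        try {
          long n = 0;
          for (boolean c : complete) {
            if (!c) n++;              // iterate under a consistent snapshot
          }
          return n;                   // returning here still runs the finally
        } finally {
          lock.readLock().unlock();   // released on every path
        }
      }
    }
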
/**
@@ -4594,6 +4754,7 @@
safeMode.setResourcesLow();
}
safeMode.setManual();
+ getEditLog().logSyncAll();
NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+ safeMode.getTurnOffTip());
} finally {
@@ -4608,14 +4769,14 @@
void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
writeLock();
try {
- if (!isInSafeMode()) {
- NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
- return;
- }
- if(getDistributedUpgradeState())
- throw new SafeModeException("Distributed upgrade is in progress",
- safeMode);
- safeMode.leave(checkForUpgrades);
+ if (!isInSafeMode()) {
+ NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
+ return;
+ }
+ if(getDistributedUpgradeState())
+ throw new SafeModeException("Distributed upgrade is in progress",
+ safeMode);
+ safeMode.leave(checkForUpgrades);
} finally {
writeUnlock();
}
@@ -4624,9 +4785,10 @@
String getSafeModeTip() {
readLock();
try {
- if (!isInSafeMode())
- return "";
- return safeMode.getTurnOffTip();
+ if (!isInSafeMode()) {
+ return "";
+ }
+ return safeMode.getTurnOffTip();
} finally {
readUnlock();
}
@@ -4639,12 +4801,11 @@
CheckpointSignature rollEditLog() throws IOException {
writeLock();
try {
- if (isInSafeMode()) {
- throw new SafeModeException("Checkpoint not created",
- safeMode);
- }
- LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
- return getFSImage().rollEditLog();
+ if (isInSafeMode()) {
+ throw new SafeModeException("Log not rolled", safeMode);
+ }
+ LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
+ return getFSImage().rollEditLog();
} finally {
writeUnlock();
}
@@ -4659,12 +4820,11 @@
void rollFSImage(CheckpointSignature sig) throws IOException {
writeLock();
try {
- if (isInSafeMode()) {
- throw new SafeModeException("Checkpoint not created",
- safeMode);
- }
- LOG.info("Roll FSImage from " + Server.getRemoteAddress());
- getFSImage().rollFSImage(sig, true);
+ if (isInSafeMode()) {
+ throw new SafeModeException("Image not rolled", safeMode);
+ }
+ LOG.info("Roll FSImage from " + Server.getRemoteAddress());
+ getFSImage().rollFSImage(sig, true);
} finally {
writeUnlock();
}
@@ -4676,10 +4836,13 @@
throws IOException {
writeLock();
try {
- LOG.info("Start checkpoint for " + bnReg.getAddress());
- NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
- getEditLog().logSync();
- return cmd;
+ if (isInSafeMode()) {
+ throw new SafeModeException("Checkpoint not started", safeMode);
+ }
+ LOG.info("Start checkpoint for " + bnReg.getAddress());
+ NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+ getEditLog().logSync();
+ return cmd;
} finally {
writeUnlock();
}
@@ -4689,8 +4852,11 @@
CheckpointSignature sig) throws IOException {
writeLock();
try {
- LOG.info("End checkpoint for " + registration.getAddress());
- getFSImage().endCheckpoint(sig, registration.getRole());
+ if (isInSafeMode()) {
+ throw new SafeModeException("Checkpoint not ended", safeMode);
+ }
+ LOG.info("End checkpoint for " + registration.getAddress());
+ getFSImage().endCheckpoint(sig, registration.getRole());
} finally {
writeUnlock();
}
@@ -4815,7 +4981,12 @@
@Override // FSNamesystemMBean
@Metric
public long getFilesTotal() {
- return this.dir.totalInodes();
+ readLock();
+ try {
+ return this.dir.totalInodes();
+ } finally {
+ readUnlock();
+ }
}
@Override // FSNamesystemMBean
@@ -4949,18 +5120,25 @@
/**
* Increments, logs and then returns the stamp
*/
- long nextGenerationStamp() {
+ private long nextGenerationStamp() throws SafeModeException {
+ assert hasWriteLock();
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot get next generation stamp", safeMode);
+ }
long gs = generationStamp.nextStamp();
getEditLog().logGenerationStamp(gs);
+ // NB: callers are responsible for syncing the edit log
return gs;
}
private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
String clientName) throws IOException {
- // check safe mode
- if (isInSafeMode())
+ assert hasWriteLock();
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot get a new generation stamp and an " +
"access token for block " + block, safeMode);
+ }
// check stored block state
BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
@@ -5001,25 +5179,27 @@
*/
LocatedBlock updateBlockForPipeline(ExtendedBlock block,
String clientName) throws IOException {
+ LocatedBlock locatedBlock;
writeLock();
try {
- // check vadility of parameters
- checkUCBlock(block, clientName);
-
- // get a new generation stamp and an access token
- block.setGenerationStamp(nextGenerationStamp());
- LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
- if (isBlockTokenEnabled) {
- locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
+ // check validity of parameters
+ checkUCBlock(block, clientName);
+
+ // get a new generation stamp and an access token
+ block.setGenerationStamp(nextGenerationStamp());
+ locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+ if (isBlockTokenEnabled) {
+ locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
- }
- return locatedBlock;
+ }
} finally {
writeUnlock();
}
+ // Ensure we record the new generation stamp
+ getEditLog().logSync();
+ return locatedBlock;
}
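
updateBlockForPipeline illustrates the pattern this patch applies throughout:
write the edit-log record while holding the FSNamesystem write lock, release
the lock, and only then call logSync(), so the durable flush never blocks
other namespace operations. A simplified sketch (EditLog here is a stand-in,
not the real FSEditLog API):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class GenerationStampSketch {
      static class EditLog {
        synchronized void logRecord(long gs) { /* append to in-memory buffer */ }
        void logSync() { /* flush buffered records to stable storage */ }
      }

      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final EditLog editLog = new EditLog();
      private long stamp;

      long nextStamp() {
        long gs;
        lock.writeLock().lock();
        try {
          gs = ++stamp;               // mutate namespace state under the lock
          editLog.logRecord(gs);      // record the edit while still locked
        } finally {
          lock.writeLock().unlock();
        }
        editLog.logSync();            // flush after releasing the lock
        return gs;
      }
    }
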
-
/**
* Update a pipeline for a block under construction
*
@@ -5034,15 +5214,32 @@
throws IOException {
writeLock();
try {
- assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
- + oldBlock + " has different block identifier";
- LOG.info("updatePipeline(block=" + oldBlock
- + ", newGenerationStamp=" + newBlock.getGenerationStamp()
- + ", newLength=" + newBlock.getNumBytes()
- + ", newNodes=" + Arrays.asList(newNodes)
- + ", clientName=" + clientName
- + ")");
+ if (isInSafeMode()) {
+ throw new SafeModeException("Pipeline not updated", safeMode);
+ }
+ assert newBlock.getBlockId() == oldBlock.getBlockId() : newBlock + " and "
+ + oldBlock + " have different block identifiers";
+ LOG.info("updatePipeline(block=" + oldBlock
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ + ", newLength=" + newBlock.getNumBytes()
+ + ", newNodes=" + Arrays.asList(newNodes)
+ + ", clientName=" + clientName
+ + ")");
+ updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+ } finally {
+ writeUnlock();
+ }
+ if (supportAppends) {
+ getEditLog().logSync();
+ }
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+ }
+ /** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
+ void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
+ ExtendedBlock newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ assert hasWriteLock();
// check the validity of the block and lease holder name
final INodeFileUnderConstruction pendingFile =
checkUCBlock(oldBlock, clientName);
@@ -5052,8 +5249,8 @@
if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
newBlock.getNumBytes() < blockinfo.getNumBytes()) {
String msg = "Update " + oldBlock + " (len = " +
- blockinfo.getNumBytes() + ") to an older state: " + newBlock +
- " (len = " + newBlock.getNumBytes() +")";
+ blockinfo.getNumBytes() + ") to an older state: " + newBlock +
+ " (len = " + newBlock.getNumBytes() +")";
LOG.warn(msg);
throw new IOException(msg);
}
@@ -5076,21 +5273,15 @@
String src = leaseManager.findPath(pendingFile);
if (supportAppends) {
dir.persistBlocks(src, pendingFile);
- getEditLog().logSync();
- }
- LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
- return;
- } finally {
- writeUnlock();
}
}
// rename was successful. If any part of the renamed subtree had
// files that were being written to, update with new filename.
- //
- void changeLease(String src, String dst, HdfsFileStatus dinfo) {
+ void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
String overwrite;
String replaceBy;
+ assert hasWriteLock();
boolean destinationExisted = true;
if (dinfo == null) {
@@ -5113,6 +5304,9 @@
* Serializes leases.
*/
void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
+ // This is run by a thread spawned by saveNamespace, which already holds
+ // a read lock on our behalf. If we took the read lock here as well, we
+ // could block for fairness if a writer is waiting on the lock.
synchronized (leaseManager) {
out.writeInt(leaseManager.countPath()); // write the size
@@ -5201,6 +5395,7 @@
/** Get a datanode descriptor given corresponding storageID */
DatanodeDescriptor getDatanode(String nodeID) {
+ assert hasReadOrWriteLock();
return datanodeMap.get(nodeID);
}
@@ -5250,36 +5445,36 @@
readLock();
try {
- if (!isPopulatingReplQueues()) {
- throw new IOException("Cannot run listCorruptFileBlocks because " +
- "replication queues have not been initialized.");
- }
- checkSuperuserPrivilege();
- long startBlockId = 0;
- // print a limited # of corrupt files per call
- int count = 0;
- ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
-
- if (startBlockAfter != null) {
- startBlockId = Block.filename2id(startBlockAfter);
- }
- BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
- while (blkIterator.hasNext()) {
- Block blk = blkIterator.next();
- INode inode = blockManager.getINode(blk);
- if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
- String src = FSDirectory.getFullPathName(inode);
- if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
- && (src.startsWith(path))) {
- corruptFiles.add(new CorruptFileBlockInfo(src, blk));
- count++;
- if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
- break;
+ if (!isPopulatingReplQueues()) {
+ throw new IOException("Cannot run listCorruptFileBlocks because " +
+ "replication queues have not been initialized.");
+ }
+ checkSuperuserPrivilege();
+ long startBlockId = 0;
+ // print a limited # of corrupt files per call
+ int count = 0;
+ ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
+
+ if (startBlockAfter != null) {
+ startBlockId = Block.filename2id(startBlockAfter);
+ }
+ BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+ while (blkIterator.hasNext()) {
+ Block blk = blkIterator.next();
+ INode inode = blockManager.getINode(blk);
+ if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
+ String src = FSDirectory.getFullPathName(inode);
+ if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
+ && (src.startsWith(path))) {
+ corruptFiles.add(new CorruptFileBlockInfo(src, blk));
+ count++;
+ if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
+ break;
+ }
}
}
- }
- LOG.info("list corrupt file blocks returned: " + count);
- return corruptFiles;
+ LOG.info("list corrupt file blocks returned: " + count);
+ return corruptFiles;
} finally {
readUnlock();
}
@@ -5291,17 +5486,17 @@
public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
readLock();
try {
- ArrayList<DatanodeDescriptor> decommissioningNodes =
+ ArrayList<DatanodeDescriptor> decommissioningNodes =
new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> results =
+ ArrayList<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.LIVE);
- for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if (node.isDecommissionInProgress()) {
- decommissioningNodes.add(node);
+ for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ if (node.isDecommissionInProgress()) {
+ decommissioningNodes.add(node);
+ }
}
- }
- return decommissioningNodes;
+ return decommissioningNodes;
} finally {
readUnlock();
}
@@ -5339,32 +5534,38 @@
*/
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot issue delegation token", safeMode);
- }
- if (!isAllowedDelegationTokenOp()) {
- throw new IOException(
+ Token<DelegationTokenIdentifier> token;
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot issue delegation token", safeMode);
+ }
+ if (!isAllowedDelegationTokenOp()) {
+ throw new IOException(
"Delegation Token can be issued only with kerberos or web authentication");
- }
-
- if(dtSecretManager == null || !dtSecretManager.isRunning()) {
- LOG.warn("trying to get DT with no secret manager running");
- return null;
- }
+ }
+ if (dtSecretManager == null || !dtSecretManager.isRunning()) {
+ LOG.warn("trying to get DT with no secret manager running");
+ return null;
+ }
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- String user = ugi.getUserName();
- Text owner = new Text(user);
- Text realUser = null;
- if (ugi.getRealUser() != null) {
- realUser = new Text(ugi.getRealUser().getUserName());
- }
- DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String user = ugi.getUserName();
+ Text owner = new Text(user);
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
renewer, realUser);
- Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+ token = new Token<DelegationTokenIdentifier>(
dtId, dtSecretManager);
- long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
- logGetDelegationToken(dtId, expiryTime);
+ long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
+ getEditLog().logGetDelegationToken(dtId, expiryTime);
+ } finally {
+ writeUnlock();
+ }
+ getEditLog().logSync();
return token;
}
@@ -5377,20 +5578,27 @@
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot renew delegation token", safeMode);
+ long expiryTime;
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot renew delegation token", safeMode);
+ }
+ if (!isAllowedDelegationTokenOp()) {
+ throw new IOException(
+ "Delegation Token can be renewed only with kerberos or web authentication");
+ }
+ String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
+ expiryTime = dtSecretManager.renewToken(token, renewer);
+ DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ id.readFields(in);
+ getEditLog().logRenewDelegationToken(id, expiryTime);
+ } finally {
+ writeUnlock();
}
- if (!isAllowedDelegationTokenOp()) {
- throw new IOException(
- "Delegation Token can be renewed only with kerberos or web authentication");
- }
- String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
- long expiryTime = dtSecretManager.renewToken(token, renewer);
- DelegationTokenIdentifier id = new DelegationTokenIdentifier();
- ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
- DataInputStream in = new DataInputStream(buf);
- id.readFields(in);
- logRenewDelegationToken(id, expiryTime);
+ getEditLog().logSync();
return expiryTime;
}
@@ -5401,13 +5609,19 @@
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot cancel delegation token", safeMode);
- }
- String canceller = UserGroupInformation.getCurrentUser().getUserName();
- DelegationTokenIdentifier id = dtSecretManager
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot cancel delegation token", safeMode);
+ }
+ String canceller = UserGroupInformation.getCurrentUser().getUserName();
+ DelegationTokenIdentifier id = dtSecretManager
.cancelToken(token, canceller);
- logCancelDelegationToken(id);
+ getEditLog().logCancelDelegationToken(id);
+ } finally {
+ writeUnlock();
+ }
+ getEditLog().logSync();
}
/**
@@ -5425,57 +5639,6 @@
}
/**
- * Log the getDelegationToken operation to edit logs
- *
- * @param id identifer of the new delegation token
- * @param expiryTime when delegation token expires
- */
- private void logGetDelegationToken(DelegationTokenIdentifier id,
- long expiryTime) throws IOException {
- writeLock();
- try {
- getEditLog().logGetDelegationToken(id, expiryTime);
- } finally {
- writeUnlock();
- }
- getEditLog().logSync();
- }
-
- /**
- * Log the renewDelegationToken operation to edit logs
- *
- * @param id identifer of the delegation token being renewed
- * @param expiryTime when delegation token expires
- */
- private void logRenewDelegationToken(DelegationTokenIdentifier id,
- long expiryTime) throws IOException {
- writeLock();
- try {
- getEditLog().logRenewDelegationToken(id, expiryTime);
- } finally {
- writeUnlock();
- }
- getEditLog().logSync();
- }
-
-
- /**
- * Log the cancelDelegationToken operation to edit logs
- *
- * @param id identifer of the delegation token being cancelled
- */
- private void logCancelDelegationToken(DelegationTokenIdentifier id)
- throws IOException {
- writeLock();
- try {
- getEditLog().logCancelDelegationToken(id);
- } finally {
- writeUnlock();
- }
- getEditLog().logSync();
- }
-
- /**
* Log the updateMasterKey operation to edit logs
*
* @param key new delegation key.
@@ -5483,6 +5646,10 @@
public void logUpdateMasterKey(DelegationKey key) throws IOException {
writeLock();
try {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot log master key update in safe mode", safeMode);
+ }
getEditLog().logUpdateMasterKey(key);
} finally {
writeUnlock();
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index ff6b2dd..c38ba5b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -391,6 +391,7 @@
/** Check the leases beginning from the oldest. */
private synchronized void checkLeases() {
+ assert fsnamesystem.hasWriteLock();
for(; sortedLeases.size() > 0; ) {
final Lease oldest = sortedLeases.first();
if (!oldest.expiredHardLimit()) {
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index 67bf0ce..9ee4a6f 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -56,7 +56,7 @@
.racks(racks)
.hosts(hosts)
.build();
-
+ dfsCluster.waitClusterUp();
namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
username = System.getProperty("user.name");
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 02bbf5f..a1adbb2 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -555,6 +555,15 @@
IOUtils.copyBytes(is, os, s.length(), true);
}
+ /** Append the given string to the given file. */
+ public static void appendFile(FileSystem fs, Path p, String s)
+ throws IOException {
+ assert fs.exists(p);
+ InputStream is = new ByteArrayInputStream(s.getBytes());
+ FSDataOutputStream os = fs.append(p);
+ IOUtils.copyBytes(is, os, s.length(), true);
+ }
+
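
A typical call to the new helper in a test, assuming fs is a FileSystem from a
running MiniDFSCluster with append support enabled (the path is illustrative):

    // Hypothetical usage of DFSTestUtil.appendFile.
    Path p = new Path("/tmp/appendTest/file1");
    DFSTestUtil.createFile(fs, p, 1024, (short)1, 0);
    DFSTestUtil.appendFile(fs, p, "some more bytes");
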
// Returns url content as string.
public static String urlGet(URL url) throws IOException {
URLConnection conn = url.openConnection();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
index 1a6de15..6ded5a3 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import static org.junit.Assert.*;
import org.junit.After;
@@ -201,7 +202,8 @@
nodes.add(nodename);
writeConfigFile(excludeFile, nodes);
cluster.getNamesystem(nnIndex).refreshNodes(conf);
- DatanodeInfo ret = cluster.getNamesystem(nnIndex).getDatanode(info[index]);
+ DatanodeInfo ret = NameNodeAdapter.getDatanode(
+ cluster.getNameNode(nnIndex), info[index]);
waitNodeState(ret, waitForState);
return ret;
}
@@ -371,7 +373,7 @@
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
fsn.refreshNodes(conf);
- DatanodeInfo ret = fsn.getDatanode(downnode);
+ DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode);
waitNodeState(ret, AdminStates.NORMAL);
verifyStats(namenode, fsn, ret, false);
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
index ac62f06..8c98a20 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
@@ -20,20 +20,44 @@
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
/**
* Tests to verify safe mode correctness.
*/
-public class TestSafeMode extends TestCase {
-
- static Log LOG = LogFactory.getLog(TestSafeMode.class);
+public class TestSafeMode {
+ Configuration conf;
+ MiniDFSCluster cluster;
+ FileSystem fs;
+ DistributedFileSystem dfs;
+
+ @Before
+ public void startUp() throws IOException {
+ conf = new HdfsConfiguration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ dfs = (DistributedFileSystem)fs;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
/**
* This test verifies that if SafeMode is manually entered, name-node does not
@@ -51,61 +75,123 @@
*
* @throws IOException
*/
- public void testManualSafeMode() throws IOException {
- MiniDFSCluster cluster = null;
- DistributedFileSystem fs = null;
+ @Test
+ public void testManualSafeMode() throws IOException {
+ fs = (DistributedFileSystem)cluster.getFileSystem();
+ Path file1 = new Path("/tmp/testManualSafeMode/file1");
+ Path file2 = new Path("/tmp/testManualSafeMode/file2");
+
+ // create two files with one block each.
+ DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
+ DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+ fs.close();
+ cluster.shutdown();
+
+ // now bring up just the NameNode.
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
+ cluster.waitActive();
+ dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+ assertTrue("No datanode is started. Should be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+
+ // manually set safemode.
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+
+ // now bring up the datanode and wait for it to be active.
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.waitActive();
+
+ // wait longer than dfs.namenode.safemode.extension
try {
- Configuration conf = new HdfsConfiguration();
- // disable safemode extension to make the test run faster.
- conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
- cluster = new MiniDFSCluster.Builder(conf).build();
- cluster.waitActive();
-
- fs = (DistributedFileSystem)cluster.getFileSystem();
- Path file1 = new Path("/tmp/testManualSafeMode/file1");
- Path file2 = new Path("/tmp/testManualSafeMode/file2");
-
- LOG.info("Created file1 and file2.");
-
- // create two files with one block each.
- DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
- DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
- fs.close();
- cluster.shutdown();
-
- // now bring up just the NameNode.
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
- cluster.waitActive();
- fs = (DistributedFileSystem)cluster.getFileSystem();
-
- LOG.info("Restarted cluster with just the NameNode");
-
- assertTrue("No datanode is started. Should be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- // manually set safemode.
- fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-
- // now bring up the datanode and wait for it to be active.
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
-
- LOG.info("Datanode is started.");
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {}
- // wait longer than dfs.namenode.safemode.extension
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {}
-
- assertTrue("should still be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
- assertFalse("should not be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
- } finally {
- if(fs != null) fs.close();
- if(cluster!= null) cluster.shutdown();
- }
+ assertTrue("should still be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+ assertFalse("should not be in SafeMode",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
}
-}
+
+ public interface FSRun {
+ void run(FileSystem fs) throws IOException;
+ }
+
+ /**
+ * Assert that the given function fails to run due to a safe
+ * mode exception.
+ */
+ public void runFsFun(String msg, FSRun f) {
+ try {
+ f.run(fs);
+ fail(msg);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("safe mode"));
+ }
+ }
+
+ /**
+ * Run various fs operations while the NN is in safe mode,
+ * assert that they are either allowed or fail as expected.
+ */
+ @Test
+ public void testOperationsWhileInSafeMode() throws IOException {
+ final Path file1 = new Path("/file1");
+
+ assertFalse(dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+ DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+ assertTrue("Could not enter SM",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER));
+
+ runFsFun("Set quota while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ ((DistributedFileSystem)fs).setQuota(file1, 1, 1);
+ }});
+
+ runFsFun("Set perm while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setPermission(file1, FsPermission.getDefault());
+ }});
+
+ runFsFun("Set owner while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setOwner(file1, "user", "group");
+ }});
+
+ runFsFun("Set repl while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.setReplication(file1, (short)1);
+ }});
+
+ runFsFun("Append file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ DFSTestUtil.appendFile(fs, file1, "new bytes");
+ }});
+
+ runFsFun("Delete file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.delete(file1, false);
+ }});
+
+ runFsFun("Rename file while in SM", new FSRun() {
+ public void run(FileSystem fs) throws IOException {
+ fs.rename(file1, new Path("file2"));
+ }});
+
+ try {
+ fs.setTimes(file1, 0, 0);
+ } catch (IOException ioe) {
+ fail("Set times failed while in SM");
+ }
+
+ try {
+ DFSTestUtil.readFile(fs, file1);
+ } catch (IOException ioe) {
+ fail("Set times failed while in SM");
+ }
+
+ assertFalse("Could not leave SM",
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 683f505..883c51f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
@@ -77,4 +78,18 @@
public static String getLeaseHolderForPath(NameNode namenode, String path) {
return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();
}
+
+ /**
+ * Return the datanode descriptor for the given datanode.
+ */
+ public static DatanodeDescriptor getDatanode(NameNode namenode,
+ DatanodeID id) throws IOException {
+ FSNamesystem ns = namenode.getNamesystem();
+ ns.readLock();
+ try {
+ return ns.getDatanode(id);
+ } finally {
+ ns.readUnlock();
+ }
+ }
}
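
Tests then fetch descriptors through this adapter rather than calling
FSNamesystem.getDatanode directly, so the read lock the new assert demands is
taken on their behalf. A sketch of a call site, mirroring the TestDecommission
change above (namenode and nodeID come from the test's cluster setup):

    DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namenode, nodeID);
    waitNodeState(dd, AdminStates.NORMAL);
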
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 60d697a..0c5aa8c 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -61,7 +61,14 @@
FSNamesystem namesystem = cluster.getNamesystem();
String state = alive ? "alive" : "dead";
while (System.currentTimeMillis() < stopTime) {
- if (namesystem.getDatanode(nodeID).isAlive == alive) {
+ namesystem.readLock();
+ DatanodeDescriptor dd;
+ try {
+ dd = namesystem.getDatanode(nodeID);
+ } finally {
+ namesystem.readUnlock();
+ }
+ if (dd.isAlive == alive) {
LOG.info("datanode " + nodeID + " is " + state);
return;
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
index 4a2767a..9b10156 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
@@ -54,7 +54,13 @@
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
- DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
+ namesystem.readLock();
+ DatanodeDescriptor dd;
+ try {
+ dd = namesystem.getDatanode(nodeReg);
+ } finally {
+ namesystem.readUnlock();
+ }
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
index 40c25d0..8203292 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
public class TestNNThroughputBenchmark {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
index 875fc33..1f953bb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
@@ -20,109 +20,31 @@
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import junit.framework.TestCase;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
/**
* Tests to verify safe mode correctness.
*/
-public class TestSafeMode extends TestCase {
+public class TestSafeMode {
- static Log LOG = LogFactory.getLog(TestSafeMode.class);
-
- /**
- * This test verifies that if SafeMode is manually entered, name-node does not
- * come out of safe mode even after the startup safe mode conditions are met.
- * <ol>
- * <li>Start cluster with 1 data-node.</li>
- * <li>Create 2 files with replication 1.</li>
- * <li>Re-start cluster with 0 data-nodes.
- * Name-node should stay in automatic safe-mode.</li>
- * <li>Enter safe mode manually.</li>
- * <li>Start the data-node.</li>
- * <li>Wait longer than <tt>dfs.namenode.safemode.extension</tt> and
- * verify that the name-node is still in safe mode.</li>
- * </ol>
- *
- * @throws IOException
- */
- public void testManualSafeMode() throws IOException {
- MiniDFSCluster cluster = null;
- DistributedFileSystem fs = null;
- try {
- Configuration conf = new HdfsConfiguration();
- // disable safemode extension to make the test run faster.
- conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
- cluster = new MiniDFSCluster.Builder(conf).build();
- cluster.waitActive();
-
- fs = (DistributedFileSystem)cluster.getFileSystem();
- Path file1 = new Path("/tmp/testManualSafeMode/file1");
- Path file2 = new Path("/tmp/testManualSafeMode/file2");
-
- LOG.info("Created file1 and file2.");
-
- // create two files with one block each.
- DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
- DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
- fs.close();
- cluster.shutdown();
-
- // now bring up just the NameNode.
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
- cluster.waitActive();
- fs = (DistributedFileSystem)cluster.getFileSystem();
-
- LOG.info("Restarted cluster with just the NameNode");
-
- assertTrue("No datanode is started. Should be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- // manually set safemode.
- fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-
- // now bring up the datanode and wait for it to be active.
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
-
- LOG.info("Datanode is started.");
-
- // wait longer than dfs.namenode.safemode.extension
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {}
-
- assertTrue("should still be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-
- fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
- assertFalse("should not be in SafeMode",
- fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
- } finally {
- if(fs != null) fs.close();
- if(cluster!= null) cluster.shutdown();
- }
- }
-
-
/**
* Verify that the NameNode stays in safemode when dfs.safemode.datanode.min
* is set to a number greater than the number of live datanodes.
*/
+ @Test
public void testDatanodeThreshold() throws IOException {
MiniDFSCluster cluster = null;
DistributedFileSystem fs = null;
try {
- Configuration conf = new Configuration();
+ Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
index c2aab08..0c22de8 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
@@ -39,7 +39,8 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
-import static org.junit.Assert.assertFalse;
+
+import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyString;
@@ -99,6 +100,17 @@
}
}
+ // Release the lease for the given file
+ private boolean releaseLease(FSNamesystem ns, LeaseManager.Lease lm,
+ Path file) throws IOException {
+ fsn.writeLock();
+ try {
+ return fsn.internalReleaseLease(lm, file.toString(), null);
+ } finally {
+ fsn.writeUnlock();
+ }
+ }
+
/**
* Mocks FSNamesystem instance, adds an empty file and invokes lease recovery
* method.
@@ -118,7 +130,7 @@
fsn.dir.addFile(file.toString(), ps, (short)3, 1l,
"test", "test-machine", dnd, 1001l);
assertTrue("True has to be returned in this case",
- fsn.internalReleaseLease(lm, file.toString(), null));
+ releaseLease(fsn, lm, file));
}
/**
@@ -143,9 +155,9 @@
mockFileBlocks(2, null,
HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
- fsn.internalReleaseLease(lm, file.toString(), null);
- assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
- "IOException here", false);
+ releaseLease(fsn, lm, file);
+ fail("FSNamesystem.internalReleaseLease suppose to throw " +
+ "IOException here");
}
/**
@@ -169,15 +181,14 @@
mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
- fsn.internalReleaseLease(lm, file.toString(), null);
- assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
- "AlreadyBeingCreatedException here", false);
+ releaseLease(fsn, lm, file);
+ fail("FSNamesystem.internalReleaseLease suppose to throw " +
+ "IOException here");
}
/**
* Mocks FSNamesystem instance, adds an empty file with 0 blocks
* and invokes lease recovery method.
- *
*/
@Test
public void testInternalReleaseLease_0blocks () throws IOException {
@@ -194,7 +205,7 @@
mockFileBlocks(0, null, null, file, dnd, ps, false);
assertTrue("True has to be returned in this case",
- fsn.internalReleaseLease(lm, file.toString(), null));
+ releaseLease(fsn, lm, file));
}
/**
@@ -217,9 +228,9 @@
mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
- fsn.internalReleaseLease(lm, file.toString(), null);
- assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
- "AlreadyBeingCreatedException here", false);
+ releaseLease(fsn, lm, file);
+ fail("FSNamesystem.internalReleaseLease suppose to throw " +
+ "IOException here");
}
/**
@@ -244,7 +255,7 @@
HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
assertFalse("False is expected in return in this case",
- fsn.internalReleaseLease(lm, file.toString(), null));
+ releaseLease(fsn, lm, file));
}
@Test