HDFS-326 Merge with SVN_HEAD of 2010-01-08
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-326@897222 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/.eclipse.templates/.classpath b/.eclipse.templates/.classpath
index a4e8970..680a01e 100644
--- a/.eclipse.templates/.classpath
+++ b/.eclipse.templates/.classpath
@@ -9,20 +9,19 @@
<classpathentry kind="src" path="src/contrib/thriftfs/src/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
- <classpathentry kind="lib" path="lib/hadoop-core-0.22.0-dev.jar"/>
- <classpathentry kind="lib" path="lib/hadoop-core-test-0.22.0-dev.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hadoop-core-0.22.0-SNAPSHOT.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/hadoop-core-test-0.22.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-cli-1.2.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-codec-1.3.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-el-1.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-httpclient-3.0.1.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.0.4.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-api-1.0.4.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.1.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-net-1.4.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/core-3.1.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hsqldb-1.8.0.10.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-compiler-5.5.12.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-runtime-5.5.12.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.6.1.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.7.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-6.1.14.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-util-6.1.14.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jsp-2.1-6.1.14.jar"/>
@@ -32,16 +31,17 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/oro-2.0.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/servlet-api-2.5-6.1.14.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.4.3.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-log4j12-1.4.3.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.8.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.4.3.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/aspectjrt-1.5.3.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.0.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.shared.api-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-ant-0.9.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-core-uberjar-0.9.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>
<classpathentry kind="lib" path="src/contrib/thriftfs/lib/hadoopthriftapi.jar"/>
<classpathentry kind="lib" path="src/contrib/thriftfs/lib/libthrift.jar"/>
<classpathentry kind="lib" path="build/test/classes"/>
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b95e93..552e0f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,29 +18,28 @@
IMPROVEMENTS
- HDFS-704. Unify build property names to facilitate cross-projects
- modifications (cos)
-
HDFS-703. Replace current fault injection implementation with one
from (cos)
HDFS-754. Reduce ivy console output to observable level (cos)
- HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
+ HDFS-832. HDFS side of HADOOP-6222. (cos)
- HDFS-630 In DFSOutputStream.nextBlockOutputStream(), the client can
- exclude specific datanodes when locating the next block
- (Cosmin Lehene via Stack)
+ HDFS-840. Change tests to use FileContext test helper introduced in
+ HADOOP-6394. (Jitendra Nath Pandey via suresh)
- HDFS-519. Create new tests for lease recovery (cos)
+ HDFS-685. Use the user-to-groups mapping service in the NameNode. (boryas, acmurthy)
+
+ HDFS-755. Read multiple checksum chunks at once in DFSInputStream.
+ (Todd Lipcon via tomwhite)
+
+ HDFS-786. Implement getContentSummary in HftpFileSystem.
+ (Tsz Wo (Nicholas), SZE via cdouglas)
OPTIMIZATIONS
BUG FIXES
- HDFS-646. Fix test-patch failure by adding test-contrib ant target.
- (gkesavan)
-
HDFS-695. RaidNode should read in configuration from hdfs-site.xml.
(dhruba)
@@ -49,8 +48,6 @@
HDFS-750. Fix build failure due to TestRename. (suresh)
- HDFS-733. TestBlockReport fails intermittently. (cos)
-
HDFS-712. Move libhdfs from mapreduce subproject to hdfs subproject.
(Eli Collins via dhruba)
@@ -62,13 +59,6 @@
HDFS-751. Fix TestCrcCorruption to pick up the correct datablocks to
corrupt. (dhruba)
- HDFS-774. Intermittent race condition in TestFiPipelines (cos)
-
- HDFS-741. TestHFlush test doesn't seek() past previously written part of
- the file (cos, szetszwo)
-
- HDFS-706. Intermittent failures in TestFiHFlush (cos)
-
HDFS-763. Fix slightly misleading report from DataBlockScanner
about corrupted scans. (dhruba)
@@ -81,7 +71,21 @@
HDFS-785. Add Apache license to several namenode unit tests.
(Ravi Phulari via jghoman)
- HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+ HDFS-802. Update Eclipse configuration to match changes to Ivy
+ configuration (Edwin Chan via cos)
+
+ HDFS-423. Unbreak FUSE build and fuse_dfs_wrapper.sh (Eli Collins via cos)
+
+ HDFS-825. Build fails to pull latest hadoop-core-* artifacts (cos)
+
+ HDFS-94. The Heap Size printed in the NameNode WebUI is accurate.
+ (Dmytro Molkov via dhruba)
+
+ HDFS-767. An improved retry policy when the DFSClient is unable to fetch a
+ block from the datanode. (Ning Zhang via dhruba)
+
+ HDFS-187. Initialize secondary namenode http address in TestStartup.
+ (Todd Lipcon via szetszwo)
Release 0.21.0 - Unreleased
@@ -186,6 +190,8 @@
HDFS-631. Rename configuration keys towards API standardization and
backward compatibility. (Jitendra Nath Pandey via suresh)
+ HDFS-669. Add unit tests framework (Mockito) (cos, Eli Collins)
+
HDFS-731. Support new Syncable interface in HDFS. (hairong)
HDFS-702. Add HDFS implementation of AbstractFileSystem.
@@ -194,6 +200,9 @@
HDFS-758. Add decommissioning status page to Namenode Web UI.
(Jitendra Nath Pandey via suresh)
+ HDFS-814. Add an api to get the visible length of a DFSDataInputStream.
+ (szetszwo)
+
IMPROVEMENTS
HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -354,6 +363,9 @@
HDFS-680. Add new access method to a copy of a block's replica. (shv)
+ HDFS-704. Unify build property names to facilitate cross-projects
+ modifications (cos)
+
HDFS-705. Create an adapter to access some of package-private methods of
DataNode from tests (cos)
@@ -393,6 +405,12 @@
HDFS-787. Upgrade some libraries to be consistent with common and
mapreduce. (omalley)
+ HDFS-519. Create new tests for lease recovery (cos)
+
+ HDFS-804. New unit tests for concurrent lease recovery (cos)
+
+ HDFS-813. Enable the append test in TestReadWhileWriting. (szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
@@ -540,6 +558,52 @@
HDFS-691. Fix an overflow error in DFSClient.DFSInputStream.available().
(szetszwo)
+ HDFS-733. TestBlockReport fails intermittently. (cos)
+
+ HDFS-774. Intermittent race condition in TestFiPipelines (cos)
+
+ HDFS-741. TestHFlush test doesn't seek() past previously written part of
+ the file (cos, szetszwo)
+
+ HDFS-706. Intermittent failures in TestFiHFlush (cos)
+
+ HDFS-646. Fix test-patch failure by adding test-contrib ant target.
+ (gkesavan)
+
+ HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+
+ HDFS-792. TestHDFSCLI is failing. (Todd Lipcon via cos)
+
+ HDFS-781. Namenode metrics PendingDeletionBlocks is not decremented.
+ (Suresh)
+
+ HDFS-192. Fix TestBackupNode failures. (shv)
+
+ HDFS-797. TestHDFSCLI much slower after HDFS-265 merge. (Todd Lipcon via cos)
+
+ HDFS-824. Stop lease checker in TestReadWhileWriting. (szetszwo)
+
+ HDFS-823. CheckPointer should use addInternalServlet for image-fetching
+ servlet (jghoman)
+
+ HDFS-456. Fix URI generation for windows file paths. (shv)
+
+ HDFS-812. FSNamesystem#internalReleaseLease throws NullPointerException on
+ a single-block file's lease recovery. (cos)
+
+ HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
+ (hairong)
+
+ HDFS-564. Adding pipeline tests 17-35. (hairong)
+
+ HDFS-849. TestFiDataTransferProtocol2#pipeline_Fi_18 sometimes fails.
+ (hairong)
+
+ HDFS-762. Balancer causes Null Pointer Exception.
+ (Cristian Ivascu via dhruba)
+
+ HDFS-868. Fix link to Hadoop Upgrade Wiki. (Chris A. Mattmann via shv)
+
Release 0.20.2 - Unreleased
IMPROVEMENTS
@@ -570,6 +634,15 @@
HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
(Zhang Bingjun via dhruba)
+ HDFS-793. Data node should receive the whole packet ack message before it
+ constructs and sends its own ack message for the packet. (hairong)
+
+ HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
+ name-node is in safemode. (Ravi Phulari via shv)
+
+ HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
+ datanode failure. (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
diff --git a/build.xml b/build.xml
index ef0a9a4..05ca157 100644
--- a/build.xml
+++ b/build.xml
@@ -1086,6 +1086,8 @@
<env key="JVM_ARCH" value="${jvm.arch}"/>
<arg value="install"/>
</exec>
+ <!-- Create a build platform-agnostic link to c++ libs -->
+ <symlink overwrite="true" link="${build.dir}/c++/lib" resource="${install.c++}/lib"/>
</target>
<target name="compile-ant-tasks" depends="compile-core">
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index 40b5843..9e10e41 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -39,14 +39,15 @@
<resolvers>
<ibiblio name="maven2" root="${repo.maven.org}" pattern="${maven2.pattern.ext}" m2compatible="true"/>
- <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"/>
+ <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"
+ checkmodified="true" changingPattern=".*SNAPSHOT"/>
<filesystem name="fs" m2compatible="true" force="true">
<artifact pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].[ext]"/>
<ivy pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].pom"/>
</filesystem>
- <chain name="default" dual="true">
+ <chain name="default" dual="true" checkmodified="true" changingPattern=".*SNAPSHOT">
<resolver ref="apache-snapshot"/>
<resolver ref="maven2"/>
</chain>
diff --git a/src/contrib/build-contrib.xml b/src/contrib/build-contrib.xml
index c7d57cf..8d62945 100644
--- a/src/contrib/build-contrib.xml
+++ b/src/contrib/build-contrib.xml
@@ -43,6 +43,9 @@
<property name="test.timeout" value="900000"/>
<property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
<property name="build.classes" location="${build.dir}/classes"/>
+ <!-- NB: sun.arch.data.model is not supported on all platforms -->
+ <property name="build.platform"
+ value="${os.name}-${os.arch}-${sun.arch.data.model}"/>
<property name="build.test" location="${build.dir}/test"/>
<property name="build.examples" location="${build.dir}/examples"/>
<property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
diff --git a/src/contrib/fuse-dfs/build.xml b/src/contrib/fuse-dfs/build.xml
index 6388fdd..e8ab8d3 100644
--- a/src/contrib/fuse-dfs/build.xml
+++ b/src/contrib/fuse-dfs/build.xml
@@ -32,9 +32,9 @@
<target name="check-libhdfs-exists" if="fusedfs">
- <property name="libhdfs.lib" value="${hadoop.root}/build/libhdfs/libhdfs.so"/>
+ <property name="libhdfs.lib" value="${hadoop.root}/build/c++/${build.platform}/lib/libhdfs.so"/>
<available file="${libhdfs.lib}" property="libhdfs-exists"/>
- <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile-libhdfs -Dlibhdfs=1">
+ <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile -Dcompile.c++=true -Dlibhdfs=true">
<condition>
<not><isset property="libhdfs-exists"/></not>
</condition>
@@ -59,7 +59,7 @@
<env key="OS_ARCH" value="${os.arch}"/>
<env key="HADOOP_HOME" value="${hadoop.root}"/>
<env key="PACKAGE_VERSION" value="0.1.0"/>
-
+ <env key="BUILD_PLATFORM" value="${build.platform}" />
<env key="PERMS" value="${perms}"/>
</exec>
<mkdir dir="${build.dir}"/>
diff --git a/src/contrib/fuse-dfs/src/Makefile.am b/src/contrib/fuse-dfs/src/Makefile.am
index 3b978e9..053a2fe 100644
--- a/src/contrib/fuse-dfs/src/Makefile.am
+++ b/src/contrib/fuse-dfs/src/Makefile.am
@@ -17,5 +17,4 @@
bin_PROGRAMS = fuse_dfs
fuse_dfs_SOURCES = fuse_dfs.c fuse_options.c fuse_trash.c fuse_stat_struct.c fuse_users.c fuse_init.c fuse_connect.c fuse_impls_access.c fuse_impls_chmod.c fuse_impls_chown.c fuse_impls_create.c fuse_impls_flush.c fuse_impls_getattr.c fuse_impls_mkdir.c fuse_impls_mknod.c fuse_impls_open.c fuse_impls_read.c fuse_impls_release.c fuse_impls_readdir.c fuse_impls_rename.c fuse_impls_rmdir.c fuse_impls_statfs.c fuse_impls_symlink.c fuse_impls_truncate.c fuse_impls_utimens.c fuse_impls_unlink.c fuse_impls_write.c
AM_CPPFLAGS= -DPERMS=$(PERMS) -D_FILE_OFFSET_BITS=64 -I$(JAVA_HOME)/include -I$(HADOOP_HOME)/src/c++/libhdfs/ -I$(JAVA_HOME)/include/linux/ -D_FUSE_DFS_VERSION=\"$(PACKAGE_VERSION)\" -DPROTECTED_PATHS=\"$(PROTECTED_PATHS)\" -I$(FUSE_HOME)/include
-AM_LDFLAGS= -L$(HADOOP_HOME)/build/libhdfs -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
-
+AM_LDFLAGS= -L$(HADOOP_HOME)/build/c++/$(BUILD_PLATFORM)/lib -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
diff --git a/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh b/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
index 35a6c6b..cf0fbcb 100755
--- a/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
+++ b/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -19,12 +20,6 @@
export HADOOP_HOME=/usr/local/share/hadoop
fi
-export PATH=$HADOOP_HOME/contrib/fuse_dfs:$PATH
-
-for f in ls $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar ; do
-export CLASSPATH=$CLASSPATH:$f
-done
-
if [ "$OS_ARCH" = "" ]; then
export OS_ARCH=amd64
fi
@@ -37,4 +32,17 @@
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
fi
-./fuse_dfs $@
+# If dev build set paths accordingly
+if [ -d $HADOOP_HDFS_HOME/build ]; then
+ export HADOOP_HOME=$HADOOP_HDFS_HOME
+ for f in ${HADOOP_HOME}/build/*.jar ; do
+ export CLASSPATH=$CLASSPATH:$f
+ done
+ for f in $HADOOP_HOME/build/ivy/lib/Hadoop-Hdfs/common/*.jar ; do
+ export CLASSPATH=$CLASSPATH:$f
+ done
+ export PATH=$HADOOP_HOME/build/contrib/fuse-dfs:$PATH
+ export LD_LIBRARY_PATH=$HADOOP_HOME/build/c++/lib:$JAVA_HOME/jre/lib/$OS_ARCH/server
+fi
+
+fuse_dfs $@
diff --git a/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml b/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
index 3c91748..df813d5 100644
--- a/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
+++ b/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
@@ -530,7 +530,7 @@
of Hadoop and rollback the cluster to the state it was in
before
the upgrade. HDFS upgrade is described in more detail in
- <a href="http://wiki.apache.org/hadoop/Hadoop%20Upgrade">Hadoop Upgrade</a> Wiki page.
+ <a href="http://wiki.apache.org/hadoop/Hadoop_Upgrade">Hadoop Upgrade</a> Wiki page.
HDFS can have one such backup at a time. Before upgrading,
administrators need to remove existing backup using <code>bin/hadoop
dfsadmin -finalizeUpgrade</code> command. The following
diff --git a/src/java/hdfs-default.xml b/src/java/hdfs-default.xml
index 3cecb5d..8f35f58 100644
--- a/src/java/hdfs-default.xml
+++ b/src/java/hdfs-default.xml
@@ -169,7 +169,7 @@
<property>
<name>dfs.namenode.name.dir</name>
- <value>${hadoop.tmp.dir}/dfs/name</value>
+ <value>file://${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
should store the name table(fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
@@ -447,7 +447,7 @@
<property>
<name>dfs.namenode.checkpoint.dir</name>
- <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
+ <value>file://${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 2e204c7..09bafd1 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -87,6 +87,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -1510,26 +1511,60 @@
}
}
- int chunkLen = Math.min(dataLeft, bytesPerChecksum);
-
- if ( chunkLen > 0 ) {
- // len should be >= chunkLen
- IOUtils.readFully(in, buf, offset, chunkLen);
- checksumBytes.get(checksumBuf, 0, checksumSize);
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our stream - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in databuffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
}
- dataLeft -= chunkLen;
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
lastChunkOffset = chunkOffset;
- lastChunkLen = chunkLen;
+ lastChunkLen = bytesToRead;
- if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
+ if ((dataLeft == 0 && isLastPacket) || bytesToRead == 0) {
gotEOS = true;
}
- if ( chunkLen == 0 ) {
+ if ( bytesToRead == 0 ) {
return -1;
}
-
- return chunkLen;
+
+ return bytesToRead;
}
private BlockReader( String file, long blockId, DataInputStream in,
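For reference, here is the chunk arithmetic above in isolation, with illustrative numbers (every value is hypothetical, not a real default):

```java
// Standalone sketch of the multi-chunk read arithmetic introduced above.
public class ChunkMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // bytes covered by one checksum
    int checksumSize = 4;        // a CRC32 checksum is 4 bytes
    int dataLeft = 1300;         // payload remaining in the packet
    int len = 2048;              // space left in the caller's data buffer
    int checksumBufLen = 16;     // room for four checksums

    // ceiling: a trailing partial chunk still carries one checksum
    int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;          // 3
    // floor: only whole chunks are read in one call
    int chunksCanFit = Math.min(len / bytesPerChecksum,
                                checksumBufLen / checksumSize);      // 4
    int checksumsToRead = Math.min(chunksLeft, chunksCanFit);        // 3
    int bytesToRead = Math.min(checksumsToRead * bytesPerChecksum,
                               dataLeft);                            // 1300
    System.out.println(checksumsToRead + " checksums, " + bytesToRead + " bytes");
  }
}
```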
@@ -1661,7 +1696,7 @@
* DFSInputStream provides bytes from a named file. It handles
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
- class DFSInputStream extends FSInputStream {
+ private class DFSInputStream extends FSInputStream {
private Socket s = null;
private boolean closed = false;
@@ -1676,6 +1711,7 @@
private long pos = 0;
private long blockEnd = -1;
private int failures = 0;
+ private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -1695,6 +1731,7 @@
this.buffersize = buffersize;
this.src = src;
prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
+ timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
openInfo();
}
@@ -2147,7 +2184,19 @@
+ " from any node: " + ie
+ ". Will get new block locations from namenode and retry...");
try {
- Thread.sleep(3000);
+ // Introduce a random factor to the wait time before another retry.
+ // The wait time depends on the number of failures and a random factor.
+ // The first time a BlockMissingException is caught, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we wait a 3000 ms grace period before the 2nd retry.
+ // At the second retry the waiting window is also expanded to 6000 ms,
+ // reducing the request rate on the server. Similarly, the 3rd retry
+ // waits a 6000 ms grace period before retrying, and the waiting window
+ // is expanded to 9000 ms.
+ double waitTime = timeWindow * failures + // grace period for the last round of attempt
+ timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
+ LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+ Thread.sleep((long)waitTime);
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
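The expanding back-off above, sketched standalone; `timeWindow` mirrors the new `dfs.client.retry.window.base` key with its 3000 ms default, and the class name is hypothetical:

```java
// For each failure count, the grace period grows by timeWindow and the
// random window expands by another timeWindow.
import java.util.Random;

public class RetryWindow {
  public static void main(String[] args) {
    int timeWindow = 3000;
    Random r = new Random();
    for (int failures = 0; failures < 3; failures++) {
      double waitTime = timeWindow * failures              // grace period
          + timeWindow * (failures + 1) * r.nextDouble();  // expanding window
      System.out.printf("attempt #%d: wait %.0f ms (range %d..%d ms)%n",
          failures + 1, waitTime, timeWindow * failures,
          timeWindow * failures + timeWindow * (failures + 1));
    }
  }
}
```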
@@ -2392,6 +2441,9 @@
}
}
+ /**
+ * The Hdfs implementation of {@link FSDataInputStream}
+ */
public static class DFSDataInputStream extends FSDataInputStream {
public DFSDataInputStream(DFSInputStream in)
throws IOException {
@@ -2419,6 +2471,12 @@
return ((DFSInputStream)in).getAllBlocks();
}
+ /**
+ * @return The visible length of the file.
+ */
+ public long getVisibleLength() throws IOException {
+ return ((DFSInputStream)in).getFileLength();
+ }
}
/****************************************************************
@@ -2482,7 +2540,27 @@
int dataPos;
int checksumStart;
int checksumPos;
-
+ private static final long HEART_BEAT_SEQNO = -1L;
+
+ /**
+ * create a heartbeat packet
+ */
+ Packet() {
+ this.lastPacketInBlock = false;
+ this.numChunks = 0;
+ this.offsetInBlock = 0;
+ this.seqno = HEART_BEAT_SEQNO;
+
+ buffer = null;
+ int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ buf = new byte[packetSize];
+
+ checksumStart = dataStart = packetSize;
+ checksumPos = checksumStart;
+ dataPos = dataStart;
+ maxChunks = 0;
+ }
+
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
@@ -2569,6 +2647,14 @@
return offsetInBlock + dataPos - dataStart;
}
+ /**
+ * Check if this packet is a heart beat packet
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ private boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
public String toString() {
return "packet seqno:" + this.seqno +
" offsetInBlock:" + this.offsetInBlock +
@@ -2593,7 +2679,6 @@
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
- private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
volatile boolean hasError = false;
volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
@@ -2687,6 +2772,7 @@
* and closes them. Any error recovery is also done by this thread.
*/
public void run() {
+ long lastPacket = System.currentTimeMillis();
while (!streamerClosed && clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -2710,19 +2796,32 @@
synchronized (dataQueue) {
// wait for a packet to be sent.
+ long now = System.currentTimeMillis();
while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
+ && dataQueue.size() == 0 &&
+ (stage != BlockConstructionStage.DATA_STREAMING ||
+ stage == BlockConstructionStage.DATA_STREAMING &&
+ now - lastPacket < socketTimeout/2)) || doSleep ) {
+ long timeout = socketTimeout/2 - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+ timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+ timeout : 1000;
try {
- dataQueue.wait(1000);
+ dataQueue.wait(timeout);
} catch (InterruptedException e) {
}
doSleep = false;
+ now = System.currentTimeMillis();
}
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ if (streamerClosed || hasError || !clientRunning) {
continue;
}
// get packet to be sent.
- one = dataQueue.getFirst();
+ if (dataQueue.isEmpty()) {
+ one = new Packet(); // heartbeat packet
+ } else {
+ one = dataQueue.getFirst(); // regular data packet
+ }
}
// get new block from namenode.
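A small sketch of the streamer's wait computation above (values hypothetical): while in the `DATA_STREAMING` stage it sleeps only until half the socket timeout has elapsed since the last packet, then wakes up and sends a heartbeat to keep the pipeline alive.

```java
public class HeartbeatTimeout {
  public static void main(String[] args) {
    long socketTimeout = 60_000;                            // assumed timeout
    long lastPacket = System.currentTimeMillis() - 25_000;  // sent 25 s ago
    long now = System.currentTimeMillis();
    long timeout = socketTimeout / 2 - (now - lastPacket);  // ~5 s left
    timeout = timeout <= 0 ? 1000 : timeout;                // heartbeat due
    System.out.println("wait up to " + timeout + " ms before heartbeating");
  }
}
```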
@@ -2768,9 +2867,11 @@
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
+ if (!one.isHeartbeatPacket()) {
+ dataQueue.removeFirst();
+ ackQueue.addLast(one);
+ dataQueue.notifyAll();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -2781,6 +2882,10 @@
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
+ lastPacket = System.currentTimeMillis();
+
+ if (one.isHeartbeatPacket()) { //heartbeat packet
+ }
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2849,6 +2954,9 @@
*/
void close(boolean force) {
streamerClosed = true;
+ synchronized (dataQueue) {
+ dataQueue.notifyAll();
+ }
if (force) {
this.interrupt();
}
@@ -2902,45 +3010,22 @@
public void run() {
this.setName("ResponseProcessor for block " + block);
+ PipelineAck ack = new PipelineAck();
while (!responderClosed && clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
- // verify seqno from datanode
- long seqno = blockReplyStream.readLong();
- LOG.debug("DFSClient received ack for seqno " + seqno);
- Packet one = null;
- if (seqno == -1) {
- continue;
- } else if (seqno == -2) {
- // no nothing
- } else {
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
- one.seqno + " but received " + seqno);
- }
- isLastPacketInBlock = one.lastPacketInBlock;
- }
-
- // processes response status from all datanodes.
- String replies = null;
+ // read an ack from the pipeline
+ ack.readFields(blockReplyStream);
if (LOG.isDebugEnabled()) {
- replies = "DFSClient Replies for seqno " + seqno + " are";
+ LOG.debug("DFSClient " + ack);
}
- for (int i = 0; i < targets.length && clientRunning; i++) {
- final DataTransferProtocol.Status reply
- = DataTransferProtocol.Status.read(blockReplyStream);
- if (LOG.isDebugEnabled()) {
- replies += " " + reply;
- }
+
+ long seqno = ack.getSeqno();
+ // processes response status from datanodes.
+ for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+ final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
- }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2948,16 +3033,24 @@
targets[i].getName());
}
}
+
+ assert seqno != PipelineAck.UNKOWN_SEQNO :
+ "Ack for unkown seqno should be a failed ack: " + ack;
+ if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
+ continue;
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
+ // a success ack for a data packet
+ Packet one = null;
+ synchronized (dataQueue) {
+ one = ackQueue.getFirst();
}
-
- if (one == null) {
- throw new IOException("Panic: responder did not receive " +
- "an ack for a packet: " + seqno);
+ if (one.seqno != seqno) {
+ throw new IOException("Responseprocessor: Expecting seqno " +
+ " for block " + block +
+ one.seqno + " but received " + seqno);
}
-
+ isLastPacketInBlock = one.lastPacketInBlock;
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
@@ -3118,9 +3211,7 @@
success = false;
long startTime = System.currentTimeMillis();
- DatanodeInfo[] w = excludedNodes.toArray(
- new DatanodeInfo[excludedNodes.size()]);
- lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+ lb = locateFollowingBlock(startTime);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getAccessToken();
@@ -3136,16 +3227,12 @@
namenode.abandonBlock(block, src, clientName);
block = null;
- LOG.info("Excluding datanode " + nodes[errorIndex]);
- excludedNodes.add(nodes[errorIndex]);
-
// Connection failed. Let's wait a little bit and retry
retry = true;
try {
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + nodes[0].getName());
}
- //TODO fix this timout. Extract it o a constant, maybe make it available from conf
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
@@ -3243,15 +3330,14 @@
}
}
- private LocatedBlock locateFollowingBlock(long start,
- DatanodeInfo[] excludedNodes) throws IOException {
+ private LocatedBlock locateFollowingBlock(long start) throws IOException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
- return namenode.addBlock(src, clientName, block, excludedNodes);
+ return namenode.addBlock(src, clientName, block);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a2054a1..fda289e 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -95,6 +95,7 @@
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
+ public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
diff --git a/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
index b43f669..6048df8 100644
--- a/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
+++ b/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.authorize.Service;
@@ -41,6 +42,8 @@
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
new Service("security.refresh.policy.protocol.acl",
RefreshAuthorizationPolicyProtocol.class),
+ new Service("security.refresh.usertogroups.mappings.protocol.acl",
+ RefreshUserToGroupMappingsProtocol.class),
};
@Override
diff --git a/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
index f41a81b..0e5e531 100644
--- a/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
@@ -36,6 +36,7 @@
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -329,4 +330,101 @@
throw new IOException("Not supported");
}
+ /**
+ * A parser for parsing {@link ContentSummary} xml.
+ */
+ private class ContentSummaryParser extends DefaultHandler {
+ private ContentSummary contentsummary;
+
+ /** {@inheritDoc} */
+ public void startElement(String ns, String localname, String qname,
+ Attributes attrs) throws SAXException {
+ if (!ContentSummary.class.getName().equals(qname)) {
+ if (RemoteException.class.getSimpleName().equals(qname)) {
+ throw new SAXException(RemoteException.valueOf(attrs));
+ }
+ throw new SAXException("Unrecognized entry: " + qname);
+ }
+
+ contentsummary = toContentSummary(attrs);
+ }
+
+ /**
+ * Connect to the name node and get content summary.
+ * @param path The path
+ * @return The content summary for the path.
+ * @throws IOException
+ */
+ private ContentSummary getContentSummary(String path) throws IOException {
+ final HttpURLConnection connection = openConnection(
+ "/contentSummary" + path, "ugi=" + ugi);
+ InputStream in = null;
+ try {
+ in = connection.getInputStream();
+
+ final XMLReader xr = XMLReaderFactory.createXMLReader();
+ xr.setContentHandler(this);
+ xr.parse(new InputSource(in));
+ } catch(FileNotFoundException fnfe) {
+ //the server may not support getContentSummary
+ return null;
+ } catch(SAXException saxe) {
+ final Exception embedded = saxe.getException();
+ if (embedded != null && embedded instanceof IOException) {
+ throw (IOException)embedded;
+ }
+ throw new IOException("Invalid xml format", saxe);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ connection.disconnect();
+ }
+ return contentsummary;
+ }
+ }
+
+ /** Return the object represented in the attributes. */
+ private static ContentSummary toContentSummary(Attributes attrs
+ ) throws SAXException {
+ final String length = attrs.getValue("length");
+ final String fileCount = attrs.getValue("fileCount");
+ final String directoryCount = attrs.getValue("directoryCount");
+ final String quota = attrs.getValue("quota");
+ final String spaceConsumed = attrs.getValue("spaceConsumed");
+ final String spaceQuota = attrs.getValue("spaceQuota");
+
+ if (length == null
+ || fileCount == null
+ || directoryCount == null
+ || quota == null
+ || spaceConsumed == null
+ || spaceQuota == null) {
+ return null;
+ }
+
+ try {
+ return new ContentSummary(
+ Long.parseLong(length),
+ Long.parseLong(fileCount),
+ Long.parseLong(directoryCount),
+ Long.parseLong(quota),
+ Long.parseLong(spaceConsumed),
+ Long.parseLong(spaceQuota));
+ } catch(Exception e) {
+ throw new SAXException("Invalid attributes: length=" + length
+ + ", fileCount=" + fileCount
+ + ", directoryCount=" + directoryCount
+ + ", quota=" + quota
+ + ", spaceConsumed=" + spaceConsumed
+ + ", spaceQuota=" + spaceQuota, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ final String s = makeQualified(f).toUri().getPath();
+ final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
+ return cs != null? cs: super.getContentSummary(f);
+ }
}
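To make the expected input concrete, a hedged sketch of a document the parser above would accept: a single element whose qualified name is the `ContentSummary` class name, carrying the six attributes read by `toContentSummary`. Attribute values are illustrative, and the real servlet output may differ in detail.

```java
import java.io.StringReader;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

public class ContentSummaryXmlDemo {
  public static void main(String[] args) throws Exception {
    String xml = "<org.apache.hadoop.fs.ContentSummary length=\"24930\""
        + " fileCount=\"2\" directoryCount=\"1\" quota=\"-1\""
        + " spaceConsumed=\"74790\" spaceQuota=\"-1\"/>";
    SAXParserFactory.newInstance().newSAXParser().parse(
        new InputSource(new StringReader(xml)), new DefaultHandler() {
          @Override
          public void startElement(String ns, String local, String qname,
              Attributes attrs) {
            System.out.println(qname + ": length=" + attrs.getValue("length")
                + ", fileCount=" + attrs.getValue("fileCount"));
          }
        });
  }
}
```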
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index d0a672f..a740ee1 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -198,9 +198,6 @@
public LocatedBlock addBlock(String src, String clientName,
Block previous) throws IOException;
- public LocatedBlock addBlock(String src, String clientName,
- Block previous, DatanodeInfo[] excludedNode) throws IOException;
-
/**
* The client is done writing data to the given filename, and would
* like to complete it.
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
index 1bd2427..3166f33 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -39,12 +40,11 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 17:
- * Change the block write protocol to support pipeline recovery.
- * Additional fields, like recovery flags, new GS, minBytesRcvd,
- * and maxBytesRcvd are included.
+ * Version 19:
+ * Change the block packet ack protocol to include seqno,
+ * numberOfReplies, reply0, reply1, ...
*/
- public static final int DATA_TRANSFER_VERSION = 17;
+ public static final int DATA_TRANSFER_VERSION = 19;
/** Operation */
public enum Op {
@@ -453,4 +453,98 @@
return t;
}
}
+
+ /** reply **/
+ public static class PipelineAck implements Writable {
+ private long seqno;
+ private Status replies[];
+ public final static long UNKOWN_SEQNO = -2;
+
+ /** default constructor **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Constructor
+ * @param seqno sequence number
+ * @param replies an array of replies
+ */
+ public PipelineAck(long seqno, Status[] replies) {
+ this.seqno = seqno;
+ this.replies = replies;
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return seqno;
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)replies.length;
+ }
+
+ /**
+ * Get the ith reply
+ * @return the ith reply
+ */
+ public Status getReply(int i) {
+ if (i<0 || i>=replies.length) {
+ throw new IllegalArgumentException("The input parameter " + i +
+ " should in the range of [0, " + replies.length);
+ }
+ return replies[i];
+ }
+
+ /**
+ * Check whether all replies in this ack are SUCCESS
+ * @return true if all statuses are SUCCESS
+ */
+ public boolean isSuccess() {
+ for (Status reply : replies) {
+ if (reply != Status.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**** Writable interface ****/
+ @Override // Writable
+ public void readFields(DataInput in) throws IOException {
+ seqno = in.readLong();
+ short numOfReplies = in.readShort();
+ replies = new Status[numOfReplies];
+ for (int i=0; i<numOfReplies; i++) {
+ replies[i] = Status.read(in);
+ }
+ }
+
+ @Override // Writable
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(seqno);
+ out.writeShort((short)replies.length);
+ for(Status reply : replies) {
+ reply.write(out);
+ }
+ }
+
+ @Override //Object
+ public String toString() {
+ StringBuilder ack = new StringBuilder("Replies for seqno ");
+ ack.append( seqno ).append( " are" );
+ for(Status reply : replies) {
+ ack.append(" ");
+ ack.append(reply);
+ }
+ return ack.toString();
+ }
+ }
}
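A self-contained sketch of the version-19 ack layout described above: `seqno` (long), `numberOfReplies` (short), then one reply per datanode. The two-byte status codes here are stand-ins, since the real `Status` enum defines its own `write()`/`read()` encoding:

```java
import java.io.*;

public class AckWireFormatDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(42L);   // seqno
    out.writeShort(2);    // numberOfReplies: a two-datanode pipeline
    out.writeShort(0);    // reply0: SUCCESS (stand-in code)
    out.writeShort(0);    // reply1: SUCCESS (stand-in code)

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println("seqno=" + in.readLong()
        + " replies=" + in.readShort());
  }
}
```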
diff --git a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 65370eb..63d4939 100644
--- a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -789,7 +789,6 @@
/** Default constructor */
Balancer() throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(getConf());
}
/** Construct a balancer from the given configuration */
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/Util.java b/src/java/org/apache/hadoop/hdfs/server/common/Util.java
index 5e8de72..d754237 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -17,7 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
public final class Util {
+ private final static Log LOG = LogFactory.getLog(Util.class.getName());
+
/**
* Current system time.
* @return current time in msec.
@@ -25,4 +37,58 @@
public static long now() {
return System.currentTimeMillis();
}
-}
\ No newline at end of file
+
+ /**
+ * Interprets the passed string as a URI. In case of error it
+ * assumes the specified string is a file.
+ *
+ * @param s the string to interpret
+ * @return the resulting URI
+ * @throws IOException
+ */
+ public static URI stringAsURI(String s) throws IOException {
+ URI u = null;
+ // try to make a URI
+ try {
+ u = new URI(s);
+ } catch (URISyntaxException e){
+ LOG.warn("Path " + s + " should be specified as a URI " +
+ "in configuration files. Please update hdfs configuration.", e);
+ }
+
+ // if URI is null or scheme is undefined, then assume it's file://
+ if(u == null || u.getScheme() == null){
+ u = fileAsURI(new File(s));
+ }
+ return u;
+ }
+
+ /**
+ * Converts the passed File to a URI.
+ *
+ * @param f the file to convert
+ * @return the resulting URI
+ * @throws IOException
+ */
+ public static URI fileAsURI(File f) throws IOException {
+ return f.getCanonicalFile().toURI();
+ }
+
+ /**
+ * Converts a collection of strings into a collection of URIs.
+ * @param names collection of strings to convert to URIs
+ * @return collection of URIs
+ */
+ public static Collection<URI> stringCollectionAsURIs(
+ Collection<String> names) {
+ Collection<URI> uris = new ArrayList<URI>(names.size());
+ for(String name : names) {
+ try {
+ uris.add(stringAsURI(name));
+ } catch (IOException e) {
+ LOG.error("Error while processing URI: " + name, e);
+ }
+ }
+ return uris;
+ }
+}
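This fallback is what lets the `file://` prefixes added to `dfs.namenode.name.dir` and `dfs.namenode.checkpoint.dir` in hdfs-default.xml coexist with old bare-path configurations. A quick illustration using only JDK classes (paths hypothetical): a bare path parses as a URI with a null scheme, which `stringAsURI()` then converts via `File.getCanonicalFile().toURI()`.

```java
import java.io.File;
import java.net.URI;

public class UriFallbackDemo {
  public static void main(String[] args) throws Exception {
    URI withScheme = new URI("file:///data/dfs/name");
    System.out.println(withScheme.getScheme());  // "file"

    URI bare = new URI("/data/dfs/name");
    System.out.println(bare.getScheme());        // null -> treat as a file
    System.out.println(new File("/data/dfs/name").getCanonicalFile().toURI());
  }
}
```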
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 5aab2b5..d65a598 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -35,11 +35,11 @@
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -77,6 +77,7 @@
private Checksum partialCrc = null;
private final DataNode datanode;
final private ReplicaInPipelineInterface replicaInfo;
+ volatile private boolean mirrorError;
BlockReceiver(Block block, DataInputStream in, String inAddr,
String myAddr, BlockConstructionStage stage,
@@ -217,21 +218,19 @@
/**
* While writing to mirrorOut, failure to write to mirror should not
- * affect this datanode unless a client is writing the block.
+ * affect this datanode unless it is caused by interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
LOG.info(datanode.dnRegistration + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
- mirrorOut = null;
- //
- // If stream-copy fails, continue
- // writing to disk for replication requests. For client
- // writes, return error so that the client can do error
- // recovery.
- //
- if (clientName.length() > 0) {
+ if (Thread.interrupted()) { // shut down if the thread is interrupted
throw ioe;
+ } else { // encounter an error while writing to mirror
+ // continue to run even if we cannot write to the mirror;
+ // notify client of the error
+ // and wait for the client to shut down the pipeline
+ mirrorError = true;
}
}
@@ -433,6 +432,14 @@
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
}
+ /**
+ * Write the received packet to disk (data only)
+ */
+ private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
+ int numBytesToDisk) throws IOException {
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ }
+
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
@@ -461,7 +468,7 @@
}
//First write the packet to the mirror:
- if (mirrorOut != null) {
+ if (mirrorOut != null && !mirrorError) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
@@ -469,7 +476,7 @@
handleMirrorOutError(e);
}
}
-
+
buf.position(endOfHeader);
if (lastPacketInBlock || len == 0) {
@@ -525,7 +532,7 @@
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
- out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -560,7 +567,7 @@
throttler.throttle(len);
}
- return len;
+ return lastPacketInBlock ? -1 : len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -584,14 +591,15 @@
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
- replyOut, numTargets));
+ replyOut, numTargets,
+ Thread.currentThread()));
responder.start(); // start thread to process responses
}
/*
- * Receive until packet has zero bytes of data.
+ * Receive until the last packet.
*/
- while (receivePacket() > 0) {}
+ while (receivePacket() >= 0) {}
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
@@ -729,13 +737,16 @@
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; // number of downstream datanodes including myself
private BlockReceiver receiver; // The owner of this responder.
+ private Thread receiverThread; // the thread that spawns this responder
public String toString() {
return "PacketResponder " + numTargets + " for Block " + this.block;
}
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
- DataOutputStream out, int numTargets) {
+ DataOutputStream out, int numTargets,
+ Thread receiverThread) {
+ this.receiverThread = receiverThread;
this.receiver = receiver;
this.block = b;
mirrorIn = in;
@@ -775,145 +786,31 @@
notifyAll();
}
- private synchronized void lastDataNodeRun() {
- long lastHeartbeat = System.currentTimeMillis();
- boolean lastPacket = false;
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
- while (running && datanode.shouldRun && !lastPacket) {
- long now = System.currentTimeMillis();
- try {
-
- // wait for a packet to be sent to downstream datanode
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- long idle = now - lastHeartbeat;
- long timeout = (datanode.socketTimeout/2) - idle;
- if (timeout <= 0) {
- timeout = 1000;
- }
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- if (running) {
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " Interrupted.");
- running = false;
- }
- break;
- }
-
- // send a heartbeat if it is time.
- now = System.currentTimeMillis();
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
- replyOut.writeLong(-1); // send heartbeat
- replyOut.flush();
- lastHeartbeat = now;
- }
- }
-
- if (!running || !datanode.shouldRun) {
- break;
- }
- Packet pkt = ackQueue.getFirst();
- long expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " acking for packet " + expected);
-
- // If this is the last packet in block, then close block
- // file and finalize the block before responding success
- if (pkt.lastPacketInBlock) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
- lastPacket = true;
- }
-
- ackReply(expected);
- replyOut.flush();
- // remove the packet from the ack queue
- removeAckHead();
- // update the bytes acked
- if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
- }
- } catch (Exception e) {
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
- if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
- ioe);
- }
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
- }
- }
- }
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
- }
-
- // This method is introduced to facilitate testing. Otherwise
- // there was a little chance to bind an AspectJ advice to such a sequence
- // of calls
- private void ackReply(long expected) throws IOException {
- replyOut.writeLong(expected);
- SUCCESS.write(replyOut);
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
-
- // If this is the last datanode in pipeline, then handle differently
- if (numTargets == 0) {
- lastDataNodeRun();
- return;
- }
-
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
boolean isInterrupted = false;
try {
- DataTransferProtocol.Status op = SUCCESS;
- boolean didRead = false;
Packet pkt = null;
long expected = -2;
- try {
- // read seqno from downstream datanode
- long seqno = mirrorIn.readLong();
- didRead = true;
- if (seqno == -1) {
- replyOut.writeLong(-1); // send keepalive
- replyOut.flush();
- LOG.debug("PacketResponder " + numTargets + " got -1");
- continue;
- } else if (seqno == -2) {
- LOG.debug("PacketResponder " + numTargets + " got -2");
- } else {
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
- seqno);
+ PipelineAck ack = new PipelineAck();
+ long seqno = PipelineAck.UNKOWN_SEQNO;
+ try {
+ if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ seqno = ack.getSeqno();
+ }
+ if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -922,17 +819,14 @@
" for block " + block +
" waiting for local datanode to finish write.");
}
- try {
- wait();
- } catch (InterruptedException e) {
- isInterrupted = true;
- throw e;
- }
+ wait();
+ }
+ if (!running || !datanode.shouldRun) {
+ break;
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
- if (seqno != expected) {
+ if (numTargets > 0 && seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
@@ -941,11 +835,18 @@
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
- } catch (Throwable e) {
- if (running) {
+ } catch (InterruptedException ine) {
+ isInterrupted = true;
+ } catch (IOException ioe) {
+ if (Thread.interrupted()) {
+ isInterrupted = true;
+ } else {
+ // continue to run even if we cannot read from the mirror;
+ // notify client of the error
+ // and wait for the client to shut down the pipeline
+ mirrorError = true;
LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
+ " Exception " + StringUtils.stringifyException(ioe));
}
}
@@ -955,8 +856,7 @@
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
- * will detect a timout on heartbeats and will declare that
- * this datanode is bad, and rightly so.
+ * will detect that this datanode is bad, and rightly so.
*/
LOG.info("PacketResponder " + block + " " + numTargets +
" : Thread is interrupted.");
@@ -964,10 +864,6 @@
continue;
}
- if (!didRead) {
- op = ERROR;
- }
-
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
@@ -990,56 +886,39 @@
}
}
- // send my status back to upstream datanode
- ackReply(expected);
-
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded my status " +
- " for seqno " + expected);
-
- boolean success = true;
- // forward responses from downstream datanodes.
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
- try {
- if (op == SUCCESS) {
- op = Status.read(mirrorIn);
- if (op != SUCCESS) {
- success = false;
- LOG.debug("PacketResponder for block " + block +
- ": error code received from downstream " +
- " datanode[" + i + "] " + op);
- }
- }
- } catch (Throwable e) {
- op = ERROR;
- success = false;
+ // construct my ack message
+ Status[] replies = null;
+ if (mirrorError) { // ack read error
+ replies = new Status[2];
+ replies[0] = SUCCESS;
+ replies[1] = ERROR;
+ } else {
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ replies = new Status[1+ackLen];
+ replies[0] = SUCCESS;
+ for (int i=0; i<ackLen; i++) {
+ replies[i+1] = ack.getReply(i);
}
- op.write(replyOut);
}
- replyOut.flush();
+ PipelineAck replyAck = new PipelineAck(expected, replies);
- LOG.debug("PacketResponder " + block + " " + numTargets +
- " responded other status " + " for seqno " + expected);
-
+ // send my ack back to upstream datanode
+ replyAck.write(replyOut);
+ replyOut.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded an ack: " + replyAck);
+ }
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
- if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ if (replyAck.isSuccess() &&
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
}
}
- // If we were unable to read the seqno from downstream, then stop.
- if (expected == -2) {
- running = false;
- }
- // If we forwarded an error response from a downstream datanode
- // and we are acting on behalf of a client, then we quit. The
- // client will drive the recovery mechanism.
- if (op == ERROR && receiver.clientName.length() > 0) {
- running = false;
- }
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
@@ -1051,12 +930,16 @@
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
+ if (!Thread.interrupted()) { // failure not caused by interruption
+ receiverThread.interrupt();
+ }
}
- } catch (RuntimeException e) {
+ } catch (Throwable e) {
if (running) {
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
+ receiverThread.interrupt();
}
}
}
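
The rewritten responder no longer forwards one Status at a time; it assembles a single PipelineAck whose replies array carries one status per datanode: replies[0] for this node, the rest echoed from downstream, with [SUCCESS, ERROR] substituted when the mirror could not be read. A self-contained sketch of that assembly, using a plain enum in place of the protocol's Status type (illustrative only, not the patch code):

// Condenses the reply-assembly logic from the hunk above.
enum Status { SUCCESS, ERROR }

final class AckAssemblySketch {
  static Status[] buildReplies(boolean mirrorError, Status[] downstream) {
    if (mirrorError) {
      // Local write succeeded but the mirror could not be read:
      // report SUCCESS for this node and ERROR for the next one.
      return new Status[] { Status.SUCCESS, Status.ERROR };
    }
    Status[] replies = new Status[1 + downstream.length];
    replies[0] = Status.SUCCESS; // this datanode's own status
    System.arraycopy(downstream, 0, replies, 1, downstream.length);
    return replies;
  }
}
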
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 42ba15e..6383239 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -355,12 +355,14 @@
return dfsUsage.getUsed();
}
+ /**
+ * Calculate the capacity of the filesystem, after removing any
+ * reserved capacity.
+ * @return the unreserved number of bytes left in this filesystem. May be zero.
+ */
long getCapacity() throws IOException {
- if (reserved > usage.getCapacity()) {
- return 0;
- }
-
- return usage.getCapacity()-reserved;
+ long remaining = usage.getCapacity() - reserved;
+ return remaining > 0 ? remaining : 0;
}
long getAvailable() throws IOException {
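
The reworked getCapacity() expresses the same guard as a single clamp at zero. A quick standalone check of the boundary cases, with made-up byte counts:

// Demonstrates the clamp-at-zero behaviour of the new getCapacity() logic.
public class CapacityDemo {
  static long getCapacity(long rawCapacity, long reserved) {
    long remaining = rawCapacity - reserved;
    return remaining > 0 ? remaining : 0;
  }

  public static void main(String[] args) {
    System.out.println(getCapacity(100L, 30L)); // 70
    System.out.println(getCapacity(20L, 30L));  // 0: reservation exceeds capacity
  }
}
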
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index d8cdcbe..9e3f618 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Daemon;
/**
* BackupNode.
@@ -66,8 +65,6 @@
String nnHttpAddress;
/** Checkpoint manager */
Checkpointer checkpointManager;
- /** Checkpoint daemon */
- private Daemon cpDaemon;
BackupNode(Configuration conf, NamenodeRole role) throws IOException {
super(conf, role);
@@ -142,9 +139,17 @@
*/
@Override // NameNode
protected void innerClose() throws IOException {
- if(checkpointManager != null) checkpointManager.shouldRun = false;
- if(cpDaemon != null) cpDaemon.interrupt();
+ if(checkpointManager != null) {
+ // Prevent starting a new checkpoint.
+ // Checkpoints that have already been started may proceed until
+ // the error reporting to the name-node is complete.
+ // The checkpoint manager should not be interrupted yet, because
+ // interrupting it closes storage file channels and the checkpoint
+ // may fail with ClosedByInterruptException.
+ checkpointManager.shouldRun = false;
+ }
if(namenode != null && getRegistration() != null) {
+ // Exclude this node from the list of backup streams on the name-node
try {
namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
"Shutting down.");
@@ -152,7 +157,15 @@
LOG.error("Failed to report to name-node.", e);
}
}
- RPC.stopProxy(namenode); // stop the RPC threads
+ // Stop the RPC client
+ RPC.stopProxy(namenode);
+ namenode = null;
+ // Stop the checkpoint manager
+ if(checkpointManager != null) {
+ checkpointManager.interrupt();
+ checkpointManager = null;
+ }
+ // Stop name-node threads
super.innerClose();
}
@@ -243,7 +256,7 @@
this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
// get version and id info from the name-node
NamespaceInfo nsInfo = null;
- while(!stopRequested) {
+ while(!isStopRequested()) {
try {
nsInfo = handshake(namenode);
break;
@@ -262,8 +275,7 @@
*/
private void runCheckpointDaemon(Configuration conf) throws IOException {
checkpointManager = new Checkpointer(conf, this);
- cpDaemon = new Daemon(checkpointManager);
- cpDaemon.start();
+ checkpointManager.start();
}
/**
@@ -300,7 +312,7 @@
setRegistration();
NamenodeRegistration nnReg = null;
- while(!stopRequested) {
+ while(!isStopRequested()) {
try {
nnReg = namenode.register(getRegistration());
break;
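
The reordered innerClose() above is a two-phase shutdown: flip shouldRun so no new checkpoint starts, finish the remote error reporting, and only then interrupt the thread. A minimal, self-contained analogue of that pattern (not BackupNode code itself):

// Two-phase shutdown: stop scheduling new work first, interrupt second.
public class TwoPhaseShutdownDemo extends Thread {
  volatile boolean shouldRun = true;

  @Override
  public void run() {
    while (shouldRun) {
      try {
        Thread.sleep(1000); // stands in for the checkpoint period
      } catch (InterruptedException ie) {
        break; // interrupted during the final phase of shutdown
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    TwoPhaseShutdownDemo d = new TwoPhaseShutdownDemo();
    d.setDaemon(true);
    d.start();
    d.shouldRun = false; // phase 1: no new work may start
    // ... errorReport / RPC.stopProxy would happen here in BackupNode ...
    d.interrupt();       // phase 2: now safe to interrupt
    d.join();
  }
}
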
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
index 0bc4acf..43d9f5a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.mortbay.log.Log;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -1614,6 +1613,7 @@
NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
+ " to delete " + blockList);
}
+ pendingDeletionBlocksCount -= blocksToInvalidate.size();
return blocksToInvalidate.size();
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
index c723196..f6cb181 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.*;
@@ -61,26 +60,6 @@
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
- *
- * @param srcPath the file to which this chooseTargets is being invoked.
- * @param numOfReplicas additional number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param chosenNodes datanodes that have been chosen as targets.
- * @param excludedNodes: datanodes that should not be considered as targets.
- * @param blocksize size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as target
- * and sorted as a pipeline.
- */
- abstract DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize);
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
- * If not, return as many as we can.
* The base implementation extracts the pathname of the file from the
* specified srcInode, but this could be a costly operation depending on the
* file system implementation. Concrete implementations of this class should
@@ -188,29 +167,4 @@
new ArrayList<DatanodeDescriptor>(),
blocksize);
}
-
- /**
- * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
- * a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param srcPath a string representation of the file for which chooseTarget is invoked
- * @param numOfReplicas number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param blocksize size of the data to be written.
- * @param excludedNodes: datanodes that should not be considered as targets.
- * @return array of DatanodeDescriptor instances chosen as targets
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- return chooseTarget(srcPath, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(),
- excludedNodes,
- blocksize);
- }
-
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
index 96dca00..b01dcac 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
@@ -68,17 +68,6 @@
}
/** {@inheritDoc} */
- public DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
- }
-
-
- /** {@inheritDoc} */
@Override
public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
int numOfReplicas,
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
index a7d5d56..c1c7f66 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.util.Daemon;
/**
* The Checkpointer is responsible for supporting periodic checkpoints
@@ -49,7 +50,7 @@
* The start of a checkpoint is triggered by one of the two factors:
* (1) time or (2) the size of the edits file.
*/
-class Checkpointer implements Runnable {
+class Checkpointer extends Daemon {
public static final Log LOG =
LogFactory.getLog(Checkpointer.class.getName());
@@ -95,7 +96,7 @@
HttpServer httpServer = backupNode.httpServer;
httpServer.setAttribute("name.system.image", getFSImage());
httpServer.setAttribute("name.conf", conf);
- httpServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+ httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
"(" + checkpointPeriod/60 + " min)");
@@ -144,7 +145,8 @@
LOG.error("Exception in doCheckpoint: ", e);
} catch(Throwable e) {
LOG.error("Throwable Exception in doCheckpoint: ", e);
- Runtime.getRuntime().exit(-1);
+ shutdown();
+ break;
}
try {
Thread.sleep(periodMSec);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java b/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
new file mode 100644
index 0000000..f534625
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.znerd.xmlenc.XMLOutputter;
+
+/** Servlet to obtain the content summary of a path */
+public class ContentSummaryServlet extends DfsServlet {
+ /** For java.io.Serializable */
+ private static final long serialVersionUID = 1L;
+
+ /** {@inheritDoc} */
+ public void doGet(HttpServletRequest request, HttpServletResponse response
+ ) throws ServletException, IOException {
+ final UnixUserGroupInformation ugi = getUGI(request);
+ final String path = request.getPathInfo();
+
+ final PrintWriter out = response.getWriter();
+ final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
+ xml.declaration();
+ try {
+ //get content summary
+ final ClientProtocol nnproxy = createNameNodeProxy(ugi);
+ final ContentSummary cs = nnproxy.getContentSummary(path);
+
+ //write xml
+ xml.startTag(ContentSummary.class.getName());
+ if (cs != null) {
+ xml.attribute("length" , "" + cs.getLength());
+ xml.attribute("fileCount" , "" + cs.getFileCount());
+ xml.attribute("directoryCount", "" + cs.getDirectoryCount());
+ xml.attribute("quota" , "" + cs.getQuota());
+ xml.attribute("spaceConsumed" , "" + cs.getSpaceConsumed());
+ xml.attribute("spaceQuota" , "" + cs.getSpaceQuota());
+ }
+ xml.endTag();
+ } catch(IOException ioe) {
+ new RemoteException(ioe.getClass().getName(), ioe.getMessage()
+ ).writeXml(path, xml);
+ }
+ xml.endDocument();
+ }
+}
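
The NameNode diff further below registers this servlet at /contentSummary/* on the name-node HTTP server, so the XML payload can be read with any HTTP client. A hedged fetch sketch; the host, port and path here are placeholders, and error handling is omitted:

// Hypothetical client-side fetch of the servlet's XML output.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class ContentSummaryFetch {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://namenode.example.com:50070/contentSummary/user/foo");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), "UTF-8"));
    for (String line; (line = in.readLine()) != null; ) {
      System.out.println(line); // one ContentSummary element with the attributes above
    }
    in.close();
  }
}
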
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index 1e0db5c..feb5ca2 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index cfab437..ce4f9ed 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -24,7 +24,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.fs.FileStatus;
@@ -216,7 +215,7 @@
/**
* Shutdown the file store.
*/
- public synchronized void close() {
+ synchronized void close() {
while (isSyncRunning) {
try {
wait(1000);
@@ -275,12 +274,6 @@
String lsd = fsimage.listStorageDirectories();
FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
- //EditLogOutputStream
- if (editStreams == null || editStreams.size() <= 1) {
- FSNamesystem.LOG.fatal(
- "Fatal Error : All storage directories are inaccessible.");
- Runtime.getRuntime().exit(-1);
- }
ArrayList<StorageDirectory> al = null;
for (EditLogOutputStream eStream : errorStreams) {
@@ -311,6 +304,12 @@
}
}
+ if (editStreams == null || editStreams.size() <= 0) {
+ String msg = "Fatal Error: All storage directories are inaccessible.";
+ FSNamesystem.LOG.fatal(msg, new IOException(msg));
+ Runtime.getRuntime().exit(-1);
+ }
+
// remove failed SDs
if(propagate && al != null) fsimage.processIOError(al, false);
@@ -867,6 +866,7 @@
try {
eStream.flush();
} catch (IOException ie) {
+ FSNamesystem.LOG.error("Unable to sync edit log.", ie);
//
// remember the streams that encountered an error.
//
@@ -874,8 +874,6 @@
errorStreams = new ArrayList<EditLogOutputStream>(1);
}
errorStreams.add(eStream);
- FSNamesystem.LOG.error("Unable to sync edit log. " +
- "Fatal Error.");
}
}
long elapsed = FSNamesystem.now() - start;
@@ -1165,6 +1163,7 @@
// replace by the new stream
itE.replace(eStream);
} catch (IOException e) {
+ FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1225,6 +1224,7 @@
// replace by the new stream
itE.replace(eStream);
} catch (IOException e) {
+ FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1390,6 +1390,7 @@
try {
eStream.write(data, 0, length);
} catch (IOException ie) {
+ FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), ie);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
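
All three hunks follow the same collect-then-remove discipline: log the failure, remember the broken stream, drop the broken streams afterwards, and abort only when none survive. A self-contained sketch of that discipline, with a hypothetical EditStream interface standing in for EditLogOutputStream:

// Collect failed streams while iterating, remove them afterwards, and
// abort only if no usable stream remains.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ErrorStreamDemo {
  interface EditStream { void flush() throws IOException; String getName(); }

  static void flushAll(List<EditStream> streams) {
    List<EditStream> errorStreams = new ArrayList<EditStream>();
    for (EditStream s : streams) {
      try {
        s.flush();
      } catch (IOException ie) {
        System.err.println("Unable to sync edit log " + s.getName() + ": " + ie);
        errorStreams.add(s); // remember the failed stream
      }
    }
    streams.removeAll(errorStreams); // drop failed streams
    if (streams.isEmpty()) {
      throw new IllegalStateException(
          "Fatal Error: All storage directories are inaccessible.");
    }
  }

  public static void main(String[] args) {
    List<EditStream> streams = new ArrayList<EditStream>();
    streams.add(new EditStream() {
      public void flush() throws IOException { throw new IOException("disk gone"); }
      public String getName() { return "edits-dir-1"; }
    });
    streams.add(new EditStream() {
      public void flush() { /* healthy stream */ }
      public String getName() { return "edits-dir-2"; }
    });
    flushAll(streams); // logs one failure, keeps running on the healthy stream
  }
}
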
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 9f34651..754dfe6 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
@@ -305,11 +306,10 @@
for ( ;it.hasNext(); ) {
StorageDirectory sd = it.next();
try {
- list.add(new URI("file://" + sd.getRoot().getAbsolutePath()));
- } catch (Exception e) {
+ list.add(Util.fileAsURI(sd.getRoot()));
+ } catch (IOException e) {
throw new IOException("Exception while processing " +
- "StorageDirectory " + sd.getRoot().getAbsolutePath() + ". The"
- + " full error message is " + e.getMessage());
+ "StorageDirectory " + sd.getRoot(), e);
}
}
return list;
@@ -1708,7 +1708,7 @@
ckptState = CheckpointStates.UPLOAD_DONE;
}
- void close() throws IOException {
+ synchronized void close() throws IOException {
getEditLog().close();
unlockAll();
}
@@ -1907,25 +1907,9 @@
if (dirNames.size() == 0 && defaultValue != null) {
dirNames.add(defaultValue);
}
- Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
- for(String name : dirNames) {
- try {
- // process value as URI
- URI u = new URI(name);
- // if scheme is undefined, then assume it's file://
- if(u.getScheme() == null)
- u = new URI("file://" + new File(name).getAbsolutePath());
- // check that scheme is not null (trivial) and supported
- checkSchemeConsistency(u);
- dirs.add(u);
- } catch (Exception e) {
- LOG.error("Error while processing URI: " + name +
- ". The error message was: " + e.getMessage());
- }
- }
- return dirs;
+ return Util.stringCollectionAsURIs(dirNames);
}
-
+
static Collection<URI> getCheckpointEditsDirs(Configuration conf,
String defaultName) {
Collection<String> dirNames =
@@ -1933,23 +1917,7 @@
if (dirNames.size() == 0 && defaultName != null) {
dirNames.add(defaultName);
}
- Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
- for(String name : dirNames) {
- try {
- // process value as URI
- URI u = new URI(name);
- // if scheme is undefined, then assume it's file://
- if(u.getScheme() == null)
- u = new URI("file://" + new File(name).getAbsolutePath());
- // check that scheme is not null (trivial) and supported
- checkSchemeConsistency(u);
- dirs.add(u);
- } catch (Exception e) {
- LOG.error("Error while processing URI: " + name +
- ". The error message was: " + e.getMessage());
- }
- }
- return dirs;
+ return Util.stringCollectionAsURIs(dirNames);
}
static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
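
Both blocks of per-name URI parsing collapse into Util.stringCollectionAsURIs. The helper itself is not shown in this patch, but judging from the deleted lines it presumably behaves roughly like the sketch below; the defaulting-to-file:// behaviour is an assumption drawn from the code it replaces, not a quote of the actual helper.

// Assumed shape of Util.stringCollectionAsURIs, reconstructed from the
// inline parsing removed above.
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;

public class UriHelperSketch {
  static Collection<URI> stringCollectionAsURIs(Collection<String> names) {
    Collection<URI> uris = new ArrayList<URI>(names.size());
    for (String name : names) {
      try {
        URI u = new URI(name);
        if (u.getScheme() == null) {
          // no scheme declared: assume a local path
          u = new URI("file://" + new File(name).getAbsolutePath());
        }
        uris.add(u);
      } catch (Exception e) {
        System.err.println("Error while processing URI: " + name + ": " + e);
      }
    }
    return uris;
  }
}
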
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e1c7443..7589682 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
@@ -40,7 +41,6 @@
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -339,26 +339,8 @@
"\n\t\t- use Backup Node as a persistent and up-to-date storage " +
"of the file system meta-data.");
} else if (dirNames.isEmpty())
- dirNames.add("/tmp/hadoop/dfs/name");
- Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
- for(String name : dirNames) {
- try {
- URI u = new URI(name);
- // If the scheme was not declared, default to file://
- // and use the absolute path of the file, then warn the user
- if(u.getScheme() == null) {
- u = new URI("file://" + new File(name).getAbsolutePath());
- LOG.warn("Scheme is undefined for " + name);
- LOG.warn("Please check your file system configuration in " +
- "hdfs-site.xml");
- }
- dirs.add(u);
- } catch (Exception e) {
- LOG.error("Error while processing URI: " + name +
- ". The error message was: " + e.getMessage());
- }
- }
- return dirs;
+ dirNames.add("file:///tmp/hadoop/dfs/name");
+ return Util.stringCollectionAsURIs(dirNames);
}
public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
@@ -688,6 +670,8 @@
*/
public synchronized void setPermission(String src, FsPermission permission
) throws IOException {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot set permission for " + src, safeMode);
checkOwner(src);
dir.setPermission(src, permission);
getEditLog().logSync();
@@ -705,6 +689,8 @@
*/
public synchronized void setOwner(String src, String username, String group
) throws IOException {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot set owner for " + src, safeMode);
FSPermissionChecker pc = checkOwner(src);
if (!pc.isSuper) {
if (username != null && !pc.user.equals(username)) {
@@ -1361,18 +1347,10 @@
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
- public LocatedBlock getAdditionalBlock(String src,
+ public LocatedBlock getAdditionalBlock(String src,
String clientName,
Block previous
) throws IOException {
- return getAdditionalBlock(src, clientName, previous, null);
- }
-
- public LocatedBlock getAdditionalBlock(String src,
- String clientName,
- Block previous,
- HashMap<Node, Node> excludedNodes
- ) throws IOException {
long fileLength, blockSize;
int replication;
DatanodeDescriptor clientNode = null;
@@ -1408,7 +1386,7 @@
// choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
- src, replication, clientNode, excludedNodes, blockSize);
+ src, replication, clientNode, blockSize);
if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
@@ -1909,10 +1887,11 @@
* contract.
*/
void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot set quota on " + path, safeMode);
if (isPermissionEnabled) {
checkSuperuserPrivilege();
}
-
dir.setQuota(path, nsQuota, dsQuota);
getEditLog().logSync();
}
@@ -2011,8 +1990,17 @@
BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
BlockUCState lastBlockState = lastBlock.getBlockUCState();
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
- BlockUCState penultimateBlockState = (penultimateBlock == null ?
- BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+ boolean penultimateBlockMinReplication;
+ BlockUCState penultimateBlockState;
+ if (penultimateBlock == null) {
+ penultimateBlockState = BlockUCState.COMPLETE;
+ // If penultimate block doesn't exist then its minReplication is met
+ penultimateBlockMinReplication = true;
+ } else {
+ penultimateBlockState = BlockUCState.COMMITTED;
+ penultimateBlockMinReplication =
+ blockManager.checkMinReplication(penultimateBlock);
+ }
assert penultimateBlockState == BlockUCState.COMPLETE ||
penultimateBlockState == BlockUCState.COMMITTED :
"Unexpected state of penultimate block in " + src;
@@ -2023,7 +2011,7 @@
break;
case COMMITTED:
// Close file if committed blocks are minimally replicated
- if(blockManager.checkMinReplication(penultimateBlock) &&
+ if(penultimateBlockMinReplication &&
blockManager.checkMinReplication(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
@@ -3860,14 +3848,12 @@
getFSImage().rollFSImage();
}
- NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
- NamenodeRegistration nnReg) // active name-node
+ synchronized NamenodeCommand startCheckpoint(
+ NamenodeRegistration bnReg, // backup node
+ NamenodeRegistration nnReg) // active name-node
throws IOException {
- NamenodeCommand cmd;
- synchronized(this) {
- cmd = getFSImage().startCheckpoint(bnReg, nnReg);
- }
LOG.info("Start checkpoint for " + bnReg.getAddress());
+ NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
getEditLog().logSync();
return cmd;
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 3a38c02..6a9eaa0 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -50,6 +49,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -74,7 +74,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -124,7 +123,8 @@
**********************************************************/
public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
NamenodeProtocol, FSConstants,
- RefreshAuthorizationPolicyProtocol {
+ RefreshAuthorizationPolicyProtocol,
+ RefreshUserToGroupMappingsProtocol {
static{
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
@@ -140,6 +140,8 @@
return NamenodeProtocol.versionID;
} else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
return RefreshAuthorizationPolicyProtocol.versionID;
+ } else if (protocol.equals(RefreshUserToGroupMappingsProtocol.class.getName())){
+ return RefreshUserToGroupMappingsProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
@@ -367,6 +369,8 @@
this.httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class);
this.httpServer.addInternalServlet("checksum", "/fileChecksum/*",
FileChecksumServlets.RedirectServlet.class);
+ this.httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
+ ContentSummaryServlet.class);
this.httpServer.start();
// The web-server port can be ephemeral... ensure we have the correct info
@@ -514,9 +518,11 @@
protected synchronized void innerClose() throws IOException {
LOG.info("Closing " + getServiceName());
- if (stopRequested)
- return;
- stopRequested = true;
+ synchronized(this) {
+ if (stopRequested)
+ return;
+ stopRequested = true;
+ }
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
@@ -550,7 +556,11 @@
namesystem = null;
}
}
-
+
+ synchronized boolean isStopRequested() {
+ return stopRequested;
+ }
+
/////////////////////////////////////////////////////
// NamenodeProtocol
/////////////////////////////////////////////////////
@@ -706,30 +716,14 @@
namesystem.setOwner(src, username, groupname);
}
-
- @Override
+ /**
+ */
public LocatedBlock addBlock(String src, String clientName,
Block previous) throws IOException {
- return addBlock(src, clientName, previous, null);
- }
-
- @Override
- public LocatedBlock addBlock(String src,
- String clientName,
- Block previous,
- DatanodeInfo[] excludedNodes
- ) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
- HashMap<Node, Node> excludedNodesSet = null;
- if (excludedNodes != null) {
- excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
- for (Node node:excludedNodes) {
- excludedNodesSet.put(node, node);
- }
- }
LocatedBlock locatedBlock =
- namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
+ namesystem.getAdditionalBlock(src, clientName, previous);
if (locatedBlock != null)
myMetrics.numAddBlockOps.inc();
return locatedBlock;
@@ -1246,6 +1240,13 @@
SecurityUtil.getPolicy().refresh();
}
+ @Override
+ public void refreshUserToGroupsMappings(Configuration conf) throws IOException {
+ LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
+ UserGroupInformation.getCurrentUGI().getUserName());
+ SecurityUtil.getUserToGroupsMappingService(conf).refresh();
+ }
+
private static void printUsage() {
System.err.println(
"Usage: java NameNode [" +
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
index 2ced987..b13a728 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -53,8 +56,11 @@
long inodes = fsn.dir.totalInodes();
long blocks = fsn.getBlocksTotal();
long maxobjects = fsn.getMaxObjects();
- long totalMemory = Runtime.getRuntime().totalMemory();
- long maxMemory = Runtime.getRuntime().maxMemory();
+
+ MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
+ MemoryUsage heap = mem.getHeapMemoryUsage();
+ long totalMemory = heap.getUsed();
+ long maxMemory = heap.getMax();
long used = (totalMemory * 100) / maxMemory;
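
Runtime.totalMemory() reports only what the JVM has reserved so far, which understates the configured ceiling; the MemoryMXBean reads actual heap usage against the true maximum. A standalone sketch of the new reading (assumes getMax() is defined for the heap, which it normally is):

// Reads heap usage the same way the hunk above does.
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapUsageDemo {
  public static void main(String[] args) {
    MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    long percentUsed = (heap.getUsed() * 100) / heap.getMax();
    System.out.println("heap used: " + heap.getUsed()
        + " of max " + heap.getMax() + " (" + percentUsed + "%)");
  }
}
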
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
index 32b8cbe..1e7c741 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
@@ -589,6 +589,7 @@
sdEdits = it.next();
if ((sdName == null) || (sdEdits == null))
throw new IOException("Could not locate checkpoint directories");
+ this.layoutVersion = -1; // to avoid assert in loadFSImage()
loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
loadFSEdits(sdEdits);
sig.validateStorageInfo(this);
diff --git a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 00f0455..19d2edb 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.util.StringUtils;
@@ -473,6 +474,7 @@
"\t[" + SetSpaceQuotaCommand.USAGE + "]\n" +
"\t[" + ClearSpaceQuotaCommand.USAGE +"]\n" +
"\t[-refreshServiceAcl]\n" +
+ "\t[-refreshUserToGroupsMappings]\n" +
"\t[-printTopology]\n" +
"\t[-help [cmd]]\n";
@@ -527,6 +529,9 @@
String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tNamenode will reload the authorization policy file.\n";
+ String refreshUserToGroupsMappings =
+ "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+
String printTopology = "-printTopology: Print a tree of the racks and their\n" +
"\t\tnodes as reported by the Namenode\n";
@@ -559,6 +564,8 @@
System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
} else if ("refreshServiceAcl".equals(cmd)) {
System.out.println(refreshServiceAcl);
+ } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+ System.out.println(refreshUserToGroupsMappings);
} else if ("printTopology".equals(cmd)) {
System.out.println(printTopology);
} else if ("help".equals(cmd)) {
@@ -746,6 +753,30 @@
}
/**
+ * Refresh the user-to-groups mappings on the {@link NameNode}.
+ * @return exitcode 0 on success, non-zero on failure
+ * @throws IOException
+ */
+ public int refreshUserToGroupsMappings() throws IOException {
+ // Get the current configuration
+ Configuration conf = getConf();
+
+ // Create the client
+ RefreshUserToGroupMappingsProtocol refreshProtocol =
+ (RefreshUserToGroupMappingsProtocol)
+ RPC.getProxy(RefreshUserToGroupMappingsProtocol.class,
+ RefreshUserToGroupMappingsProtocol.versionID,
+ NameNode.getAddress(conf), getUGI(conf), conf,
+ NetUtils.getSocketFactory(conf,
+ RefreshUserToGroupMappingsProtocol.class));
+
+ // Refresh the user-to-groups mappings
+ refreshProtocol.refreshUserToGroupsMappings(conf);
+
+ return 0;
+ }
+
+ /**
* Displays format of commands.
* @param cmd The command that is being executed.
*/
@@ -789,6 +820,9 @@
} else if ("-refreshServiceAcl".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-refreshServiceAcl]");
+ } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+ System.err.println("Usage: java DFSAdmin"
+ + " [-refreshUserToGroupsMappings]");
} else if ("-printTopology".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-printTopology]");
@@ -803,6 +837,7 @@
System.err.println(" [-upgradeProgress status | details | force]");
System.err.println(" [-metasave filename]");
System.err.println(" [-refreshServiceAcl]");
+ System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-printTopology]");
System.err.println(" ["+SetQuotaCommand.USAGE+"]");
System.err.println(" ["+ClearQuotaCommand.USAGE+"]");
@@ -879,11 +914,15 @@
printUsage(cmd);
return exitCode;
}
- else if ("-printTopology".equals(cmd)) {
- if(argv.length != 1) {
- printUsage(cmd);
- return exitCode;
- }
+ } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+ if (argv.length != 1) {
+ printUsage(cmd);
+ return exitCode;
+ }
+ } else if ("-printTopology".equals(cmd)) {
+ if(argv.length != 1) {
+ printUsage(cmd);
+ return exitCode;
}
}
@@ -927,6 +966,8 @@
exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcl();
+ } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+ exitCode = refreshUserToGroupsMappings();
} else if ("-printTopology".equals(cmd)) {
exitCode = printTopology();
} else if ("-help".equals(cmd)) {
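
Operationally the new switch is driven like the other DFSAdmin commands. A hedged usage sketch, assuming DFSAdmin's Configuration constructor and Tool-style run method as used by its main(); equivalent to running the command from the shell:

// Hypothetical programmatic invocation of the new subcommand; equivalent to
// `hadoop dfsadmin -refreshUserToGroupsMappings` on the command line.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

public class RefreshMappingsDemo {
  public static void main(String[] args) throws Exception {
    DFSAdmin admin = new DFSAdmin(new Configuration());
    int exitCode = admin.run(new String[] {"-refreshUserToGroupsMappings"});
    System.exit(exitCode);
  }
}
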
diff --git a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
index 5be5d8d..898cabf 100644
--- a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
@@ -59,30 +59,36 @@
private volatile boolean isSuccess = false;
/** Simulate action for the receiverOpWriteBlock pointcut */
- public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiReceiverOpWriteBlock
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the callReceivePacket pointcut */
- public final ActionContainer<DatanodeID> fiCallReceivePacket
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiCallReceivePacket
+ = new ActionContainer<DatanodeID, IOException>();
+ /** Simulate action for the callWritePacketToDisk pointcut */
+ public final ActionContainer<DatanodeID, IOException> fiCallWritePacketToDisk
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the statusRead pointcut */
- public final ActionContainer<DatanodeID> fiStatusRead
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiStatusRead
+ = new ActionContainer<DatanodeID, IOException>();
+ /** Simulate action for the afterDownstreamStatusRead pointcut */
+ public final ActionContainer<DatanodeID, IOException> fiAfterDownstreamStatusRead
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineAck pointcut */
- public final ActionContainer<DatanodeID> fiPipelineAck
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiPipelineAck
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineClose pointcut */
- public final ActionContainer<DatanodeID> fiPipelineClose
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiPipelineClose
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the blockFileClose pointcut */
- public final ActionContainer<DatanodeID> fiBlockFileClose
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiBlockFileClose
+ = new ActionContainer<DatanodeID, IOException>();
/** Verification action for the pipelineInitNonAppend pointcut */
- public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
- = new ActionContainer<Integer>();
+ public final ActionContainer<Integer, RuntimeException> fiPipelineInitErrorNonAppend
+ = new ActionContainer<Integer, RuntimeException>();
/** Verification action for the pipelineErrorAfterInit pointcut */
- public final ActionContainer<Integer> fiPipelineErrorAfterInit
- = new ActionContainer<Integer>();
+ public final ActionContainer<Integer, RuntimeException> fiPipelineErrorAfterInit
+ = new ActionContainer<Integer, RuntimeException>();
/** Get test status */
public boolean isSuccess() {
@@ -121,7 +127,8 @@
}
/** Action for DataNode */
- public static abstract class DataNodeAction implements Action<DatanodeID> {
+ public static abstract class DataNodeAction implements
+ Action<DatanodeID, IOException> {
/** The name of the test */
final String currentTest;
/** The index of the datanode */
@@ -195,6 +202,28 @@
}
}
+ /** Throws OutOfMemoryError if the count is zero. */
+ public static class CountdownOomAction extends OomAction {
+ private final CountdownConstraint countdown;
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownOomAction(String currentTest, int i, int count) {
+ super(currentTest, i);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id);
+ FiTestUtil.LOG.info(s);
+ throw new OutOfMemoryError(s);
+ }
+ }
+ }
+
/** Throws DiskOutOfSpaceException. */
public static class DoosAction extends DataNodeAction {
/** Create an action for datanode i in the pipeline. */
@@ -242,6 +271,28 @@
}
}
+ /** Throws DiskOutOfSpaceException if the count is zero. */
+ public static class CountdownDoosAction extends DoosAction {
+ private final CountdownConstraint countdown;
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownDoosAction(String currentTest, int i, int count) {
+ super(currentTest, i);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) throws DiskOutOfSpaceException {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id);
+ FiTestUtil.LOG.info(s);
+ throw new DiskOutOfSpaceException(s);
+ }
+ }
+ }
+
/**
* Sleep some period of time so that it slows down the datanode
* or sleep forever so that the datanode becomes unresponsive.
@@ -307,8 +358,50 @@
}
}
+ /**
+ * When the count is zero,
+ * sleep some period of time so that it slows down the datanode
+ * or sleep forever so that the datanode becomes unresponsive.
+ */
+ public static class CountdownSleepAction extends SleepAction {
+ private final CountdownConstraint countdown;
+
+ /**
+ * Create an action for datanode i in the pipeline.
+ * @param duration In milliseconds, duration <= 0 means sleeping forever.
+ */
+ public CountdownSleepAction(String currentTest, int i,
+ long duration, int count) {
+ this(currentTest, i, duration, duration+1, count);
+ }
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownSleepAction(String currentTest, int i,
+ long minDuration, long maxDuration, int count) {
+ super(currentTest, i, minDuration, maxDuration);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id) + ", duration = ["
+ + minDuration + "," + maxDuration + ")";
+ FiTestUtil.LOG.info(s);
+ if (maxDuration <= 1) {
+ for(; true; FiTestUtil.sleep(1000)); //sleep forever
+ } else {
+ FiTestUtil.sleep(minDuration, maxDuration);
+ }
+ }
+ }
+ }
+
/** Action for pipeline error verification */
- public static class VerificationAction implements Action<Integer> {
+ public static class VerificationAction implements
+ Action<Integer, RuntimeException> {
/** The name of the test */
final String currentTest;
/** The error index of the datanode */
@@ -343,9 +436,10 @@
* Create an OomAction with a CountdownConstraint
* so that it throws OutOfMemoryError if the count is zero.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownOomAction(
- String currentTest, int i, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException>
+ createCountdownOomAction(
+ String currentTest, int i, int count) {
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new OomAction(currentTest, i), new CountdownConstraint(count));
}
@@ -353,9 +447,10 @@
* Create a DoosAction with a CountdownConstraint
* so that it throws DiskOutOfSpaceException if the count is zero.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownDoosAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException>
+ createCountdownDoosAction(
String currentTest, int i, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new DoosAction(currentTest, i), new CountdownConstraint(count));
}
@@ -366,9 +461,9 @@
* sleep some period of time so that it slows down the datanode
* or sleep forever so that the datanode becomes unresponsive.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long minDuration, long maxDuration, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new SleepAction(currentTest, i, minDuration, maxDuration),
new CountdownConstraint(count));
}
@@ -377,7 +472,7 @@
* Same as
* createCountdownSleepAction(currentTest, i, duration, duration+1, count).
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long duration, int count) {
return createCountdownSleepAction(currentTest, i, duration, duration+1,
count);
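
The countdown variants above all defer to a CountdownConstraint (revised in the FiTestUtil diff below) so that an injected fault fires only on the n-th traversal of a pointcut. A standalone illustration of the revised countdown semantics:

// With an initial count of 3, the constraint is unsatisfied twice and then
// satisfied on every call from the third onwards.
public class CountdownDemo {
  static final class CountdownConstraint {
    private int count;
    CountdownConstraint(int count) {
      if (count < 1) throw new IllegalArgumentException(count + " = count < 1");
      this.count = count;
    }
    boolean isSatisfied() {
      if (count > 1) { count--; return false; }
      return true;
    }
  }

  public static void main(String[] args) {
    CountdownConstraint c = new CountdownConstraint(3);
    for (int i = 1; i <= 4; i++) {
      System.out.println("call " + i + ": " + c.isSatisfied());
      // prints false, false, true, true
    }
  }
}
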
diff --git a/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
index 8ff53b1..201c8eb 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
@@ -59,9 +59,9 @@
/** Class that adds new types of actions */
public static class HFlushTest extends DataTransferTest {
- public final ActionContainer<DatanodeID> fiCallHFlush =
- new ActionContainer<DatanodeID>();
- public final ActionContainer<Integer> fiErrorOnCallHFlush =
- new ActionContainer<Integer>();
+ public final ActionContainer<DatanodeID, IOException> fiCallHFlush =
+ new ActionContainer<DatanodeID, IOException>();
+ public final ActionContainer<Integer, RuntimeException> fiErrorOnCallHFlush =
+ new ActionContainer<Integer, RuntimeException>();
}
}
\ No newline at end of file
diff --git a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
index 853d271..3f608a4 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.fi;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -95,24 +96,23 @@
}
/** Action interface */
- public static interface Action<T> {
+ public static interface Action<T, E extends Exception> {
/** Run the action with the parameter. */
- public void run(T parameter) throws IOException;
+ public void run(T parameter) throws E;
}
/** An ActionContainer holds a list of actions. */
- public static class ActionContainer<T> {
- private Action<T> action;
-
+ public static class ActionContainer<T, E extends Exception> {
+ private List<Action<T, E>> actionList = new ArrayList<Action<T, E>>();
/** Create an empty container. */
public ActionContainer() {}
/** Add an action. */
- public void set(Action<T> a) {action = a;}
+ public void set(Action<T, E> a) {actionList.add(a);}
/** Run all of the registered actions. */
- public void run(T obj) throws IOException {
- if (action != null) {
+ public void run(T obj) throws E {
+ for (Action<T, E> action : actionList) {
action.run(obj);
}
}
@@ -124,21 +124,21 @@
public boolean isSatisfied();
}
- /** Counting down, the constraint is satisfied if the count is zero. */
+ /** Counting down, the constraint is satisfied if the count is one. */
public static class CountdownConstraint implements Constraint {
private int count;
/** Initialize the count. */
public CountdownConstraint(int count) {
- if (count < 0) {
- throw new IllegalArgumentException(count + " = count < 0");
+ if (count < 1) {
+ throw new IllegalArgumentException(count + " = count < 1");
}
this.count = count;
}
/** Counting down, the constraint is satisfied once the count reaches one. */
public boolean isSatisfied() {
- if (count > 0) {
+ if (count > 1) {
count--;
return false;
}
@@ -147,13 +147,14 @@
}
/** An action is fired if all the constraints are satisfied. */
- public static class ConstraintSatisfactionAction<T> implements Action<T> {
- private final Action<T> action;
+ public static class ConstraintSatisfactionAction<T, E extends Exception>
+ implements Action<T, E> {
+ private final Action<T, E> action;
private final Constraint[] constraints;
/** Constructor */
public ConstraintSatisfactionAction(
- Action<T> action, Constraint... constraints) {
+ Action<T, E> action, Constraint... constraints) {
this.action = action;
this.constraints = constraints;
}
@@ -163,7 +164,7 @@
* Short-circuit-and is used.
*/
@Override
- public final void run(T parameter) throws IOException {
+ public final void run(T parameter) throws E {
for(Constraint c : constraints) {
if (!c.isSatisfied()) {
return;
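
The second type parameter lets each action declare the checked exception it may throw, so non-IO actions such as VerificationAction no longer have to funnel everything through IOException. A minimal standalone version of the generified pair, simplified from FiTestUtil above:

// The container now propagates the action's own checked exception type.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TypedActionDemo {
  interface Action<T, E extends Exception> {
    void run(T parameter) throws E;
  }

  static class ActionContainer<T, E extends Exception> {
    private final List<Action<T, E>> actions = new ArrayList<Action<T, E>>();
    void set(Action<T, E> a) { actions.add(a); }
    void run(T obj) throws E {
      for (Action<T, E> a : actions) {
        a.run(obj);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    ActionContainer<String, IOException> c =
        new ActionContainer<String, IOException>();
    c.set(new Action<String, IOException>() {
      public void run(String id) throws IOException {
        throw new IOException("injected fault at " + id);
      }
    });
    c.run("datanode-0"); // throws the typed IOException
  }
}
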
diff --git a/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
index 6371135..dc10e1b 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
@@ -49,14 +47,10 @@
after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) {
LOG.info("FI: after pipelineInitNonAppend: hasError="
+ datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
- try {
- if (datastreamer.hasError) {
- DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
- if (dtTest != null )
- dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (datastreamer.hasError) {
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
}
}
@@ -78,13 +72,9 @@
before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
+ datastreamer.errorIndex);
- try {
- DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
- if (dtTest != null )
- dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null )
+ dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
}
pointcut pipelineClose(DFSOutputStream out):
diff --git a/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java b/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
index 22a0edb..268c02a 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
@@ -39,7 +39,7 @@
/**
* Action storing the number of acknowledged bytes, for fault injection tests
*/
- public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes> {
+ public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -77,7 +77,7 @@
/**
* Action storing the number of acknowledged bytes, for fault injection tests
*/
- public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes> {
+ public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -118,10 +118,10 @@
LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
- public final ActionContainer<NodeBytes> fiCallSetNumBytes =
- new ActionContainer<NodeBytes>();
- public final ActionContainer<NodeBytes> fiCallSetBytesAcked =
- new ActionContainer<NodeBytes>();
+ public final ActionContainer<NodeBytes, IOException> fiCallSetNumBytes =
+ new ActionContainer<NodeBytes, IOException>();
+ public final ActionContainer<NodeBytes, IOException> fiCallSetBytesAcked =
+ new ActionContainer<NodeBytes, IOException>();
private static boolean suspend = false;
private static long lastQueuedPacket = -1;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index 62ef10a..a79c383 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -18,12 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.Pipeline;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.ProbabilityModel;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
@@ -31,6 +33,7 @@
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -42,12 +45,7 @@
public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
pointcut callReceivePacket(BlockReceiver blockreceiver) :
- call (* OutputStream.write(..))
- && withincode (* BlockReceiver.receivePacket(..))
-// to further limit the application of this aspect a very narrow 'target' can be used as follows
-// && target(DataOutputStream)
- && !within(BlockReceiverAspects +)
- && this(blockreceiver);
+ call(* receivePacket(..)) && target(blockreceiver);
before(BlockReceiver blockreceiver
) throws IOException : callReceivePacket(blockreceiver) {
@@ -65,7 +63,30 @@
}
}
- // Pointcuts and advises for TestFiPipelines
+ pointcut callWritePacketToDisk(BlockReceiver blockreceiver) :
+ call(* writePacketToDisk(..)) && target(blockreceiver);
+
+ before(BlockReceiver blockreceiver
+ ) throws IOException : callWritePacketToDisk(blockreceiver) {
+ LOG.info("FI: callWritePacketToDisk");
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiCallWritePacketToDisk.run(
+ blockreceiver.getDataNode().getDatanodeRegistration());
+ }
+
+ pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
+ call(void PipelineAck.readFields(DataInput)) && this(responder);
+
+ after(BlockReceiver.PacketResponder responder)
+ throws IOException: afterDownstreamStatusRead(responder) {
+ final DataNode d = responder.receiver.getDataNode();
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration());
+ }
+
+ // Pointcuts and advice for TestFiPipelines
pointcut callSetNumBytes(BlockReceiver br, long offset) :
call (void ReplicaInPipelineInterface.setNumBytes(long))
&& withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
@@ -97,12 +118,6 @@
&& args(acked)
&& this(pr);
- pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) :
- call (void ReplicaInPipelineInterface.setBytesAcked(long))
- && withincode (void PacketResponder.lastDataNodeRun())
- && args(acked)
- && this(pr);
-
after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
@@ -115,19 +130,7 @@
bytesAckedService((PipelinesTest)pTest, pr, acked);
}
}
- after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
- PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
- if (pTest == null) {
- LOG.debug("FI: no pipeline has been found in acking");
- return;
- }
- LOG.debug("FI: Acked total bytes from (last DN): " +
- pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
- if (pTest instanceof PipelinesTest) {
- bytesAckedService((PipelinesTest)pTest, pr, acked);
- }
- }
-
+
private void bytesAckedService
(final PipelinesTest pTest, final PacketResponder pr, final long acked) {
NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
@@ -140,7 +143,7 @@
}
pointcut preventAckSending () :
- call (void ackReply(long))
+ call (void PipelineAck.write(DataOutput))
&& within (PacketResponder);
static int ackCounter = 0;
@@ -193,7 +196,7 @@
}
pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
- call (Status Status.read(DataInput))
+ call (void PipelineAck.readFields(DataInput))
&& this(packetresponder);
after(BlockReceiver.PacketResponder packetresponder) throws IOException
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index 1ea556b..d5b7131 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -19,6 +19,7 @@
import java.io.IOException;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.FiTestUtil;
@@ -101,7 +102,7 @@
}
private static void runReceiverOpWriteBlockTest(String methodName,
- int errorIndex, Action<DatanodeID> a) throws IOException {
+ int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -113,7 +114,7 @@
}
private static void runStatusReadTest(String methodName, int errorIndex,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -124,11 +125,11 @@
Assert.assertTrue(t.isSuccess());
}
- private static void runCallReceivePacketTest(String methodName,
- int errorIndex, Action<DatanodeID> a) throws IOException {
+ private static void runCallWritePacketToDisk(String methodName,
+ int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
- t.fiCallReceivePacket.set(a);
+ t.fiCallWritePacketToDisk.set(a);
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
write1byte(methodName);
Assert.assertTrue(t.isSuccess());
@@ -280,7 +281,7 @@
@Test
public void pipeline_Fi_14() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0));
+ runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0));
}
/**
@@ -291,7 +292,7 @@
@Test
public void pipeline_Fi_15() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1));
+ runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1));
}
/**
@@ -302,11 +303,11 @@
@Test
public void pipeline_Fi_16() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
+ runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
}
private static void runPipelineCloseTest(String methodName,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -324,7 +325,7 @@
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
final MarkerConstraint marker = new MarkerConstraint(name);
t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
- t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID>(a, marker));
+ t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
write1byte(name);
}
@@ -442,7 +443,7 @@
}
private static void runBlockFileCloseTest(String methodName,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
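The recurring change in this file replaces Action<DatanodeID> with Action<DatanodeID, IOException>, so the checked exception an action may throw becomes part of its type. A minimal sketch, assuming the two-parameter interface in org.apache.hadoop.fi.FiTestUtil has roughly this shape; the class below is a hypothetical illustration, not a committed action:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    // Assumed shape of the generified Action interface.
    interface Action<T, E extends Exception> {
      void run(T target) throws E;
    }

    // Hypothetical action failing a pipeline datanode with an IOException.
    class FailDatanodeAction implements Action<DatanodeID, IOException> {
      private final String currentTest;
      FailDatanodeAction(String currentTest) { this.currentTest = currentTest; }
      public void run(DatanodeID id) throws IOException {
        throw new IOException("FI: " + currentTest + ", failing " + id);
      }
    }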
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
new file mode 100644
index 0000000..2c3280b
--- /dev/null
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.log4j.Level;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol2 {
+ static final short REPLICATION = 3;
+ static final long BLOCKSIZE = 1L * (1L << 20);
+ static final int PACKET_SIZE = 1024;
+ static final int MIN_N_PACKET = 3;
+ static final int MAX_N_PACKET = 10;
+
+ static final Configuration conf = new Configuration();
+ static {
+ conf.setInt("dfs.datanode.handler.count", 1);
+ conf.setInt("dfs.replication", REPLICATION);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
+ conf.setInt("dfs.socket.timeout", 5000);
+ }
+
+ static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];
+ static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE];
+
+ static private FSDataOutputStream createFile(FileSystem fs, Path p
+ ) throws IOException {
+ return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+ REPLICATION, BLOCKSIZE);
+ }
+
+ {
+ ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+ /**
+ * 1. create files with dfs
+ * 2. write MIN_N_PACKET to MAX_N_PACKET packets
+ * 3. close file
+ * 4. open the same file
+ * 5. read the bytes and compare results
+ */
+ private static void writeSeveralPackets(String methodName) throws IOException {
+ final Random r = FiTestUtil.RANDOM.get();
+ final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1);
+ final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1);
+ final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize;
+
+ FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+ + ", lastPacketSize=" + lastPacketSize);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+ final FileSystem dfs = cluster.getFileSystem();
+ try {
+ final Path p = new Path("/" + methodName + "/foo");
+ final FSDataOutputStream out = createFile(dfs, p);
+
+ final long seed = r.nextLong();
+ final Random ran = new Random(seed);
+ ran.nextBytes(bytes);
+ out.write(bytes, 0, size);
+ out.close();
+
+ final FSDataInputStream in = dfs.open(p);
+ int totalRead = 0;
+ int nRead = 0;
+ while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) {
+ totalRead += nRead;
+ }
+ Assert.assertEquals("Cannot read file.", size, totalRead);
+ for (int i = 0; i < size; i++) {
+ Assert.assertTrue("File content differ.", bytes[i] == toRead[i]);
+ }
+ }
+ finally {
+ dfs.close();
+ cluster.shutdown();
+ }
+ }
+
+ private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a)
+ throws IOException {
+ t.fiCallReceivePacket.set(a);
+ t.fiReceiverOpWriteBlock.set(a);
+ t.fiStatusRead.set(a);
+ }
+
+ private void runTest17_19(String methodName, int dnIndex)
+ throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final int maxSleep = 3000;
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+ t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+
+ private void runTest29_30(String methodName, int dnIndex) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final int maxSleep = 3000;
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+ t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+
+ private void runTest34_35(String methodName, int dnIndex) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines DN0 bad.
+ */
+ @Test
+ public void pipeline_Fi_17() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 0);
+ }
+
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines DN1 bad.
+ */
+ @Test
+ public void pipeline_Fi_18() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 1);
+ }
+
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines DN2 bad.
+ */
+ @Test
+ public void pipeline_Fi_19() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 2);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN0 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_20() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN1 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_21() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN2 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_22() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Randomize datanode speed, write several packets, DN1 throws an
+ * OutOfMemoryException when it receives the ack of the third packet from DN2.
+ * Client gets an IOException and determines DN1 bad.
+ */
+ @Test
+ public void pipeline_Fi_29() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest29_30(methodName, 1);
+ }
+
+ /**
+ * Streaming: Randomize datanode speed, write several packets, DN0 throws an
+ * OutOfMemoryException when it receives the ack of the third packet from DN1.
+ * Client gets an IOException and determines DN0 bad.
+ */
+ @Test
+ public void pipeline_Fi_30() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest29_30(methodName, 0);
+ }
+
+ /**
+ * Streaming: Write several packets, DN1 never responds when it receives the
+ * ack of the third packet from DN2. Client gets an IOException and determines
+ * DN1 bad.
+ */
+ @Test
+ public void pipeline_Fi_34() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest34_35(methodName, 1);
+ }
+
+ /**
+ * Streaming: Write several packets, DN0 never responds when it receives the
+ * ack of the third packet from DN1. Client gets an IOException and determines
+ * DN0 bad.
+ */
+ @Test
+ public void pipeline_Fi_35() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest34_35(methodName, 0);
+ }
+}
\ No newline at end of file
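The size arithmetic in writeSeveralPackets above pins every packet except the last at PACKET_SIZE. A worked example, assuming FiTestUtil.nextRandomInt(min, max) returns a value in [min, max):

    int nPackets = 5;             // example draw from [3, 11)
    int lastPacketSize = 700;     // example draw from [1, 1025)
    int size = (nPackets - 1) * 1024 + lastPacketSize;
    // size == 4796: four full 1024-byte packets plus a short final packet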
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
new file mode 100644
index 0000000..96513a9
--- /dev/null
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiPipelineClose {
+ static final short REPLICATION = 3;
+ static final long BLOCKSIZE = 1L * (1L << 20);
+
+ static final Configuration conf = new HdfsConfiguration();
+ static {
+ conf.setInt("dfs.datanode.handler.count", 1);
+ conf.setInt("dfs.replication", REPLICATION);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+ }
+
+ static private FSDataOutputStream createFile(FileSystem fs, Path p
+ ) throws IOException {
+ return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+ REPLICATION, BLOCKSIZE);
+ }
+
+ /**
+ * 1. create files with dfs
+ * 2. write 1 byte
+ * 3. close file
+ * 4. open the same file
+ * 5. read the 1 byte and compare results
+ */
+ private static void write1byte(String methodName) throws IOException {
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+ null);
+ final FileSystem dfs = cluster.getFileSystem();
+ try {
+ final Path p = new Path("/" + methodName + "/foo");
+ final FSDataOutputStream out = createFile(dfs, p);
+ out.write(1);
+ out.close();
+
+ final FSDataInputStream in = dfs.open(p);
+ final int b = in.read();
+ in.close();
+ Assert.assertEquals(1, b);
+ }
+ finally {
+ dfs.close();
+ cluster.shutdown();
+ }
+ }
+
+ private static void runPipelineCloseTest(String methodName,
+ Action<DatanodeID, IOException> a) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ t.fiPipelineClose.set(a);
+ write1byte(methodName);
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 never responds after receiving the close request from the client.
+ * Client gets an IOException and determines DN0 bad.
+ */
+ @Test
+ public void pipeline_Fi_36() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN1 never responds after receiving the close request from the client.
+ * Client gets an IOException and determines DN1 bad.
+ */
+ @Test
+ public void pipeline_Fi_37() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN2 never responds after receiving the close request from the client.
+ * Client gets an IOException and determines DN2 bad.
+ */
+ @Test
+ public void pipeline_Fi_38() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
new file mode 100644
index 0000000..4d615f9
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommands;
+import org.apache.hadoop.cli.util.CLITestData;
+import org.apache.hadoop.cli.util.CmdFactory;
+import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+
+public abstract class CmdFactoryDFS extends CmdFactory {
+ public static CommandExecutor getCommandExecutor(CLITestData.TestCmd cmd,
+ String tag)
+ throws IllegalArgumentException {
+ CommandExecutor executor;
+ switch (cmd.getType()) {
+ case DFSADMIN:
+ executor = new CLICommands.FSCmdExecutor(tag, new DFSAdmin());
+ break;
+ default:
+ executor = CmdFactory.getCommandExecutor(cmd, tag);
+ }
+ return executor;
+ }
+}
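A short usage sketch of the new factory, mirroring how TestHDFSCLI.execute() below calls it; cmd and namenode stand in for a parsed TestCmd and the cluster's namenode address:

    CommandExecutor executor = CmdFactoryDFS.getCommandExecutor(cmd, namenode);
    CommandExecutor.Result result = executor.executeCommand(cmd.getCmd());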
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index d91ada9..ffc63a6 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -18,27 +18,27 @@
package org.apache.hadoop.cli;
-import org.apache.hadoop.cli.util.CommandExecutor;
import org.apache.hadoop.cli.util.CLITestData.TestCmd;
import org.apache.hadoop.cli.util.CommandExecutor.Result;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
-public class TestHDFSCLI extends TestCLI{
+public class TestHDFSCLI extends CLITestHelper {
protected MiniDFSCluster dfsCluster = null;
protected DistributedFileSystem dfs = null;
protected String namenode = null;
- protected DFSAdminCmdExecutor dfsAdmCmdExecutor = null;
- protected FSCmdExecutor fsCmdExecutor = null;
+ @Before
+ @Override
public void setUp() throws Exception {
super.setUp();
conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
@@ -57,8 +57,6 @@
namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
username = System.getProperty("user.name");
- dfsAdmCmdExecutor = new DFSAdminCmdExecutor(namenode);
- fsCmdExecutor = new FSCmdExecutor(namenode);
FileSystem fs = dfsCluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
@@ -66,10 +64,13 @@
dfs = (DistributedFileSystem) fs;
}
+ @Override
protected String getTestFile() {
return "testHDFSConf.xml";
}
+ @After
+ @Override
public void tearDown() throws Exception {
dfs.close();
dfsCluster.shutdown();
@@ -77,6 +78,7 @@
super.tearDown();
}
+ @Override
protected String expandCommand(final String cmd) {
String expCmd = cmd;
expCmd = expCmd.replaceAll("NAMENODE", namenode);
@@ -84,43 +86,14 @@
return expCmd;
}
+ @Override
protected Result execute(TestCmd cmd) throws Exception {
- CommandExecutor executor = null;
- switch(cmd.getType()) {
- case DFSADMIN:
- executor = dfsAdmCmdExecutor;
- break;
- case FS:
- executor = fsCmdExecutor;
- break;
- default:
- throw new Exception("Unknow type of Test command:"+ cmd.getType());
- }
- return executor.executeCommand(cmd.getCmd());
+ return CmdFactoryDFS.getCommandExecutor(cmd, namenode).executeCommand(cmd.getCmd());
}
-
- public static class DFSAdminCmdExecutor extends CommandExecutor {
- private String namenode = null;
- public DFSAdminCmdExecutor(String namenode) {
- this.namenode = namenode;
- }
-
- protected void execute(final String cmd) throws Exception{
- DFSAdmin shell = new DFSAdmin();
- String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
- ToolRunner.run(shell, args);
- }
- }
-
- public static class FSCmdExecutor extends CommandExecutor {
- private String namenode = null;
- public FSCmdExecutor(String namenode) {
- this.namenode = namenode;
- }
- protected void execute(final String cmd) throws Exception{
- FsShell shell = new FsShell();
- String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
- ToolRunner.run(shell, args);
- }
+
+ @Test
+ @Override
+ public void testAll () {
+ super.testAll();
}
}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
index e2ec78e..847e047 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
+++ b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
@@ -15187,27 +15187,43 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^-du <path>:( |\t)*Show the amount of space, in bytes, used by the files that( )*</expected-output>
+ <expected-output>^-du \[-s\] \[-h\] <path>:\s+Show the amount of space, in bytes, used by the files that\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*match the specified file pattern.( )*Equivalent to the unix( )*</expected-output>
+ <expected-output>^\s*match the specified file pattern. The following flags are optional:</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*command "du -sb <path>/\*" in case of a directory,( )*</expected-output>
+ <expected-output>^\s*-s\s*Rather than showing the size of each individual file that</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*and to "du -b <path>" in case of a file.( )*</expected-output>
+ <expected-output>^\s*matches the pattern, shows the total \(summary\) size.</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*The output is in the form( )*</expected-output>
+ <expected-output>^\s*-h\s*Formats the sizes of files in a human-readable fashion</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*name\(full path\) size \(in bytes\)( )*</expected-output>
+ <expected-output>\s*rather than a number of bytes.</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*Note that, even without the -s option, this only shows size summaries</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*one level deep into a directory.</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*The output is in the form </expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*size\s+name\(full path\)\s*</expected-output>
</comparator>
</comparators>
</test>
@@ -15226,15 +15242,7 @@
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^( |\t)*match the specified file pattern. Equivalent to the unix( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*command "du -sb" The output is in the form( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*name\(full path\) size \(in bytes\)( )*</expected-output>
+ <expected-output>^( |\t)*match the specified file pattern. This is equivalent to -du -s above.</expected-output>
</comparator>
</comparators>
</test>
@@ -16680,7 +16688,7 @@
<!-- No cleanup -->
</cleanup-commands>
<comparators>
- <!-- miniDFS cluster started in TestCLI is set to match this output -->
+ <!-- miniDFS cluster started in CLITestHelper is set to match this output -->
<comparator>
<type>RegexpAcrossOutputComparator</type>
<expected-output>^Rack: \/rack1\s*127\.0\.0\.1:\d+\s\(localhost.*\)\s*127\.0\.0\.1:\d+\s\(localhost.*\)</expected-output>
@@ -16868,5 +16876,144 @@
</comparators>
</test>
+
+ <test> <!--Tested -->
+ <description>Verifying chmod operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <command>-fs NAMENODE -touchz /test/file1 </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <command>-fs NAMENODE -chmod 777 /test/file1 </command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>Cannot set permission for /test/file1. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+ <test> <!--Tested -->
+ <description>Verifying chown operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <command>-fs NAMENODE -touchz /test/file1 </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <command>-fs NAMENODE -chown root /test/file1 </command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>Cannot set owner for /test/file1. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+ <test> <!--Tested -->
+ <description>Verifying chgrp operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <command>-fs NAMENODE -touchz /test/file1 </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <command>-fs NAMENODE -chgrp newgroup /test/file1 </command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>Cannot set owner for /test/file1. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+
+ <test> <!--Tested -->
+ <description>Verifying setQuota operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -setQuota 100 /test </dfs-admin-command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>setQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+ <test> <!--Tested -->
+ <description>Verifying clrQuota operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -clrQuota /test </dfs-admin-command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>clrQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+
+ <test> <!--Tested -->
+ <description>Verifying setSpaceQuota operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -setSpaceQuota 100 /test </dfs-admin-command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>setSpaceQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+ <test> <!--Tested -->
+ <description>Verifying clrSpaceQuota operation is not permitted in safemode</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir /test </command>
+ <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -clrSpaceQuota /test </dfs-admin-command>
+ </test-commands>
+ <cleanup-commands>
+ <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+ <dfs-admin-command>-fs NAMENODE -rmr /test </dfs-admin-command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>SubstringComparator</type>
+ <expected-output>clrSpaceQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+
</tests>
</configuration>
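The new XML cases above all follow the same enter-safemode, attempt, leave-safemode pattern. A hedged Java sketch of the equivalent programmatic check, assumed to run inside a JUnit test with a live MiniDFSCluster; SafeModeAction comes from org.apache.hadoop.hdfs.protocol.FSConstants, and the expected message is inferred from the <expected-output> entries above:

    DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
    dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
    try {
      dfs.setPermission(new Path("/test/file1"), new FsPermission((short) 0777));
      Assert.fail("chmod should be rejected while the namenode is in safe mode");
    } catch (IOException e) {
      // expected: "Cannot set permission for /test/file1. Name node is in safe mode."
    } finally {
      dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
    }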
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java b/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
index 3de3316..7d94d12 100644
--- a/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
@@ -37,6 +37,8 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.hadoop.fs.FileContextTestHelper.*;
+
public class TestHDFSFileContextMainOperations extends
FileContextMainOperationsBaseTest {
private static MiniDFSCluster cluster;
@@ -99,10 +101,10 @@
@Test
public void testOldRenameWithQuota() throws Exception {
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
- Path src1 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src1");
- Path src2 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src2");
- Path dst1 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst1");
- Path dst2 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst2");
+ Path src1 = getTestRootPath(fc, "test/testOldRenameWithQuota/srcdir/src1");
+ Path src2 = getTestRootPath(fc, "test/testOldRenameWithQuota/srcdir/src2");
+ Path dst1 = getTestRootPath(fc, "test/testOldRenameWithQuota/dstdir/dst1");
+ Path dst2 = getTestRootPath(fc, "test/testOldRenameWithQuota/dstdir/dst2");
createFile(src1);
createFile(src2);
fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
@@ -134,10 +136,10 @@
@Test
public void testRenameWithQuota() throws Exception {
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
- Path src1 = getTestRootPath("test/testRenameWithQuota/srcdir/src1");
- Path src2 = getTestRootPath("test/testRenameWithQuota/srcdir/src2");
- Path dst1 = getTestRootPath("test/testRenameWithQuota/dstdir/dst1");
- Path dst2 = getTestRootPath("test/testRenameWithQuota/dstdir/dst2");
+ Path src1 = getTestRootPath(fc, "test/testRenameWithQuota/srcdir/src1");
+ Path src2 = getTestRootPath(fc, "test/testRenameWithQuota/srcdir/src2");
+ Path dst1 = getTestRootPath(fc, "test/testRenameWithQuota/dstdir/dst1");
+ Path dst2 = getTestRootPath(fc, "test/testRenameWithQuota/dstdir/dst2");
createFile(src1);
createFile(src2);
fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
@@ -184,7 +186,7 @@
@Test
public void testRenameRoot() throws Exception {
- Path src = getTestRootPath("test/testRenameRoot/srcdir/src1");
+ Path src = getTestRootPath(fc, "test/testRenameRoot/srcdir/src1");
Path dst = new Path("/");
createFile(src);
rename(src, dst, true, false, true, Rename.OVERWRITE);
@@ -198,8 +200,8 @@
@Test
public void testEditsLogOldRename() throws Exception {
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
- Path src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
- Path dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+ Path src1 = getTestRootPath(fc, "testEditsLogOldRename/srcdir/src1");
+ Path dst1 = getTestRootPath(fc, "testEditsLogOldRename/dstdir/dst1");
createFile(src1);
fs.mkdirs(dst1.getParent());
createFile(dst1);
@@ -214,8 +216,8 @@
// loaded from the edits log
restartCluster();
fs = (DistributedFileSystem)cluster.getFileSystem();
- src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
- dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+ src1 = getTestRootPath(fc, "testEditsLogOldRename/srcdir/src1");
+ dst1 = getTestRootPath(fc, "testEditsLogOldRename/dstdir/dst1");
Assert.assertFalse(fs.exists(src1)); // ensure src1 is already renamed
Assert.assertTrue(fs.exists(dst1)); // ensure rename dst exists
}
@@ -227,8 +229,8 @@
@Test
public void testEditsLogRename() throws Exception {
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
- Path src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
- Path dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+ Path src1 = getTestRootPath(fc, "testEditsLogRename/srcdir/src1");
+ Path dst1 = getTestRootPath(fc, "testEditsLogRename/dstdir/dst1");
createFile(src1);
fs.mkdirs(dst1.getParent());
createFile(dst1);
@@ -243,8 +245,8 @@
// loaded from the edits log
restartCluster();
fs = (DistributedFileSystem)cluster.getFileSystem();
- src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
- dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+ src1 = getTestRootPath(fc, "testEditsLogRename/srcdir/src1");
+ dst1 = getTestRootPath(fc, "testEditsLogRename/dstdir/dst1");
Assert.assertFalse(fs.exists(src1)); // ensure src1 is already renamed
Assert.assertTrue(fs.exists(dst1)); // ensure rename dst exists
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 9f12c0b..caeb6f7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -25,11 +25,15 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -38,8 +42,8 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -294,4 +298,84 @@
new UnixUserGroupInformation(username, groups));
return c;
}
+
+
+ /**
+ * Modify conf to contain fake users with a fake group and
+ * register them as the current UGI in the configuration.
+ * @param conf the configuration to modify
+ */
+ static public void updateConfigurationWithFakeUsername(Configuration conf) {
+ // fake users
+ String username="fakeUser1";
+ String[] groups = {"fakeGroup1"};
+ // mapping to groups
+ Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+ u2g_map.put(username, groups);
+ updateConfWithFakeGroupMapping(conf, u2g_map);
+
+ UnixUserGroupInformation.saveToConf(conf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ new UnixUserGroupInformation(username, groups));
+ }
+
+ /**
+ * Mock class that returns the group mapping for fake users.
+ *
+ */
+ static class MockUnixGroupsMapping extends ShellBasedUnixGroupsMapping {
+ static Map<String, String []> fakeUser2GroupsMap;
+ private static final List<String> defaultGroups;
+ static {
+ defaultGroups = new ArrayList<String>(1);
+ defaultGroups.add("supergroup");
+ fakeUser2GroupsMap = new HashMap<String, String[]>();
+ }
+
+ @Override
+ public List<String> getGroups(String user) throws IOException {
+ boolean found = false;
+
+ // check to see if this is one of the fake users
+ List<String> l = new ArrayList<String>();
+ for(String u : fakeUser2GroupsMap.keySet()) {
+ if(user.equals(u)) {
+ found = true;
+ for(String gr : fakeUser2GroupsMap.get(u)) {
+ l.add(gr);
+ }
+ }
+ }
+
+ // default
+ if(!found) {
+ l = super.getGroups(user);
+ if(l.size() == 0) {
+ System.out.println("failed to get real group for " + user +
+ "; using default");
+ return defaultGroups;
+ }
+ }
+ return l;
+ }
+ }
+
+ /**
+ * update the configuration with fake class for mapping user to groups
+ * @param conf
+ * @param map - user to groups mapping
+ */
+ static public void updateConfWithFakeGroupMapping
+ (Configuration conf, Map<String, String []> map) {
+ if(map!=null) {
+ MockUnixGroupsMapping.fakeUser2GroupsMap = map;
+ }
+
+ // fake mapping user to groups
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ DFSTestUtil.MockUnixGroupsMapping.class,
+ ShellBasedUnixGroupsMapping.class);
+
+ }
+
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java b/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 9b826a7..77f266a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
@@ -258,10 +259,12 @@
FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
if (manageNameDfsDirs) {
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(base_dir, "name1").getPath()+","+
- new File(base_dir, "name2").getPath());
- conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(base_dir, "namesecondary1").
- getPath()+"," + new File(base_dir, "namesecondary2").getPath());
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(base_dir, "name1"))+","+
+ fileAsURI(new File(base_dir, "name2")));
+ conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(base_dir, "namesecondary1"))+","+
+ fileAsURI(new File(base_dir, "namesecondary2")));
}
int replication = conf.getInt("dfs.replication", 3);
@@ -716,8 +719,8 @@
/**
* Restart a datanode, on the same port if requested
- * @param dnprop, the datanode to restart
- * @param keepPort, whether to use the same port
+ * @param dnprop the datanode to restart
+ * @param keepPort whether to use the same port
* @return true if restarting is successful
* @throws IOException
*/
@@ -808,6 +811,28 @@
public FileSystem getFileSystem() throws IOException {
return FileSystem.get(conf);
}
+
+
+ /**
+ * Get another FileSystem instance that is different from FileSystem.get(conf).
+ * This simulates different threads working on different FileSystem instances.
+ */
+ public FileSystem getNewFileSystemInstance() throws IOException {
+ return FileSystem.newInstance(conf);
+ }
+
+ /**
+ * @return a {@link HftpFileSystem} object.
+ */
+ public HftpFileSystem getHftpFileSystem() throws IOException {
+ final String str = "hftp://"
+ + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ try {
+ return (HftpFileSystem)FileSystem.get(new URI(str), conf);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
/**
* Get the directories where the namenode stores its image.
@@ -957,7 +982,6 @@
/**
* Access to the data directory used for Datanodes
- * @throws IOException
*/
public String getDataDirectory() {
return data_dir.getAbsolutePath();
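A sketch of the distinction the two accessors draw, per their javadoc: getFileSystem() returns the cached FileSystem.get(conf) instance, while getNewFileSystemInstance() hands back an uncached FileSystem.newInstance(conf):

    FileSystem shared = cluster.getFileSystem();            // cached instance
    FileSystem fresh  = cluster.getNewFileSystemInstance(); // independent instance
    // closing 'fresh' does not affect 'shared', which makes it suitable for
    // simulating concurrent, independent clients against the same cluster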
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
deleted file mode 100644
index f77de26..0000000
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-/**
- * These tests make sure that DFSClient retries fetching data from DFS
- * properly in case of errors.
- */
-public class TestDFSClientExcludedNodes extends TestCase {
- public void testExcludedNodes() throws IOException
- {
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
- FileSystem fs = cluster.getFileSystem();
- Path filePath = new Path("/testExcludedNodes");
-
- // kill a datanode
- cluster.stopDataNode(AppendTestUtil.nextInt(3));
- OutputStream out = fs.create(filePath, true, 4096);
- out.write(20);
-
- try {
- out.close();
- } catch (Exception e) {
- fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
- }
- }
-
-}
-
-
-
\ No newline at end of file
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 6cf9ff1..23776f4 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
+import java.security.MessageDigest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -137,17 +139,8 @@
return versionID;
}
- public LocatedBlock addBlock(String src, String clientName,
- Block previous) throws IOException {
-
- return addBlock(src, clientName, previous, null);
- }
-
- public LocatedBlock addBlock(String src,
- String clientName,
- Block previous,
- DatanodeInfo[] excludedNode
- ) throws IOException
+ public LocatedBlock addBlock(String src, String clientName, Block previous)
+ throws IOException
{
num_calls++;
if (num_calls > num_calls_allowed) {
@@ -258,4 +251,242 @@
}
}
+
+ /**
+ * Test that a DFSClient waits for a random time before retrying on busy blocks.
+ */
+ public void testDFSClientRetriesOnBusyBlocks() throws IOException {
+
+ System.out.println("Testing DFSClient random waiting on busy blocks.");
+
+ //
+ // Test settings:
+ //
+ // xcievers fileLen #clients timeWindow #retries
+ // ======== ======= ======== ========== ========
+ // Test 1: 2 6 MB 50 300 ms 3
+ // Test 2: 2 6 MB 50 300 ms 50
+ // Test 3: 2 6 MB 50 1000 ms 3
+ // Test 4: 2 6 MB 50 1000 ms 50
+ //
+ // Minimum xcievers is 2 since 1 thread is reserved for registry.
+ // Test 1 & 3 may fail since # retries is low.
+ // Test 2 & 4 should never fail since (#threads)/(xcievers-1) is the upper
+ // bound for guarantee to not throw BlockMissingException.
+ //
+ int xcievers = 2;
+ int fileLen = 6*1024*1024;
+ int threads = 50;
+ int retries = 3;
+ int timeWin = 300;
+
+ //
+ // Test 1: might fail
+ //
+ long timestamp = System.currentTimeMillis();
+ boolean pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+ long timestamp2 = System.currentTimeMillis();
+ if ( pass ) {
+ LOG.info("Test 1 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+ } else {
+ LOG.warn("Test 1 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+ }
+
+ //
+ // Test 2: should never fail
+ //
+ retries = 50;
+ timestamp = System.currentTimeMillis();
+ pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+ timestamp2 = System.currentTimeMillis();
+ assertTrue("Something wrong! Test 2 got Exception with maxmum retries!", pass);
+ LOG.info("Test 2 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+
+ //
+ // Test 3: might fail
+ //
+ retries = 3;
+ timeWin = 1000;
+ timestamp = System.currentTimeMillis();
+ pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+ timestamp2 = System.currentTimeMillis();
+ if ( pass ) {
+ LOG.info("Test 3 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+ } else {
+ LOG.warn("Test 3 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+ }
+
+ //
+ // Test 4: should never fail
+ //
+ retries = 50;
+ timeWin = 1000;
+ timestamp = System.currentTimeMillis();
+ pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+ timestamp2 = System.currentTimeMillis();
+ assertTrue("Something wrong! Test 4 got Exception with maxmum retries!", pass);
+ LOG.info("Test 4 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+ }
+
+ private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, int retries)
+ throws IOException {
+
+ boolean ret = true;
+ short replicationFactor = 1;
+ long blockSize = 128*1024*1024; // DFS block size
+ int bufferSize = 4096;
+
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt("dfs.datanode.max.xcievers",xcievers);
+ conf.setInt("dfs.client.max.block.acquire.failures", retries);
+ conf.setInt("dfs.client.retry.window.base", timeWin);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, replicationFactor, true, null);
+ cluster.waitActive();
+
+ FileSystem fs = cluster.getFileSystem();
+ Path file1 = new Path("test_data.dat");
+ file1 = file1.makeQualified(fs.getUri(), fs.getWorkingDirectory()); // make URI hdfs://
+
+ try {
+
+ FSDataOutputStream stm = fs.create(file1, true,
+ bufferSize,
+ replicationFactor,
+ blockSize);
+
+ // verify that file exists in FS namespace
+ assertTrue(file1 + " should be a file",
+ fs.getFileStatus(file1).isDir() == false);
+ System.out.println("Path : \"" + file1 + "\"");
+ LOG.info("Path : \"" + file1 + "\"");
+
+ // write 1 block to file
+ byte[] buffer = AppendTestUtil.randomBytes(System.currentTimeMillis(), fileLen);
+ stm.write(buffer, 0, fileLen);
+ stm.close();
+
+ // verify that file size has changed to the full size
+ long len = fs.getFileStatus(file1).getLen();
+
+ assertTrue(file1 + " should be of size " + fileLen +
+ " but found to be of size " + len,
+ len == fileLen);
+
+ // read back and check data integrity
+ byte[] read_buf = new byte[fileLen];
+ InputStream in = fs.open(file1, fileLen);
+ IOUtils.readFully(in, read_buf, 0, fileLen);
+ assert(Arrays.equals(buffer, read_buf));
+ in.close();
+ read_buf = null; // GC it if needed
+
+ // compute digest of the content to reduce memory space
+ MessageDigest m = MessageDigest.getInstance("SHA");
+ m.update(buffer, 0, fileLen);
+ byte[] hash_sha = m.digest();
+
+ // spawn multiple threads and all trying to access the same block
+ Thread[] readers = new Thread[threads];
+ Counter counter = new Counter(0);
+ for (int i = 0; i < threads; ++i ) {
+ DFSClientReader reader = new DFSClientReader(file1, cluster, hash_sha, fileLen, counter);
+ readers[i] = new Thread(reader);
+ readers[i].start();
+ }
+
+ // wait for them to exit
+ for (int i = 0; i < threads; ++i ) {
+ readers[i].join();
+ }
+ if ( counter.get() == threads )
+ ret = true;
+ else
+ ret = false;
+
+ } catch (InterruptedException e) {
+ System.out.println("Thread got InterruptedException.");
+ e.printStackTrace();
+ ret = false;
+ } catch (Exception e) {
+ e.printStackTrace();
+ ret = false;
+ } finally {
+ fs.delete(file1, false);
+ cluster.shutdown();
+ }
+ return ret;
+ }
+
+ class DFSClientReader implements Runnable {
+
+ DFSClient client;
+ Configuration conf;
+ byte[] expected_sha;
+ FileSystem fs;
+ Path filePath;
+ MiniDFSCluster cluster;
+ int len;
+ Counter counter;
+
+ DFSClientReader(Path file, MiniDFSCluster cluster, byte[] hash_sha, int fileLen, Counter cnt) {
+ filePath = file;
+ this.cluster = cluster;
+ counter = cnt;
+ len = fileLen;
+ conf = new HdfsConfiguration();
+ expected_sha = hash_sha;
+ try {
+ cluster.waitActive();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void run() {
+ try {
+ fs = cluster.getNewFileSystemInstance();
+
+ int bufferSize = len;
+ byte[] buf = new byte[bufferSize];
+
+ InputStream in = fs.open(filePath, bufferSize);
+
+ // read the whole file
+ IOUtils.readFully(in, buf, 0, bufferSize);
+
+ // compare with the expected input
+ MessageDigest m = MessageDigest.getInstance("SHA");
+ m.update(buf, 0, bufferSize);
+ byte[] hash_sha = m.digest();
+
+ buf = null; // GC if needed since there may be too many threads
+ in.close();
+ fs.close();
+
+ assertTrue("hashed keys are not the same size",
+ hash_sha.length == expected_sha.length);
+
+ assertTrue("hashed keys are not equal",
+ Arrays.equals(hash_sha, expected_sha));
+
+ counter.inc(); // count this thread as successful
+
+ LOG.info("Thread correctly read the block.");
+
+ } catch (BlockMissingException e) {
+ LOG.info("Bad - BlockMissingException is caught.");
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ class Counter {
+ int counter;
+ Counter(int n) { counter = n; }
+ public synchronized void inc() { ++counter; }
+ public int get() { return counter; }
+ }
}
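The retry budgets in the four sub-tests follow the arithmetic stated in the comment block inside testDFSClientRetriesOnBusyBlocks; a worked instance:

    int threads = 50, xcievers = 2;
    // one xciever thread is reserved for the registry, so only (xcievers - 1)
    // readers are served at a time; threads / (xcievers - 1) = 50 is the retry
    // count at which Tests 2 and 4 are expected to never fail
    int safeRetries = threads / (xcievers - 1);  // 50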
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
index 113f385..623012f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -19,22 +19,29 @@
import java.io.IOException;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import javax.security.auth.login.LoginException;
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
/** Unit tests for permission */
public class TestDFSPermission extends TestCase {
public static final Log LOG = LogFactory.getLog(TestDFSPermission.class);
@@ -81,6 +88,13 @@
// explicitly turn on permission checking
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+ // create fake mapping for the groups
+ Map<String, String[]> u2g_map = new HashMap<String, String[]> (3);
+ u2g_map.put(USER1_NAME, new String[] {GROUP1_NAME, GROUP2_NAME });
+ u2g_map.put(USER2_NAME, new String[] {GROUP2_NAME, GROUP3_NAME });
+ u2g_map.put(USER3_NAME, new String[] {GROUP3_NAME, GROUP4_NAME });
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
// Initiate all four users
SUPERUSER = UnixUserGroupInformation.login(conf);
USER1 = new UnixUserGroupInformation(USER1_NAME, new String[] {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 0096a51..f59d005 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -19,6 +19,8 @@
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
@@ -157,9 +159,8 @@
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- SUCCESS.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
sendRecvData(description, false);
}
@@ -381,9 +382,8 @@
// bad data chunk length
sendOut.writeInt(-1-random.nextInt(oneMil));
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- ERROR.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
true);
@@ -406,9 +406,8 @@
sendOut.flush();
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- SUCCESS.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
/* Test OP_READ_BLOCK */
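
The three hunks above stop hand-writing the ack fields (a sequence number followed by one status per datanode) and serialize a PipelineAck object instead, so the test and the datanode share one framing definition. A standalone sketch of that framing idea follows; MiniPipelineAck is a hypothetical mimic, and the exact field layout here is an assumption, not the real HDFS wire format:

    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical mimic of a pipeline ack: one sequence number, then a
    // reply count, then one status code per datanode in the pipeline.
    class MiniPipelineAck {
      private final long seqno;
      private final int[] replies; // e.g. 0 = SUCCESS, 1 = ERROR

      MiniPipelineAck(long seqno, int[] replies) {
        this.seqno = seqno;
        this.replies = replies;
      }

      void write(DataOutput out) throws IOException {
        out.writeLong(seqno);           // sequence number first
        out.writeShort(replies.length); // then the reply count
        for (int r : replies) {
          out.writeShort(r);            // one status per datanode
        }
      }
    }
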
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
index 79a763f..a979581 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
@@ -212,19 +212,26 @@
private void testChecker(FileSystem fileSys, boolean readCS)
throws Exception {
Path file = new Path("try.dat");
- if( readCS ) {
- writeFile(fileSys, file);
- } else {
- writeFile(fileSys, file);
+ writeFile(fileSys, file);
+
+ try {
+ if (!readCS) {
+ fileSys.setVerifyChecksum(false);
+ }
+
+ stm = fileSys.open(file);
+ checkReadAndGetPos();
+ checkSeek();
+ checkSkip();
+ //checkMark
+ assertFalse(stm.markSupported());
+ stm.close();
+ } finally {
+ if (!readCS) {
+ fileSys.setVerifyChecksum(true);
+ }
+ cleanupFile(fileSys, file);
}
- stm = fileSys.open(file);
- checkReadAndGetPos();
- checkSeek();
- checkSkip();
- //checkMark
- assertFalse(stm.markSupported());
- stm.close();
- cleanupFile(fileSys, file);
}
private void testFileCorruption(LocalFileSystem fileSys) throws IOException {
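
The testChecker rewrite above drops a branch whose two arms were identical and, more importantly, restores setVerifyChecksum(true) in a finally block so a failing assertion cannot leave checksum verification disabled for later tests. A minimal sketch of that restore-in-finally pattern; Toggle is a hypothetical stand-in for the FileSystem setter:

    // Flip a shared setting for the duration of a check and restore it
    // even if the body throws. Toggle is hypothetical, not an HDFS type.
    interface Toggle {
      void setVerifyChecksum(boolean v);
    }

    class WithoutChecksum {
      static void runWithoutChecksum(Toggle fs, Runnable body) {
        fs.setVerifyChecksum(false);
        try {
          body.run();
        } finally {
          fs.setVerifyChecksum(true); // always restore the default
        }
      }
    }
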
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
index 3f36360..03243eb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
@@ -17,22 +17,31 @@
*/
package org.apache.hadoop.hdfs;
-import junit.framework.TestCase;
-import java.io.*;
+import java.io.IOException;
import java.util.Random;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.log4j.Level;
/**
* This class tests the FileStatus API.
*/
public class TestFileStatus extends TestCase {
+ {
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL);
+ }
+
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
@@ -64,6 +73,7 @@
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs = cluster.getFileSystem();
+ final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
final DFSClient dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
try {
@@ -111,8 +121,10 @@
assertTrue(fs.exists(dir));
assertTrue(dir + " should be a directory",
fs.getFileStatus(path).isDir() == true);
- assertTrue(dir + " should be zero size ",
- fs.getContentSummary(dir).getLength() == 0);
+ assertEquals(dir + " should be zero size ",
+ 0, fs.getContentSummary(dir).getLength());
+ assertEquals(dir + " should be zero size using hftp",
+ 0, hftpfs.getContentSummary(dir).getLength());
assertTrue(dir + " should be zero size ",
fs.getFileStatus(dir).getLen() == 0);
System.out.println("Dir : \"" + dir + "\"");
@@ -139,8 +151,11 @@
// verify that the size of the directory increased by the size
// of the two files
- assertTrue(dir + " size should be " + (blockSize/2),
- blockSize/2 == fs.getContentSummary(dir).getLength());
+ final int expected = blockSize/2;
+ assertEquals(dir + " size should be " + expected,
+ expected, fs.getContentSummary(dir).getLength());
+ assertEquals(dir + " size should be " + expected + " using hftp",
+ expected, hftpfs.getContentSummary(dir).getLength());
} finally {
fs.close();
cluster.shutdown();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
index 5792b27..a1d27d6 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.BackupNode;
@@ -92,7 +93,8 @@
throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
}
config = new HdfsConfiguration();
- config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name1").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name1")).toString());
FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
config.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, NAME_NODE_HTTP_HOST + "0");
NameNode.format(config);
@@ -120,7 +122,8 @@
assertTrue(currDir2.mkdirs());
assertTrue(currDir3.mkdirs());
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name2").getPath());
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name2")).toString());
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "${dfs.name.dir}");
// Start BackupNode
@@ -246,7 +249,8 @@
// start another namenode on the same port
Configuration conf2 = new HdfsConfiguration(config);
- conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name2").getPath());
+ conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name2")).toString());
NameNode.format(conf2);
boolean started = canStartNameNode(conf2);
assertFalse(started); // should fail
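
The name-dir values above switch from bare filesystem paths to URI strings via fileAsURI, so the configuration is unambiguous about the scheme. java.io.File#toURI illustrates the general conversion (the patched helper's exact normalization may differ):

    import java.io.File;
    import java.net.URI;

    public class FileUriDemo {
      public static void main(String[] args) {
        File dir = new File("/tmp/dfs/name1");
        URI uri = dir.toURI();
        // Prints e.g. file:/tmp/dfs/name1 (a trailing slash is added
        // when the directory actually exists).
        System.out.println(uri);
        System.out.println(uri.getScheme()); // file
      }
    }
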
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
index 3a3d455..718a7cb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
@@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hdfs;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -30,6 +34,11 @@
/** Class contains a set of tests to verify the correctness of
* newly introduced {@link FSDataOutputStream#hflush()} method */
public class TestHFlush {
+ {
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private final String fName = "hflushtest.dat";
/** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
actual[idx] = 0;
}
}
+
+ /** This creates a slow writer and checks to see
+ * whether pipeline heartbeats work correctly
+ */
+ @Test
+ public void testPipelineHeartbeat() throws Exception {
+ final int DATANODE_NUM = 2;
+ final int fileLen = 6;
+ Configuration conf = new HdfsConfiguration();
+ final int timeout = 2000;
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ timeout);
+
+ final Path p = new Path("/pipelineHeartbeat/foo");
+ System.out.println("p=" + p);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+ // create a new file.
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+ stm.write(fileContents, 0, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+ System.out.println("Wrote 1 byte and hflush " + p);
+
+ // write another byte
+ Thread.sleep(timeout);
+ stm.write(fileContents, 1, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 2, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+
+ stm.write(fileContents, 3, 1);
+ Thread.sleep(timeout);
+ stm.write(fileContents, 4, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 5, 1);
+ Thread.sleep(timeout);
+ stm.close();
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, p, fileLen,
+ fileContents, "Failed to slowly write to a file");
+ }
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 4eb8366..20a41c1 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@@ -29,7 +31,6 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
@@ -45,6 +46,9 @@
static final int FILE_SIZE = 1024*16;
static final short REPLICATION_NUM = (short)3;
static byte[] buffer = new byte[FILE_SIZE];
+
+ static private String fakeUsername = "fakeUser1";
+ static private String fakeGroup = "supergroup";
public void testBlockSynchronization() throws Exception {
final long softLease = 1000;
@@ -56,6 +60,13 @@
conf.setInt("dfs.heartbeat.interval", 1);
// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
+ // create a fake user-to-group mapping and set it in the conf
+ // NOTE: this must be done at the beginning, before the first call to the
+ // mapping functions
+ Map<String, String []> u2g_map = new HashMap<String, String []>(1);
+ u2g_map.put(fakeUsername, new String[] {fakeGroup});
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
byte[] actual = new byte[FILE_SIZE];
@@ -93,10 +104,9 @@
// should fail but will trigger lease recovery.
{
Configuration conf2 = new HdfsConfiguration(conf);
- String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
UnixUserGroupInformation.saveToConf(conf2,
UnixUserGroupInformation.UGI_PROPERTY_NAME,
- new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+ new UnixUserGroupInformation(fakeUsername, new String[]{fakeGroup}));
FileSystem dfs2 = FileSystem.get(conf2);
boolean done = false;
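
Both TestDFSPermission and TestLeaseRecovery2 now seed the configuration with a fake user-to-group map before the first mapping lookup, via the patch's DFSTestUtil.updateConfWithFakeGroupMapping helper. A minimal sketch of the idea behind such a fake mapping, as a map-backed lookup; MapBackedGroupMapping is hypothetical and only mirrors the getGroups(String) shape used elsewhere in this patch:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical map-backed group lookup: answers come from a fixed
    // user->groups table instead of the operating system.
    class MapBackedGroupMapping {
      private final Map<String, String[]> u2g = new HashMap<String, String[]>();

      MapBackedGroupMapping(Map<String, String[]> seed) {
        u2g.putAll(seed);
      }

      public List<String> getGroups(String user) {
        String[] gs = u2g.get(user);
        return gs == null ? Collections.<String>emptyList() : Arrays.asList(gs);
      }
    }
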
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
index 3e1148f..921d411 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
@@ -60,7 +60,7 @@
final String str = "hftp://"
+ CONF.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
hftpURI = new URI(str);
- hftpFs = (HftpFileSystem) FileSystem.newInstance(hftpURI, CONF);
+ hftpFs = cluster.getHftpFileSystem();
}
@AfterClass
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
index b2b6c6e..6e9a05b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
@@ -17,23 +17,24 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.IOException;
+import java.io.OutputStream;
+
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
/** Test reading from hdfs while a file is being written. */
public class TestReadWhileWriting {
{
@@ -44,6 +45,7 @@
private static final String DIR = "/"
+ TestReadWhileWriting.class.getSimpleName() + "/";
private static final int BLOCK_SIZE = 8192;
+ private static final long LEASE_LIMIT = 500;
/** Test reading while writing. */
@Test
@@ -51,13 +53,13 @@
final Configuration conf = new HdfsConfiguration();
//enable append
conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+ conf.setLong("dfs.heartbeat.interval", 1);
// create cluster
final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
try {
- //change the lease soft limit to 1 second.
- final long leaseSoftLimit = 1000;
- cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+ //change the lease limits.
+ cluster.setLeasePeriod(LEASE_LIMIT, LEASE_LIMIT);
//wait for the cluster
cluster.waitActive();
@@ -81,26 +83,44 @@
//b. On another machine M2, open file and verify that the half-block
// of data can be read successfully.
checkFile(p, half, conf);
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+ ((DistributedFileSystem)fs).dfs.leasechecker.interruptAndJoin();
- /* TODO: enable the following when append is done.
//c. On M1, append another half block of data. Close file on M1.
{
- //sleep to make sure the lease is expired the soft limit.
- Thread.sleep(2*leaseSoftLimit);
-
- FSDataOutputStream out = fs.append(p);
+ //sleep to let the lease expire.
+ Thread.sleep(2*LEASE_LIMIT);
+
+ final DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.newInstance(conf);
+ final FSDataOutputStream out = append(dfs, p);
write(out, 0, half);
out.close();
}
//d. On M2, open file and read 1 block of data from it. Close file.
checkFile(p, 2*half, conf);
- */
} finally {
cluster.shutdown();
}
}
+ /** Try opening a file for append. */
+ private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
+ for(int i = 0; i < 10; i++) {
+ try {
+ return fs.append(p);
+ } catch(RemoteException re) {
+ if (re.getClassName().equals(RecoveryInProgressException.class.getName())) {
+ AppendTestUtil.LOG.info("Will sleep and retry, i=" + i +", p="+p, re);
+ Thread.sleep(1000);
+ }
+ else
+ throw re;
+ }
+ }
+ throw new IOException("Cannot append to " + p);
+ }
+
static private int userCount = 0;
//check the file
static void checkFile(Path p, int expectedsize, Configuration conf
@@ -113,10 +133,10 @@
UnixUserGroupInformation.UGI_PROPERTY_NAME,
new UnixUserGroupInformation(username, new String[]{"supergroup"}));
final FileSystem fs = FileSystem.get(conf2);
- final InputStream in = fs.open(p);
+ final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);
- //Is the data available?
- Assert.assertTrue(available(in, expectedsize));
+ //Check visible length
+ Assert.assertTrue(in.getVisibleLength() >= expectedsize);
//Able to read?
for(int i = 0; i < expectedsize; i++) {
@@ -135,15 +155,5 @@
}
out.write(bytes);
}
-
- /** Is the data available? */
- private static boolean available(InputStream in, int expectedsize
- ) throws IOException {
- final int available = in.available();
- System.out.println(" in.available()=" + available);
- Assert.assertTrue(available >= 0);
- Assert.assertTrue(available <= expectedsize);
- return available == expectedsize;
- }
}
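
The append() helper above retries only while the NameNode reports RecoveryInProgressException and rethrows anything else. The same retry-on-one-transient-class pattern in a self-contained form; RetryOnTransient and its parameters are hypothetical, not part of the patch:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    class RetryOnTransient {
      // Retry op while the failure is of one known-transient class;
      // rethrow anything else, and give up after a fixed attempt budget.
      static <T> T retry(Callable<T> op,
                         Class<? extends Exception> transientClass,
                         int attempts, long sleepMs) throws Exception {
        for (int i = 0; i < attempts; i++) {
          try {
            return op.call();
          } catch (Exception e) {
            if (transientClass.isInstance(e)) {
              Thread.sleep(sleepMs); // transient: wait and try again
            } else {
              throw e;               // anything else is a real failure
            }
          }
        }
        throw new IOException("Gave up after " + attempts + " attempts");
      }
    }
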
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5b19019..91c0fa1 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -180,6 +180,7 @@
totalCapacity += capacity;
}
runBalancer(conf, totalUsedSpace, totalCapacity);
+ cluster.shutdown();
}
/* wait for one heartbeat */
@@ -261,6 +262,38 @@
} while(!balanced);
}
+
+ private void runBalancerDefaultConstructor(Configuration conf,
+ long totalUsedSpace, long totalCapacity) throws Exception {
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+ // start rebalancing
+ balancer = new Balancer();
+ balancer.setConf(conf);
+ balancer.run(new String[0]);
+
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+ boolean balanced;
+ do {
+ DatanodeInfo[] datanodeReport = client
+ .getDatanodeReport(DatanodeReportType.ALL);
+ assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+ balanced = true;
+ double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
+ for (DatanodeInfo datanode : datanodeReport) {
+ if (Math.abs(avgUtilization - ((double) datanode.getDfsUsed())
+ / datanode.getCapacity() * 100) > 10) {
+ balanced = false;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ break;
+ }
+ }
+ } while (!balanced);
+
+ }
/** one-node cluster test*/
private void oneNodeTest(Configuration conf) throws Exception {
@@ -298,6 +331,44 @@
new long[]{CAPACITY, CAPACITY},
new String[] {RACK0, RACK1});
}
+
+ public void testBalancer2() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
+ new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
+ }
+
+ private void testBalancerDefaultConstructor(Configuration conf,
+ long[] capacities, String[] racks, long newCapacity, String newRack)
+ throws Exception {
+ int numOfDatanodes = capacities.length;
+ assertEquals(numOfDatanodes, racks.length);
+ cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null,
+ racks, capacities);
+ try {
+ cluster.waitActive();
+ client = DFSClient.createNamenode(conf);
+
+ long totalCapacity = 0L;
+ for (long capacity : capacities) {
+ totalCapacity += capacity;
+ }
+ // fill up the cluster to be 30% full
+ long totalUsedSpace = totalCapacity * 3 / 10;
+ createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+ new long[] { newCapacity });
+
+ totalCapacity += newCapacity;
+
+ // run balancer and validate results
+ runBalancerDefaultConstructor(conf, totalUsedSpace, totalCapacity);
+ } finally {
+ cluster.shutdown();
+ }
+ }
/**
* @param args
@@ -306,5 +377,6 @@
TestBalancer balancerTest = new TestBalancer();
balancerTest.testBalancer0();
balancerTest.testBalancer1();
+ balancerTest.testBalancer2();
}
}
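
runBalancerDefaultConstructor above declares the cluster balanced once every datanode's utilization is within 10 percentage points of the cluster average. That check, factored out as a standalone sketch over plain arrays (BalanceCheck is hypothetical; the real loop reads DatanodeInfo reports):

    class BalanceCheck {
      static boolean isBalanced(long[] used, long[] capacity,
                                long totalUsed, long totalCapacity) {
        double avgUtilization = ((double) totalUsed) / totalCapacity * 100;
        for (int i = 0; i < used.length; i++) {
          double nodeUtilization = ((double) used[i]) / capacity[i] * 100;
          if (Math.abs(avgUtilization - nodeUtilization) > 10) {
            return false; // this node is too far from the average
          }
        }
        return true;
      }
    }
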
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
new file mode 100644
index 0000000..b1ffca8
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This is a unit test, which tests {@link Util#stringAsURI(String)}
+ * for Windows and Unix style file paths.
+ */
+public class TestGetUriFromString extends TestCase {
+ private static final Log LOG = LogFactory.getLog(TestGetUriFromString.class);
+
+ private static final String RELATIVE_FILE_PATH = "relativeFilePath";
+ private static final String ABSOLUTE_PATH_UNIX = "/tmp/file1";
+ private static final String ABSOLUTE_PATH_WINDOWS =
+ "C:\\Documents and Settings\\All Users";
+ private static final String URI_FILE_SCHEMA = "file";
+ private static final String URI_PATH_UNIX = "/var/www";
+ private static final String URI_PATH_WINDOWS =
+ "/C:/Documents%20and%20Settings/All%20Users";
+ private static final String URI_UNIX = URI_FILE_SCHEMA + "://"
+ + URI_PATH_UNIX;
+ private static final String URI_WINDOWS = URI_FILE_SCHEMA + "://"
+ + URI_PATH_WINDOWS;
+
+ /**
+ * Test for a relative path, OS independent
+ * @throws IOException
+ */
+ public void testRelativePathAsURI() throws IOException {
+ URI u = Util.stringAsURI(RELATIVE_FILE_PATH);
+ LOG.info("Uri: " + u);
+ assertNotNull(u);
+ }
+
+ /**
+ * Test for OS dependent absolute paths.
+ * @throws IOException
+ */
+ public void testAbsolutePathAsURI() throws IOException {
+ URI u = null;
+ u = Util.stringAsURI(ABSOLUTE_PATH_WINDOWS);
+ assertNotNull(
+ "Uri should not be null for Windows path" + ABSOLUTE_PATH_WINDOWS, u);
+ assertEquals(URI_FILE_SCHEMA, u.getScheme());
+ u = Util.stringAsURI(ABSOLUTE_PATH_UNIX);
+ assertNotNull("Uri should not be null for Unix path" + ABSOLUTE_PATH_UNIX, u);
+ assertEquals(URI_FILE_SCHEMA, u.getScheme());
+ }
+
+ /**
+ * Test for a URI
+ * @throws IOException
+ */
+ public void testURI() throws IOException {
+ LOG.info("Testing correct Unix URI: " + URI_UNIX);
+ URI u = Util.stringAsURI(URI_UNIX);
+ LOG.info("Uri: " + u);
+ assertNotNull("Uri should not be null at this point", u);
+ assertEquals(URI_FILE_SCHEMA, u.getScheme());
+ assertEquals(URI_PATH_UNIX, u.getPath());
+
+ LOG.info("Testing correct windows URI: " + URI_WINDOWS);
+ u = Util.stringAsURI(URI_WINDOWS);
+ LOG.info("Uri: " + u);
+ assertNotNull("Uri should not be null at this point", u);
+ assertEquals(URI_FILE_SCHEMA, u.getScheme());
+ assertEquals(URI_PATH_WINDOWS.replace("%20", " "), u.getPath());
+ }
+}
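
The Windows assertions in testURI rely on java.net.URI returning the decoded path, so %20 comes back as a space. A tiny self-contained demonstration of that behavior:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class UriDecodeDemo {
      public static void main(String[] args) throws URISyntaxException {
        URI u = new URI("file:///C:/Documents%20and%20Settings/All%20Users");
        System.out.println(u.getScheme()); // file
        // getPath() percent-decodes: /C:/Documents and Settings/All Users
        System.out.println(u.getPath());
      }
    }
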
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index f0e898e..81da8b8 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -215,12 +215,12 @@
try {
// start name-node and backup node 1
cluster = new MiniDFSCluster(conf1, 0, true, null);
- conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7770");
+ conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7775");
backup1 = startBackupNode(conf1, StartupOption.BACKUP, 1);
// try to start backup node 2
conf2 = new HdfsConfiguration(conf1);
- conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
+ conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7772");
conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7776");
try {
backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
index 0bbca80..ed1d6be 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
@@ -48,7 +49,7 @@
*/
public class TestStartup extends TestCase {
public static final String NAME_NODE_HOST = "localhost:";
- public static final String NAME_NODE_HTTP_HOST = "0.0.0.0:";
+ public static final String WILDCARD_HTTP_HOST = "0.0.0.0:";
private static final Log LOG =
LogFactory.getLog(TestStartup.class.getName());
private Configuration config;
@@ -74,18 +75,20 @@
protected void setUp() throws Exception {
config = new HdfsConfiguration();
- String baseDir = System.getProperty("test.build.data", "/tmp");
+ hdfsDir = new File(MiniDFSCluster.getBaseDirectory());
- hdfsDir = new File(baseDir, "dfs");
if ( hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir) ) {
throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
}
LOG.info("--hdfsdir is " + hdfsDir.getAbsolutePath());
- config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
- config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, new File(hdfsDir, "data").getPath());
-
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,new File(hdfsDir, "secondary").getPath());
- //config.set("fs.default.name", "hdfs://"+ NAME_NODE_HOST + "0");
+ config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name")).toString());
+ config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+ new File(hdfsDir, "data").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "secondary")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
+ WILDCARD_HTTP_HOST + "0");
FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
}
@@ -231,11 +234,15 @@
public void testChkpointStartup2() throws IOException{
LOG.info("--starting checkpointStartup2 - same directory for checkpoint");
// different name dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "edits").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "edits")).toString());
// same checkpoint dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt")).toString());
createCheckPoint();
@@ -253,11 +260,15 @@
//setUpConfig();
LOG.info("--starting testStartup Recovery");
// different name dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "edits").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "edits")).toString());
// same checkpoint dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt_edits").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt_edits")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt")).toString());
createCheckPoint();
corruptNameNodeFiles();
@@ -274,11 +285,15 @@
//setUpConfig();
LOG.info("--starting SecondNN startup test");
// different name dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "name").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "name")).toString());
// same checkpoint dirs
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt_edits").getPath());
- config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt_edits")).toString());
+ config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(hdfsDir, "chkpt")).toString());
LOG.info("--starting NN ");
MiniDFSCluster cluster = null;
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 77284b0..c67fc4b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -33,7 +33,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.cli.TestHDFSCLI;
+import org.apache.hadoop.cli.CmdFactoryDFS;
+import org.apache.hadoop.cli.util.CLITestData;
import org.apache.hadoop.cli.util.CommandExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -164,7 +165,7 @@
/**
* This function returns a md5 hash of a file.
*
- * @param FileToMd5
+ * @param file input file
* @return The md5 string
*/
public String getFileMD5(File file) throws Exception {
@@ -189,7 +190,7 @@
/**
* read currentCheckpointTime directly from the file
* @param currDir
- * @return
+ * @return the checkpoint time
* @throws IOException
*/
long readCheckpointTime(File currDir) throws IOException {
@@ -351,25 +352,25 @@
String cmd = "-fs NAMENODE -restoreFailedStorage false";
String namenode = config.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
- CommandExecutor executor = new TestHDFSCLI.DFSAdminCmdExecutor(namenode);
+ CommandExecutor executor =
+ CmdFactoryDFS.getCommandExecutor(
+ new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
+ namenode);
executor.executeCommand(cmd);
restore = fsi.getRestoreFailedStorage();
- LOG.info("After set true call restore is " + restore);
- assertEquals(restore, false);
+ assertFalse("After set true call restore is " + restore, restore);
// run one more time - to set it to true again
cmd = "-fs NAMENODE -restoreFailedStorage true";
executor.executeCommand(cmd);
restore = fsi.getRestoreFailedStorage();
- LOG.info("After set false call restore is " + restore);
- assertEquals(restore, true);
+ assertTrue("After set false call restore is " + restore, restore);
// run one more time - no change in value
cmd = "-fs NAMENODE -restoreFailedStorage check";
CommandExecutor.Result cmdResult = executor.executeCommand(cmd);
restore = fsi.getRestoreFailedStorage();
- LOG.info("After check call restore is " + restore);
- assertEquals(restore, true);
+ assertTrue("After check call restore is " + restore, restore);
String commandOutput = cmdResult.getCommandOutput();
commandOutput = commandOutput.trim();
assertTrue(commandOutput.contains("restoreFailedStorage is set to true"));
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 7feacd7..2dbacf6 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -39,11 +39,19 @@
*/
public class TestNameNodeMetrics extends TestCase {
private static final Configuration CONF = new HdfsConfiguration();
+ private static final int DFS_REPLICATION_INTERVAL = 1;
+ private static final Path TEST_ROOT_DIR_PATH =
+ new Path(System.getProperty("test.build.data", "build/test/data"));
+
+ // Number of datanodes in the cluster
+ private static final int DATANODE_COUNT = 3;
static {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
- CONF.setLong("dfs.heartbeat.interval", 1L);
- CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFS_REPLICATION_INTERVAL);
+ CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+ DFS_REPLICATION_INTERVAL);
}
private MiniDFSCluster cluster;
@@ -52,9 +60,13 @@
private Random rand = new Random();
private FSNamesystem namesystem;
+ private static Path getTestPath(String fileName) {
+ return new Path(TEST_ROOT_DIR_PATH, fileName);
+ }
+
@Override
protected void setUp() throws Exception {
- cluster = new MiniDFSCluster(CONF, 3, true, null);
+ cluster = new MiniDFSCluster(CONF, DATANODE_COUNT, true, null);
cluster.waitActive();
namesystem = cluster.getNamesystem();
fs = (DistributedFileSystem) cluster.getFileSystem();
@@ -67,9 +79,8 @@
}
/** create a file with a length of <code>fileLen</code> */
- private void createFile(String fileName, long fileLen, short replicas) throws IOException {
- Path filePath = new Path(fileName);
- DFSTestUtil.createFile(fs, filePath, fileLen, replicas, rand.nextLong());
+ private void createFile(Path file, long fileLen, short replicas) throws IOException {
+ DFSTestUtil.createFile(fs, file, fileLen, replicas, rand.nextLong());
}
private void updateMetrics() throws Exception {
@@ -82,7 +93,7 @@
/** Test metrics associated with addition of a file */
public void testFileAdd() throws Exception {
// Add files with 100 blocks
- final String file = "/tmp/t";
+ final Path file = getTestPath("testFileAdd");
createFile(file, 3200, (short)3);
final int blockCount = 32;
int blockCapacity = namesystem.getBlockCapacity();
@@ -96,27 +107,37 @@
blockCapacity <<= 1;
}
updateMetrics();
- assertEquals(3, metrics.filesTotal.get());
+ int filesTotal = file.depth() + 1; // Add 1 for root
+ assertEquals(filesTotal, metrics.filesTotal.get());
assertEquals(blockCount, metrics.blocksTotal.get());
assertEquals(blockCapacity, metrics.blockCapacity.get());
- fs.delete(new Path(file), true);
+ fs.delete(file, true);
+ filesTotal--; // reduce the file count for the deleted file
+
+ // Wait for more than DATANODE_COUNT replication intervals to ensure all
+ // the blocks pending deletion are sent for deletion to the datanodes.
+ Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+ updateMetrics();
+ assertEquals(filesTotal, metrics.filesTotal.get());
+ assertEquals(0, metrics.blocksTotal.get());
+ assertEquals(0, metrics.pendingDeletionBlocks.get());
}
/** Corrupt a block and ensure metrics reflects it */
public void testCorruptBlock() throws Exception {
// Create a file with single block with two replicas
- String file = "/tmp/t";
+ final Path file = getTestPath("testCorruptBlock");
createFile(file, 100, (short)2);
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), file, 0, 1).get(0);
+ cluster.getNameNode(), file.toString(), 0, 1).get(0);
namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
updateMetrics();
assertEquals(1, metrics.corruptBlocks.get());
assertEquals(1, metrics.pendingReplicationBlocks.get());
assertEquals(1, metrics.scheduledReplicationBlocks.get());
- fs.delete(new Path(file), true);
+ fs.delete(file, true);
updateMetrics();
assertEquals(0, metrics.corruptBlocks.get());
assertEquals(0, metrics.pendingReplicationBlocks.get());
@@ -127,30 +148,29 @@
* for a file and ensure metrics reflects it
*/
public void testExcessBlocks() throws Exception {
- String file = "/tmp/t";
+ Path file = getTestPath("testExcessBlocks");
createFile(file, 100, (short)2);
int totalBlocks = 1;
- namesystem.setReplication(file, (short)1);
+ namesystem.setReplication(file.toString(), (short)1);
updateMetrics();
assertEquals(totalBlocks, metrics.excessBlocks.get());
- assertEquals(totalBlocks, metrics.pendingDeletionBlocks.get());
- fs.delete(new Path(file), true);
+ fs.delete(file, true);
}
/** Test to ensure metrics reflects missing blocks */
public void testMissingBlock() throws Exception {
// Create a file with single block with two replicas
- String file = "/tmp/t";
+ Path file = getTestPath("testMissingBlocks");
createFile(file, 100, (short)1);
// Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), file, 0, 1).get(0);
+ cluster.getNameNode(), file.toString(), 0, 1).get(0);
namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
updateMetrics();
assertEquals(1, metrics.underReplicatedBlocks.get());
assertEquals(1, metrics.missingBlocks.get());
- fs.delete(new Path(file), true);
+ fs.delete(file, true);
updateMetrics();
assertEquals(0, metrics.underReplicatedBlocks.get());
}
diff --git a/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java b/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
new file mode 100644
index 0000000..c4b2285
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGroupMappingServiceRefresh {
+ private MiniDFSCluster cluster;
+ Configuration config;
+ private static long groupRefreshTimeoutSec = 1;
+
+ public static class MockUnixGroupsMapping implements GroupMappingServiceProvider {
+ private int i=0;
+
+ @Override
+ public List<String> getGroups(String user) throws IOException {
+ String g1 = user + (10 * i + 1);
+ String g2 = user + (10 * i + 2);
+ List<String> l = new ArrayList<String>(2);
+ l.add(g1);
+ l.add(g2);
+ i++;
+ return l;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ config = new HdfsConfiguration();
+ config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ TestGroupMappingServiceRefresh.MockUnixGroupsMapping.class,
+ GroupMappingServiceProvider.class);
+ config.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
+ groupRefreshTimeoutSec);
+
+ FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
+ cluster = new MiniDFSCluster(0, config, 1, true, true, true, null, null, null, null);
+ cluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if(cluster!=null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGroupMappingRefresh() throws Exception {
+ DFSAdmin admin = new DFSAdmin(config);
+ String [] args = new String[]{"-refreshUserToGroupsMappings"};
+ Groups groups = SecurityUtil.getUserToGroupsMappingService(config);
+ String user = UnixUserGroupInformation.getUnixUserName();
+ System.out.println("first attempt:");
+ List<String> g1 = groups.getGroups(user);
+ String [] str_groups = new String [g1.size()];
+ g1.toArray(str_groups);
+ System.out.println(Arrays.toString(str_groups));
+
+ System.out.println("second attempt, should be same:");
+ List<String> g2 = groups.getGroups(user);
+ g2.toArray(str_groups);
+ System.out.println(Arrays.toString(str_groups));
+ for(int i=0; i<g2.size(); i++) {
+ assertEquals("Should be same group ", g1.get(i), g2.get(i));
+ }
+ admin.run(args);
+ System.out.println("third attempt(after refresh command), should be different:");
+ List<String> g3 = groups.getGroups(user);
+ g3.toArray(str_groups);
+ System.out.println(Arrays.toString(str_groups));
+ for(int i=0; i<g3.size(); i++) {
+ assertFalse("Should be different group ", g1.get(i).equals(g3.get(i)));
+ }
+
+ // test time out
+ Thread.sleep(groupRefreshTimeoutSec*1100);
+ System.out.println("fourth attempt(after timeout), should be different:");
+ List<String> g4 = groups.getGroups(user);
+ g4.toArray(str_groups);
+ System.out.println(Arrays.toString(str_groups));
+ for(int i=0; i<g4.size(); i++) {
+ assertFalse("Should be different group ", g3.get(i).equals(g4.get(i)));
+ }
+ }
+}
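
The fourth attempt in testGroupMappingRefresh depends on the groups cache expiring after hadoop.security.groups.cache.secs and consulting the provider again. A simplified sketch of that timeout behavior; TimedGroupCache is a hypothetical stand-in for the real Groups cache, not its actual code:

    import java.util.HashMap;
    import java.util.Map;

    class TimedGroupCache {
      private final long timeoutMs;
      private final Map<String, String[]> cache = new HashMap<String, String[]>();
      private final Map<String, Long> stamps = new HashMap<String, Long>();

      interface GroupLookup { String[] lookup(String user); }

      TimedGroupCache(long timeoutMs) { this.timeoutMs = timeoutMs; }

      String[] getGroups(String user, GroupLookup lookup) {
        Long t = stamps.get(user);
        long now = System.currentTimeMillis();
        if (t == null || now - t > timeoutMs) {
          cache.put(user, lookup.lookup(user)); // absent or expired: refresh
          stamps.put(user, now);
        }
        return cache.get(user);
      }
    }
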
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
index 6fa0569..f6125b3 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
@@ -19,15 +19,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static junit.framework.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -39,13 +30,20 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
+import static org.junit.Assert.assertFalse;
import org.junit.Before;
import org.junit.Test;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
public class TestNNLeaseRecovery {
public static final Log LOG = LogFactory.getLog(TestNNLeaseRecovery.class);
@@ -128,8 +126,8 @@
PermissionStatus ps =
new PermissionStatus("test", "test", new FsPermission((short)0777));
- mockFileBlocks(null,
- HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps);
+ mockFileBlocks(2, null,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
fsn.internalReleaseLease(lm, file.toString(), null);
assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
@@ -152,8 +150,52 @@
PermissionStatus ps =
new PermissionStatus("test", "test", new FsPermission((short)0777));
- mockFileBlocks(HdfsConstants.BlockUCState.COMMITTED,
- HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps);
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
+
+ fsn.internalReleaseLease(lm, file.toString(), null);
+ assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
+ "AlreadyBeingCreatedException here", false);
+ }
+
+ /**
+ * Mocks FSNamesystem instance, adds an empty file with 0 blocks
+ * and invokes lease recovery method.
+ *
+ */
+ @Test
+ public void testInternalReleaseLease_0blocks () throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ LeaseManager.Lease lm = mock(LeaseManager.Lease.class);
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(0, null, null, file, dnd, ps, false);
+
+ assertTrue("True has to be returned in this case",
+ fsn.internalReleaseLease(lm, file.toString(), null));
+ }
+
+ /**
+ * Mocks FSNamesystem instance, adds an empty file with 1 block
+ * and invokes lease recovery method.
+ * AlreadyBeingCreatedException is expected.
+ * @throws AlreadyBeingCreatedException as the result
+ */
+ @Test(expected=AlreadyBeingCreatedException.class)
+ public void testInternalReleaseLease_1blocks () throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ LeaseManager.Lease lm = mock(LeaseManager.Lease.class);
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
fsn.internalReleaseLease(lm, file.toString(), null);
assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
@@ -176,25 +218,159 @@
PermissionStatus ps =
new PermissionStatus("test", "test", new FsPermission((short)0777));
- mockFileBlocks(HdfsConstants.BlockUCState.COMMITTED,
- HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps);
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
assertFalse("False is expected in return in this case",
fsn.internalReleaseLease(lm, file.toString(), null));
}
- private void mockFileBlocks(HdfsConstants.BlockUCState penUltState,
- HdfsConstants.BlockUCState lastState,
- Path file, DatanodeDescriptor dnd,
- PermissionStatus ps) throws IOException {
+ @Test
+ public void testCommitBlockSynchronization_BlockNotFound ()
+ throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ long recoveryId = 2002;
+ long newSize = 273487234;
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
+
+ BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+ try {
+ fsn.commitBlockSynchronization(lastBlock,
+ recoveryId, newSize, true, false, new DatanodeID[1]);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().startsWith("Block (="));
+ }
+ }
+
+ @Test
+ public void testCommitBlockSynchronization_notUR ()
+ throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ long recoveryId = 2002;
+ long newSize = 273487234;
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.COMPLETE, file, dnd, ps, true);
+
+ BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+ when(lastBlock.isComplete()).thenReturn(true);
+
+ try {
+ fsn.commitBlockSynchronization(lastBlock,
+ recoveryId, newSize, true, false, new DatanodeID[1]);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().startsWith("Unexpected block (="));
+ }
+ }
+
+ @Test
+ public void testCommitBlockSynchronization_WrongGreaterRecoveryID()
+ throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ long recoveryId = 2002;
+ long newSize = 273487234;
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+
+ BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+ when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId-100);
+
+ try {
+ fsn.commitBlockSynchronization(lastBlock,
+ recoveryId, newSize, true, false, new DatanodeID[1]);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().startsWith("The recovery id " + recoveryId + " does not match current recovery id " + (recoveryId-100)));
+ }
+ }
+
+ @Test
+ public void testCommitBlockSynchronization_WrongLesserRecoveryID()
+ throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ long recoveryId = 2002;
+ long newSize = 273487234;
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+
+ BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+ when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId+100);
+
+ try {
+ fsn.commitBlockSynchronization(lastBlock,
+ recoveryId, newSize, true, false, new DatanodeID[1]);
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().startsWith("The recovery id " + recoveryId + " does not match current recovery id " + (recoveryId+100)));
+ }
+ }
+
+ @Test
+ public void testCommitBlockSynchronization_EqualRecoveryID()
+ throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ long recoveryId = 2002;
+ long newSize = 273487234;
+ Path file =
+ spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+ DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+ PermissionStatus ps =
+ new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+ mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
+ HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+
+ BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+ when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId);
+
+ boolean recoveryChecked = false;
+ try {
+ fsn.commitBlockSynchronization(lastBlock,
+ recoveryId, newSize, true, false, new DatanodeID[1]);
+ } catch (NullPointerException ioe) {
+ // It is fine to get an NPE here because the datanodes array holds only a null entry
+ recoveryChecked = true;
+ }
+ assertTrue("commitBlockSynchronization had to throw NPE here", recoveryChecked);
+ }
+
+ private void mockFileBlocks(int fileBlocksNumber,
+ HdfsConstants.BlockUCState penUltState,
+ HdfsConstants.BlockUCState lastState,
+ Path file, DatanodeDescriptor dnd,
+ PermissionStatus ps,
+ boolean setStoredBlock) throws IOException {
BlockInfo b = mock(BlockInfo.class);
BlockInfoUnderConstruction b1 = mock(BlockInfoUnderConstruction.class);
when(b.getBlockUCState()).thenReturn(penUltState);
when(b1.getBlockUCState()).thenReturn(lastState);
- BlockInfo[] blocks = new BlockInfo[]{b, b1};
+ BlockInfo[] blocks;
FSDirectory fsDir = mock(FSDirectory.class);
INodeFileUnderConstruction iNFmock = mock(INodeFileUnderConstruction.class);
+
fsn.dir = fsDir;
FSImage fsImage = mock(FSImage.class);
FSEditLog editLog = mock(FSEditLog.class);
@@ -203,14 +379,34 @@
when(fsn.getFSImage().getEditLog()).thenReturn(editLog);
fsn.getFSImage().setFSNamesystem(fsn);
+ switch (fileBlocksNumber) {
+ case 0:
+ blocks = new BlockInfo[0];
+ break;
+ case 1:
+ blocks = new BlockInfo[]{b1};
+ when(iNFmock.getLastBlock()).thenReturn(b1);
+ break;
+ default:
+ when(iNFmock.getPenultimateBlock()).thenReturn(b);
+ when(iNFmock.getLastBlock()).thenReturn(b1);
+ blocks = new BlockInfo[]{b, b1};
+ }
+
when(iNFmock.getBlocks()).thenReturn(blocks);
- when(iNFmock.numBlocks()).thenReturn(2);
- when(iNFmock.getPenultimateBlock()).thenReturn(b);
- when(iNFmock.getLastBlock()).thenReturn(b1);
+ when(iNFmock.numBlocks()).thenReturn(blocks.length);
when(iNFmock.isUnderConstruction()).thenReturn(true);
+ when(iNFmock.convertToInodeFile()).thenReturn(iNFmock);
fsDir.addFile(file.toString(), ps, (short)3, 1l, "test",
"test-machine", dnd, 1001l);
+ fsn.leaseManager = mock(LeaseManager.class);
+ fsn.leaseManager.addLease("mock-lease", file.toString());
+ if (setStoredBlock) {
+ when(b1.getINode()).thenReturn(iNFmock);
+ fsn.blockManager.blocksMap.addINode(b1, iNFmock);
+ }
+
when(fsDir.getFileINode(anyString())).thenReturn(iNFmock);
}