Merging changes r1090113:r1095461 from trunk to federation
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1095512 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 923576f..1524c6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -258,6 +258,16 @@
HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode
fails. (jitendra)
+ HDFS-1630. Support fsedits checksum. (hairong)
+
+ HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+ adding a new datanode when an existing datanode failed. (szetszwo)
+
+ HDFS-1442. Api to get delegation token in Hdfs class. (jitendra)
+
+ HDFS-1070. Speedup namenode image loading and saving by storing only
+ local file names. (hairong)
+
IMPROVEMENTS
HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -327,6 +337,25 @@
HDFS-1767. Namenode ignores non-initial block report from datanodes
when in safemode during startup. (Matt Foley via suresh)
+ HDFS-1817. Move pipeline_Fi_[39-51] from TestFiDataTransferProtocol
+ to TestFiPipelineClose. (szetszwo)
+
+ HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
+ for root directory instead of an empty string. (Daryn Sharp via szetszwo)
+
+ HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+ and fix comments in BlockReceiver.PacketResponder. (szetszwo)
+
+ HDFS-1486. Generalize CLITest structure and interfaces to facilitate
+ upstream adoption (e.g. for web testing). (cos)
+
+ HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
+ also HADOOP-7230. (Daryn Sharp via szetszwo)
+
+ HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+ being written are closed for a grace period, and start a new thread when
+ new files are opened for write. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -397,6 +426,18 @@
HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
default build and push to maven for HDFS (Luke Lu via cos)
+ HDFS-1818. TestHDFSCLI is failing on trunk after HADOOP-7202.
+ (Aaron T. Myers via todd)
+
+ HDFS-1828. TestBlocksWithNotEnoughRacks intermittently fails assert.
+ (Matt Foley via eli)
+
+ HDFS-1824. delay instantiation of file system object until it is
+ needed (linked to HADOOP-7207) (boryas)
+
+ HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+ check (related to HADOOP-7223). (suresh)
+
Release 0.22.0 - Unreleased
NEW FEATURES
@@ -853,9 +894,20 @@
HDFS-1781. Fix the path for jsvc in bin/hdfs. (John George via szetszwo)
- HDFS-1782. Fix an NPE in RFSNamesystem.startFileInternal(..).
+ HDFS-1782. Fix an NPE in FSNamesystem.startFileInternal(..).
(John George via szetszwo)
+ HDFS-1821. Fix username resolution in NameNode.createSymlink(..) and
+ FSDirectory.addSymlink(..). (John George via szetszwo)
+
+ HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
+ and likely to fail on fast servers. (Matt Foley via eli)
+
+ HDFS-1845. Symlink comes up as directory after namenode restart.
+ (John George via eli)
+
+ HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
Release 0.21.1 - Unreleased
HDFS-1411. Correct backup node startup command in hdfs user guide.
diff --git a/src/contrib/build.xml b/src/contrib/build.xml
index 05e781f..8bbc85e 100644
--- a/src/contrib/build.xml
+++ b/src/contrib/build.xml
@@ -46,9 +46,11 @@
<!-- Test all the contribs. -->
<!-- ====================================================== -->
<target name="test">
+ <!-- hdfsproxy tests failing due to HDFS-1666
<subant target="test">
<fileset dir="." includes="hdfsproxy/build.xml"/>
</subant>
+ -->
</target>
diff --git a/src/java/hdfs-default.xml b/src/java/hdfs-default.xml
index 03a2c2c..2bb84e6 100644
--- a/src/java/hdfs-default.xml
+++ b/src/java/hdfs-default.xml
@@ -318,6 +318,42 @@
</property>
<property>
+ <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+ <value>true</value>
+ <description>
+ If there is a datanode/network failure in the write pipeline,
+ DFSClient will try to remove the failed datanode from the pipeline
+ and then continue writing with the remaining datanodes. As a result,
+ the number of datanodes in the pipeline is decreased. The feature is
+ to add new datanodes to the pipeline.
+
+ This is a site-wide property to enable/disable the feature.
+
+ See also dfs.client.block.write.replace-datanode-on-failure.policy
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+ <value>DEFAULT</value>
+ <description>
+ This property is used only if the value of
+ dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+ ALWAYS: always add a new datanode when an existing datanode is removed.
+
+ NEVER: never add a new datanode.
+
+ DEFAULT:
+ Let r be the replication number.
+ Let n be the number of existing datanodes.
+ Add a new datanode only if r is greater than or equal to 3 and either
+ (1) floor(r/2) is greater than or equal to n; or
+ (2) r is greater than n and the block is hflushed/appended.
+ </description>
+</property>
+
+<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
<description>Determines block reporting interval in milliseconds.</description>
diff --git a/src/java/org/apache/hadoop/fs/Hdfs.java b/src/java/org/apache/hadoop/fs/Hdfs.java
index c674b4c..fbcf042 100644
--- a/src/java/org/apache/hadoop/fs/Hdfs.java
+++ b/src/java/org/apache/hadoop/fs/Hdfs.java
@@ -25,6 +25,8 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
+import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -37,8 +39,13 @@
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Progressable;
@InterfaceAudience.Private
@@ -249,7 +256,7 @@
if (hasNext()) {
return thisListing.getPartialListing()[i++];
}
- throw new java.util.NoSuchElementException("No more entry in " + src);
+ throw new NoSuchElementException("No more entry in " + src);
}
}
@@ -384,4 +391,43 @@
public Path getLinkTarget(Path p) throws IOException {
return new Path(dfs.getLinkTarget(getUriPath(p)));
}
+
+ @Override //AbstractFileSystem
+ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+ Token<DelegationTokenIdentifier> result = dfs
+ .getDelegationToken(renewer == null ? null : new Text(renewer));
+ result.setService(new Text(this.getCanonicalServiceName()));
+ List<Token<?>> tokenList = new ArrayList<Token<?>>();
+ tokenList.add(result);
+ return tokenList;
+ }
+
+ /**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws InvalidToken
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public long renewDelegationToken(
+ Token<? extends AbstractDelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
+
+ /**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws InvalidToken
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public void cancelDelegationToken(
+ Token<? extends AbstractDelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
}
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index f528d24..d0c6d97 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -45,6 +45,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -128,15 +129,16 @@
private volatile long serverDefaultsLastUpdate;
static Random r = new Random();
final String clientName;
- final LeaseChecker leasechecker = new LeaseChecker();
Configuration conf;
long defaultBlockSize;
private short defaultReplication;
SocketFactory socketFactory;
int socketTimeout;
final int writePacketSize;
+ final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
+ final LeaseChecker leasechecker;
/**
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
@@ -248,8 +250,11 @@
this.writePacketSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+ this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
+ this.leasechecker = new LeaseChecker(hdfsTimeout);
this.ugi = UserGroupInformation.getCurrentUser();
@@ -570,8 +575,9 @@
int buffersize)
throws IOException {
return create(src, FsPermission.getDefault(),
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, buffersize);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ buffersize);
}
/**
@@ -637,9 +643,29 @@
}
/**
+ * Append to an existing file if {@link CreateFlag#APPEND} is present
+ */
+ private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+ int buffersize, Progressable progress) throws IOException {
+ if (flag.contains(CreateFlag.APPEND)) {
+ HdfsFileStatus stat = getFileInfo(src);
+ if (stat == null) { // No file to append to
+ // New file needs to be created if create option is present
+ if (!flag.contains(CreateFlag.CREATE)) {
+ throw new FileNotFoundException("failed to append to non-existent file "
+ + src + " on client " + clientName);
+ }
+ return null;
+ }
+ return callAppend(stat, src, buffersize, progress);
+ }
+ return null;
+ }
+
+ /**
* Same as {{@link #create(String, FsPermission, EnumSet, short, long,
* Progressable, int)} except that the permission
- * is absolute (ie has already been masked with umask.
+ * is absolute (i.e. has already been masked with umask).
*/
public OutputStream primitiveCreate(String src,
FsPermission absPermission,
@@ -652,9 +678,13 @@
int bytesPerChecksum)
throws IOException, UnresolvedLinkException {
checkOpen();
- OutputStream result = new DFSOutputStream(this, src, absPermission,
- flag, createParent, replication, blockSize, progress, buffersize,
- bytesPerChecksum);
+ CreateFlag.validate(flag);
+ OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+ if (result == null) {
+ result = new DFSOutputStream(this, src, absPermission,
+ flag, createParent, replication, blockSize, progress, buffersize,
+ bytesPerChecksum);
+ }
leasechecker.put(src, result);
return result;
}
@@ -696,6 +726,25 @@
}
}
+ /** Method to get stream returned by append call */
+ private OutputStream callAppend(HdfsFileStatus stat, String src,
+ int buffersize, Progressable progress) throws IOException {
+ LocatedBlock lastBlock = null;
+ try {
+ lastBlock = namenode.append(src, clientName);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ DSQuotaExceededException.class,
+ UnsupportedOperationException.class,
+ UnresolvedPathException.class);
+ }
+ return new DFSOutputStream(this, src, buffersize, progress,
+ lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ }
+
/**
* Append to an existing HDFS file.
*
@@ -709,22 +758,8 @@
OutputStream append(String src, int buffersize, Progressable progress)
throws IOException {
checkOpen();
- HdfsFileStatus stat = null;
- LocatedBlock lastBlock = null;
- try {
- stat = getFileInfo(src);
- lastBlock = namenode.append(src, clientName);
- } catch(RemoteException re) {
- throw re.unwrapRemoteException(AccessControlException.class,
- FileNotFoundException.class,
- SafeModeException.class,
- DSQuotaExceededException.class,
- UnsupportedOperationException.class,
- UnresolvedPathException.class);
- }
- OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
- lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ HdfsFileStatus stat = getFileInfo(src);
+ OutputStream result = callAppend(stat, src, buffersize, progress);
leasechecker.put(src, result);
return result;
}
@@ -1325,38 +1360,106 @@
}
}
- boolean isLeaseCheckerStarted() {
- return leasechecker.daemon != null;
- }
-
/** Lease management*/
- class LeaseChecker implements Runnable {
+ class LeaseChecker {
+ static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+ static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
/** A map from src -> DFSOutputStream of files that are currently being
* written by this client.
*/
private final SortedMap<String, OutputStream> pendingCreates
= new TreeMap<String, OutputStream>();
+ /** The time in milliseconds that the map became empty. */
+ private long emptyTime = Long.MAX_VALUE;
+ /** A fixed lease renewal time period in milliseconds */
+ private final long renewal;
+ /** A daemon for renewing lease */
private Daemon daemon = null;
-
+ /** Only the daemon with currentId should run. */
+ private int currentId = 0;
+
+ /**
+ * A period in milliseconds that the lease renewer thread should run
+ * after the map became empty.
+ * If the map is empty for a time period longer than the grace period,
+ * the renewer should terminate.
+ */
+ private long gracePeriod;
+ /**
+ * The time period in milliseconds
+ * that the renewer sleeps for each iteration.
+ */
+ private volatile long sleepPeriod;
+
+ private LeaseChecker(final long timeout) {
+ this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)?
+ timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+ setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+ }
+
+ /** Set the grace period and adjust the sleep period accordingly. */
+ void setGraceSleepPeriod(final long gracePeriod) {
+ if (gracePeriod < 100L) {
+ throw new HadoopIllegalArgumentException(gracePeriod
+ + " = gracePeriod < 100ms is too small.");
+ }
+ synchronized(this) {
+ this.gracePeriod = gracePeriod;
+ }
+ final long half = gracePeriod/2;
+ this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+ half: LEASE_RENEWER_SLEEP_DEFAULT;
+ }
+
+ /** Is the daemon running? */
+ synchronized boolean isRunning() {
+ return daemon != null && daemon.isAlive();
+ }
+
+ /** Is the empty period longer than the grace period? */
+ private synchronized boolean isRenewerExpired() {
+ return emptyTime != Long.MAX_VALUE
+ && System.currentTimeMillis() - emptyTime > gracePeriod;
+ }
+
synchronized void put(String src, OutputStream out) {
if (clientRunning) {
- if (daemon == null) {
- daemon = new Daemon(this);
+ if (daemon == null || isRenewerExpired()) {
+ //start a new daemon with a new id.
+ final int id = ++currentId;
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LeaseChecker.this.run(id);
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+ + " is interrupted.", e);
+ }
+ }
+ }
+ });
daemon.start();
}
pendingCreates.put(src, out);
+ emptyTime = Long.MAX_VALUE;
}
}
synchronized void remove(String src) {
pendingCreates.remove(src);
+ if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+ //discover the first time that the map is empty.
+ emptyTime = System.currentTimeMillis();
+ }
}
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
- if (daemon != null) {
+ if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
@@ -1423,37 +1526,30 @@
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
- public void run() {
- long lastRenewed = 0;
- int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
- if (hdfsTimeout > 0) {
- renewal = Math.min(renewal, hdfsTimeout/2);
- }
- while (clientRunning && !Thread.interrupted()) {
- if (System.currentTimeMillis() - lastRenewed > renewal) {
+ private void run(final int id) throws InterruptedException {
+ for(long lastRenewed = System.currentTimeMillis();
+ clientRunning && !Thread.interrupted();
+ Thread.sleep(sleepPeriod)) {
+ if (System.currentTimeMillis() - lastRenewed >= renewal) {
try {
renew();
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Shutting down HDFS client...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Aborting ...", ie);
abort();
break;
} catch (IOException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Will retry shortly...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Will retry shortly ...", ie);
}
}
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " is interrupted.", ie);
+ synchronized(this) {
+ if (id != currentId || isRenewerExpired()) {
+ //no longer the current daemon or expired
+ return;
}
- return;
}
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8737c4b..25616a6 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -40,6 +40,10 @@
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+ public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index efc460e..53add30 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -31,8 +31,10 @@
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -96,9 +98,6 @@
* starts sending packets from the dataQueue.
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
- /**
- *
- */
private final DFSClient dfsClient;
private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -295,10 +294,18 @@
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
+ /** Nodes that were used in the pipeline before and have failed. */
+ private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+ /** Has the current block been hflushed? */
+ private boolean isHflushed = false;
+ /** Append on an existing block? */
+ private final boolean isAppend;
+
/**
* Default construction for file create
*/
private DataStreamer() {
+ isAppend = false;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@@ -311,6 +318,7 @@
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
+ isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
@@ -750,6 +758,105 @@
return doSleep;
}
+ private void setHflush() {
+ isHflushed = true;
+ }
+
+ private int findNewDatanode(final DatanodeInfo[] original
+ ) throws IOException {
+ if (nodes.length != original.length + 1) {
+ throw new IOException("Failed to add a datanode:"
+ + " nodes.length != original.length + 1, nodes="
+ + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+ }
+ for(int i = 0; i < nodes.length; i++) {
+ int j = 0;
+ for(; j < original.length && !nodes[i].equals(original[j]); j++);
+ if (j == original.length) {
+ return i;
+ }
+ }
+ throw new IOException("Failed: new datanode not found: nodes="
+ + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+ }
+
+ private void addDatanode2ExistingPipeline() throws IOException {
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+ }
+ /*
+ * Is data transfer necessary? We have the following cases.
+ *
+ * Case 1: Failure in Pipeline Setup
+ * - Append
+ * + Transfer the stored replica, which may be an RBW or a finalized replica.
+ * - Create
+ * + If no data, then no transfer is required.
+ * + If data has been written, transfer RBW. This case may happen
+ * when there was a streaming failure earlier in this pipeline.
+ *
+ * Case 2: Failure in Streaming
+ * - Append/Create:
+ * + transfer RBW
+ *
+ * Case 3: Failure in Close
+ * - Append/Create:
+ * + no transfer; let the NameNode replicate the block.
+ */
+ if (!isAppend && lastAckedSeqno < 0
+ && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+ //no data have been written
+ return;
+ } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+ || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ //pipeline is closing
+ return;
+ }
+
+ //get a new datanode
+ final DatanodeInfo[] original = nodes;
+ final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+ src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+ 1, dfsClient.clientName);
+ nodes = lb.getLocations();
+
+ //find the new datanode
+ final int d = findNewDatanode(original);
+
+ //transfer replica
+ final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+ final DatanodeInfo[] targets = {nodes[d]};
+ transfer(src, targets, lb.getBlockToken());
+ }
+
+ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ //transfer replica to the new datanode
+ Socket sock = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ sock = createSocketForPipeline(src, 2, dfsClient);
+ final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+ out = new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock, writeTimeout),
+ DataNode.SMALL_BUFFER_SIZE));
+
+ //send the TRANSFER_BLOCK request
+ DataTransferProtocol.Sender.opTransferBlock(out, block,
+ dfsClient.clientName, targets, blockToken);
+
+ //ack
+ in = new DataInputStream(NetUtils.getInputStream(sock));
+ if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+ throw new IOException("Failed to add a datanode");
+ }
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ IOUtils.closeSocket(sock);
+ }
+ }
/**
* Open a DataOutputStream to a DataNode pipeline so that
@@ -793,6 +900,8 @@
DFSClient.LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
+ failed.add(nodes[errorIndex]);
+
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -803,6 +912,12 @@
errorIndex = -1;
}
+ // Check if replace-datanode policy is satisfied.
+ if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+ nodes, isAppend, isHflushed)) {
+ addDatanode2ExistingPipeline();
+ }
+
// get a new generation stamp and an access token
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
@@ -888,7 +1003,7 @@
boolean result = false;
try {
- s = createSocketForPipeline(nodes, dfsClient);
+ s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
//
@@ -1026,18 +1141,19 @@
/**
* Create a socket for a write pipeline
- * @param datanodes the datanodes on the pipeline
+ * @param first the first datanode
+ * @param length the pipeline length
* @param client
* @return the socket connected to the first datanode
*/
- static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
- final DFSClient client) throws IOException {
+ static Socket createSocketForPipeline(final DatanodeInfo first,
+ final int length, final DFSClient client) throws IOException {
if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
+ DFSClient.LOG.debug("Connecting to datanode " + first.getName());
}
- final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
+ final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
final Socket sock = client.socketFactory.createSocket();
- final int timeout = client.getDatanodeReadTimeout(datanodes.length);
+ final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, timeout);
sock.setSoTimeout(timeout);
sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
@@ -1363,6 +1479,12 @@
throw ioe;
}
}
+
+ synchronized(this) {
+ if (streamer != null) {
+ streamer.setHflush();
+ }
+ }
} catch (InterruptedIOException interrupt) {
// This kind of error doesn't mean that the stream itself is broken - just the
// flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1577,7 +1699,7 @@
/**
* Returns the access token currently used by streamer, for testing only
*/
- Token<BlockTokenIdentifier> getBlockToken() {
+ synchronized Token<BlockTokenIdentifier> getBlockToken() {
return streamer.getBlockToken();
}
diff --git a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a4cb9e3..3e7de73 100644
--- a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -24,6 +24,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -242,9 +243,9 @@
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream(dfs.create(getPathName(f), permission,
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, bufferSize),
- statistics);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ bufferSize), statistics);
}
@SuppressWarnings("deprecation")
@@ -266,6 +267,9 @@
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
+ if (flag.contains(CreateFlag.OVERWRITE)) {
+ flag.add(CreateFlag.CREATE);
+ }
return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
false, replication, blockSize, progress, bufferSize), statistics);
}
@@ -810,6 +814,14 @@
throws IOException {
return dfs.getDelegationToken(renewer);
}
+
+ @Override // FileSystem
+ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+ List<Token<?>> tokenList = new ArrayList<Token<?>>();
+ Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
+ tokenList.add(token);
+ return tokenList;
+ }
/**
* Renew an existing delegation token.
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 22e8e04..9db6fa4 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -67,9 +67,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 66: Add block pool ID to Block
+ * 67: Add block pool ID to Block
*/
- public static final long versionID = 66L;
+ public static final long versionID = 67L;
///////////////////////////////////////
// File contents
@@ -298,6 +298,30 @@
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
+ /**
+ * Get a datanode for an existing pipeline.
+ *
+ * @param src the file being written
+ * @param blk the block being written
+ * @param existings the existing nodes in the pipeline
+ * @param excludes the excluded nodes
+ * @param numAdditionalNodes number of additional datanodes
+ * @param clientName the name of the client
+ *
+ * @return the located block.
+ *
+ * @throws AccessControlException If access is denied
+ * @throws FileNotFoundException If file <code>src</code> is not found
+ * @throws SafeModeException create not allowed in safemode
+ * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+ * @throws IOException If an I/O error occurred
+ */
+ public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+ final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws AccessControlException, FileNotFoundException,
+ SafeModeException, UnresolvedLinkException, IOException;
+
/**
* The client is done writing data to the given filename, and would
* like to complete it.
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
index a225982..2a4895b 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
@@ -25,8 +25,13 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -39,18 +44,18 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface DataTransferProtocol {
-
+ public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
/** Version for data transfers between clients and datanodes
* This should change when serialization of DatanodeInfo, not just
* when protocol changes. It is not very obvious.
*/
/*
- * Version 22:
+ * Version 23:
* Changed the protocol methods to use ExtendedBlock instead
* of Block.
*/
- public static final int DATA_TRANSFER_VERSION = 21;
+ public static final int DATA_TRANSFER_VERSION = 23;
/** Operation */
public enum Op {
@@ -750,4 +755,93 @@
}
}
+ /**
+  * The setting of the replace-datanode-on-failure feature, i.e. whether a
+  * new datanode is added when an existing datanode is removed.
+  * The setting is kept in a {@link Configuration} as an enable flag plus a
+  * policy name; see {@link #get(Configuration)} and
+  * {@link #write(Configuration)}.
+  */
+ public enum ReplaceDatanodeOnFailure {
+   /** The feature is disabled in the entire site. */
+   DISABLE,
+   /** Never add a new datanode. */
+   NEVER,
+   /**
+    * DEFAULT policy:
+    *   Let r be the replication number.
+    *   Let n be the number of existing datanodes.
+    *   Add a new datanode only if r >= 3 and either
+    *   (1) floor(r/2) >= n; or
+    *   (2) r > n and the block is hflushed/appended.
+    */
+   DEFAULT,
+   /** Always add a new datanode when an existing datanode is removed. */
+   ALWAYS;
+
+   /**
+    * Check if the feature is enabled.
+    *
+    * @throws UnsupportedOperationException if this setting is {@link #DISABLE}
+    */
+   public void checkEnabled() {
+     if (this == DISABLE) {
+       throw new UnsupportedOperationException(
+           "This feature is disabled. Please refer to "
+           + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+           + " configuration property.");
+     }
+   }
+
+   /**
+    * Is the policy satisfied, i.e. should a new datanode be added?
+    *
+    * @param replication the replication number (r)
+    * @param existings the existing datanodes; null is treated as empty
+    * @param isAppend is the block being appended?
+    * @param isHflushed has the block been hflushed?
+    * @return true if a new datanode should be added under this setting
+    */
+   public boolean satisfy(
+       final short replication, final DatanodeInfo[] existings,
+       final boolean isAppend, final boolean isHflushed) {
+     final int n = existings == null ? 0 : existings.length;
+     if (n == 0 || n >= replication) {
+       //don't need to add datanode for any policy.
+       return false;
+     }
+     if (this == ALWAYS) {
+       return true;
+     }
+     if (this != DEFAULT) {
+       // DISABLE and NEVER
+       return false;
+     }
+     //DEFAULT
+     if (replication < 3) {
+       return false;
+     }
+     // here r > n is already known; (1) floor(r/2) >= n, or (2) hflushed/appended
+     return n <= (replication / 2) || isAppend || isHflushed;
+   }
+
+   /**
+    * Get the setting from configuration.
+    *
+    * @param conf the configuration to read
+    * @return {@link #DISABLE} if the enable key is false; otherwise the
+    *         setting whose name matches the policy key, ignoring case
+    * @throws HadoopIllegalArgumentException if the policy value does not
+    *         name one of NEVER, DEFAULT or ALWAYS
+    */
+   public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+     final boolean enabled = conf.getBoolean(
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+     if (!enabled) {
+       return DISABLE;
+     }
+
+     final String policy = conf.get(
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+     for (final ReplaceDatanodeOnFailure rdof : values()) {
+       // DISABLE is selected only through the enable key, never by policy
+       // name; exclude it explicitly instead of relying on declaration order.
+       if (rdof != DISABLE && rdof.name().equalsIgnoreCase(policy)) {
+         return rdof;
+       }
+     }
+     throw new HadoopIllegalArgumentException("Illegal configuration value for "
+         + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+         + ": " + policy);
+   }
+
+   /**
+    * Write the setting to configuration: the enable key is set to false
+    * only for {@link #DISABLE}, and the policy key is set to this
+    * setting's name.
+    *
+    * @param conf the configuration to update
+    */
+   public void write(final Configuration conf) {
+     conf.setBoolean(
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+         this != DISABLE);
+     conf.set(
+         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+         name());
+   }
+ }
}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java b/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
index 69f7117..72d8157 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
@@ -88,7 +88,7 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -29;
+ public static final int LAYOUT_VERSION = -31;
// Current version:
- // -29: Adding support for block pools and multiple namenodes
+ // -31: Adding support for block pools and multiple namenodes
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/Storage.java b/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 816a266..b02e083 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -79,7 +79,7 @@
public static final int PRE_RBW_LAYOUT_VERSION = -19;
// last layout version that is before federation
- public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -28;
+ public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 5fba5ec..9f253e1 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -56,7 +57,7 @@
* may copies it to another site. If a throttler is provided,
* streaming throttling is also supported.
**/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
@@ -652,7 +653,7 @@
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
- int numTargets) throws IOException {
+ DatanodeInfo[] downstreams) throws IOException {
boolean responderClosed = false;
mirrorOut = mirrOut;
@@ -662,9 +663,8 @@
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
- new PacketResponder(this, block, mirrIn, replyOut,
- numTargets, Thread.currentThread()));
- responder.start(); // start thread to processes reponses
+ new PacketResponder(replyOut, mirrIn, downstreams));
+ responder.start(); // start thread to processes responses
}
/*
@@ -700,8 +700,7 @@
}
} catch (IOException ioe) {
- LOG.info("Exception in receiveBlock for block " + block +
- " " + ioe);
+ LOG.info("Exception in receiveBlock for " + block, ioe);
throw ioe;
} finally {
if (!responderClosed) { // Abnormal termination of the flow above
@@ -808,51 +807,71 @@
}
}
+ /**
+ * The kind of a PacketResponder, derived from the downstream datanode
+ * array given to its constructor: NON_PIPELINE when the array is null,
+ * LAST_IN_PIPELINE when it is empty, and HAS_DOWNSTREAM_IN_PIPELINE
+ * otherwise.
+ */
+ private static enum PacketResponderType {
+ NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+ }
/**
* Processed responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
- class PacketResponder implements Runnable, FSConstants {
+ class PacketResponder implements Runnable, Closeable, FSConstants {
- //packet waiting for ack
- private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+ /** queue for packets waiting for ack */
+ private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+ /** the thread that spawns this responder */
+ private final Thread receiverThread = Thread.currentThread();
+ /** is this responder running? */
private volatile boolean running = true;
- private ExtendedBlock block;
- DataInputStream mirrorIn; // input from downstream datanode
- DataOutputStream replyOut; // output to upstream datanode
- private int numTargets; // number of downstream datanodes including myself
- private BlockReceiver receiver; // The owner of this responder.
- private Thread receiverThread; // the thread that spawns this responder
+ /** input from the next downstream datanode */
+ private final DataInputStream downstreamIn;
+ /** output to upstream datanode/client */
+ private final DataOutputStream upstreamOut;
+
+ /** The type of this responder */
+ private final PacketResponderType type;
+ /** for log and error messages */
+ private final String myString;
+
+ @Override
public String toString() {
- return "PacketResponder " + numTargets + " for Block " + this.block;
+ return myString;
}
- PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in,
- DataOutputStream out, int numTargets,
- Thread receiverThread) {
- this.receiverThread = receiverThread;
- this.receiver = receiver;
- this.block = b;
- mirrorIn = in;
- replyOut = out;
- this.numTargets = numTargets;
+ /**
+  * Create a responder, determine its {@link PacketResponderType} and
+  * build the string used in its log and error messages.
+  *
+  * @param upstreamOut output to upstream datanode/client
+  * @param downstreamIn input from the next downstream datanode
+  * @param downstreams the downstream datanodes; null means NON_PIPELINE
+  *        and an empty array means LAST_IN_PIPELINE
+  */
+ PacketResponder(final DataOutputStream upstreamOut,
+     final DataInputStream downstreamIn,
+     final DatanodeInfo[] downstreams) {
+   this.downstreamIn = downstreamIn;
+   this.upstreamOut = upstreamOut;
+
+   this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+       : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+           : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+   final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+       .append(": ").append(block).append(", type=").append(type);
+   // downstreams is null exactly when type is NON_PIPELINE, so it may only
+   // be dereferenced for the two in-pipeline types.
+   if (type != PacketResponderType.NON_PIPELINE) {
+     b.append(", downstreams=").append(downstreams.length)
+         .append(":").append(Arrays.asList(downstreams));
+   }
+   this.myString = b.toString();
+ }
/**
* enqueue the seqno that is still be to acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
- * @param lastByteInPacket
+ * @param offsetInBlock
*/
- synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+ synchronized void enqueue(final long seqno,
+ final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
+ final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
if(LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
- " to ack queue.");
+ LOG.debug(myString + ": enqueue " + p);
}
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+ ackQueue.addLast(p);
notifyAll();
}
}
@@ -860,7 +879,8 @@
/**
* wait for all pending packets to be acked. Then shutdown thread.
*/
- synchronized void close() {
+ @Override
+ public synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
try {
wait();
@@ -869,8 +889,7 @@
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block + " Closing down.");
+ LOG.debug(myString + ": closing");
}
running = false;
notifyAll();
@@ -892,21 +911,21 @@
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
try {
- if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+ if (type != PacketResponderType.LAST_IN_PIPELINE
+ && !mirrorError) {
// read an ack from downstream datanode
- ack.readFields(mirrorIn);
+ ack.readFields(downstreamIn);
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ LOG.debug(myString + " got " + ack);
}
seqno = ack.getSeqno();
}
- if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+ if (seqno != PipelineAck.UNKOWN_SEQNO
+ || type == PacketResponderType.LAST_IN_PIPELINE) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " seqno = " + seqno +
- " for block " + block +
+ LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
@@ -916,11 +935,10 @@
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- if (numTargets > 0 && seqno != expected) {
- throw new IOException("PacketResponder " + numTargets +
- " for block " + block +
- " expected seqno:" + expected +
- " received:" + seqno);
+ if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+ && seqno != expected) {
+ throw new IOException(myString + "seqno: expected="
+ + expected + ", received=" + seqno);
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
@@ -935,8 +953,7 @@
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(ioe));
+ LOG.info(myString, ioe);
}
}
@@ -948,8 +965,7 @@
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
- LOG.info("PacketResponder " + block + " " + numTargets +
- " : Thread is interrupted.");
+ LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
@@ -957,7 +973,7 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
- receiver.close();
+ BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
@@ -967,13 +983,12 @@
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientname, offset,
+ inAddr, myAddr, block.getNumBytes(),
+ "HDFS_WRITE", clientname, offset,
dnR.getStorageID(), block, endTime-startTime));
} else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
+ LOG.info("Received block " + block + " of size "
+ + block.getNumBytes() + " from " + inAddr);
}
}
@@ -984,7 +999,8 @@
replies[0] = SUCCESS;
replies[1] = ERROR;
} else {
- short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+ : ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = SUCCESS;
for (int i=0; i<ackLen; i++) {
@@ -994,20 +1010,18 @@
PipelineAck replyAck = new PipelineAck(expected, replies);
// send my ack back to upstream datanode
- replyAck.write(replyOut);
- replyOut.flush();
+ replyAck.write(upstreamOut);
+ upstreamOut.flush();
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded an ack: " + replyAck);
+ LOG.debug(myString + ", replyAck=" + replyAck);
}
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
if (replyAck.isSuccess() &&
- pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.offsetInBlock);
}
}
} catch (IOException e) {
@@ -1018,8 +1032,7 @@
} catch (IOException ioe) {
LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
}
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
+ LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
@@ -1027,15 +1040,13 @@
}
} catch (Throwable e) {
if (running) {
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
+ LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
+ LOG.info(myString + " terminating");
}
/**
@@ -1052,15 +1063,23 @@
/**
* This information is cached by the Datanode in the ackQueue.
*/
- static private class Packet {
- long seqno;
- boolean lastPacketInBlock;
- long lastByteInBlock;
+ private static class Packet {
+ final long seqno;
+ final boolean lastPacketInBlock;
+ final long offsetInBlock;
- Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+ Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
- this.lastByteInBlock = lastByteInPacket;
+ this.offsetInBlock = offsetInBlock;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(seqno=" + seqno
+ + ", lastPacketInBlock=" + lastPacketInBlock
+ + ", offsetInBlock=" + offsetInBlock
+ + ")";
}
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 937e631..a1d7ccf 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1907,8 +1907,9 @@
*/
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
final String clientname) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getClass().getSimpleName() + ": " + b
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ + b + " (numBytes=" + b.getNumBytes() + ")"
+ ", stage=" + stage
+ ", clientname=" + clientname
+ ", targests=" + Arrays.asList(targets));
@@ -2573,12 +2574,9 @@
* the stored GS and the visible length.
* @param targets
* @param client
- * @return whether the replica is an RBW
*/
- boolean transferReplicaForPipelineRecovery(final ExtendedBlock b,
+ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final String client) throws IOException {
- checkWriteAccess(b);
-
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2590,7 +2588,8 @@
} else if (data.isValidBlock(b)) {
stage = BlockConstructionStage.TRANSFER_FINALIZED;
} else {
- throw new IOException(b + " is not a RBW or a Finalized");
+ final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+ throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
}
storedGS = data.getStoredBlock(b.getBlockPoolId(),
@@ -2609,7 +2608,6 @@
if (targets.length > 0) {
new DataTransfer(targets, b, stage, client).run();
}
- return stage == BlockConstructionStage.TRANSFER_RBW;
}
// Determine a Datanode's streaming address
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a6f4d34..ed94f50 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -154,7 +154,7 @@
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
- checkAccess(out, block, blockToken,
+ checkAccess(out, true, block, blockToken,
DataTransferProtocol.Op.READ_BLOCK,
BlockTokenSecretManager.AccessMode.READ);
@@ -258,7 +258,7 @@
new BufferedOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
SMALL_BUFFER_SIZE));
- checkAccess(isClient? replyOut: null, block, blockToken,
+ checkAccess(replyOut, isClient, block, blockToken,
DataTransferProtocol.Op.WRITE_BLOCK,
BlockTokenSecretManager.AccessMode.WRITE);
@@ -365,7 +365,7 @@
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ mirrorAddr, null, targets);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -419,13 +419,14 @@
final ExtendedBlock blk, final String client,
final DatanodeInfo[] targets,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
- final DataOutputStream out = new DataOutputStream(
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- checkAccess(out, blk, blockToken,
+ checkAccess(null, true, blk, blockToken,
DataTransferProtocol.Op.TRANSFER_BLOCK,
BlockTokenSecretManager.AccessMode.COPY);
updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+ final DataOutputStream out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, client);
SUCCESS.write(out);
@@ -442,7 +443,7 @@
Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- checkAccess(out, block, blockToken,
+ checkAccess(out, true, block, blockToken,
DataTransferProtocol.Op.BLOCK_CHECKSUM,
BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
@@ -634,7 +635,7 @@
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, -1);
+ dataXceiverServer.balanceThrottler, null);
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -699,7 +700,7 @@
}
}
- private void checkAccess(final DataOutputStream out,
+ private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final DataTransferProtocol.Op op,
@@ -709,7 +710,11 @@
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
} catch(InvalidToken e) {
try {
- if (out != null) {
+ if (reply) {
+ if (out == null) {
+ out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ }
ERROR_ACCESS_TOKEN.write(out);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR =
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 03f38e3..200ac7c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -2369,6 +2369,12 @@
return volumeMap.get(bpid, blockId);
}
+ /**
+  * Look up the replica of the given block in the volume map and return
+  * its string form, or the string "null" when there is no such replica.
+  */
+ @Override
+ public synchronized String getReplicaString(String bpid, long blockId) {
+   final Replica replica = volumeMap.get(bpid, blockId);
+   if (replica == null) {
+     return "null";
+   }
+   return replica.toString();
+ }
+
@Override // FSDatasetInterface
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
RecoveringBlock rBlock) throws IOException {
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
index 449d153..867eebe 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
@@ -107,6 +107,11 @@
public Replica getReplica(String bpid, long blockId);
/**
+ * Get a string describing the replica of the given block, suitable for
+ * log and error messages.
+ * @param bpid block pool id
+ * @param blockId block id
+ * @return replica meta information
+ */
+ public String getReplicaString(String bpid, long blockId);
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(String bpid, long blkid)
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
index dba232f..40ae574 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -226,8 +229,15 @@
// update NameSpace in memory
backupInputStream.setBytes(data);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- logLoader.loadEditRecords(storage.getLayoutVersion(),
- backupInputStream.getDataInputStream(), true);
+ int logVersion = storage.getLayoutVersion();
+ BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+ DataInputStream in = new DataInputStream(bin);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
+ }
+ logLoader.loadEditRecords(logVersion, in, checksum, true);
getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
break;
case INPROGRESS:
@@ -346,14 +356,21 @@
if(jSpoolFile.exists()) {
// load edits.new
EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
- DataInputStream in = edits.getDataInputStream();
+ BufferedInputStream bin = new BufferedInputStream(edits);
+ DataInputStream in = new DataInputStream(bin);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- numEdits += logLoader.loadFSEdits(in, false);
+ int logVersion = logLoader.readLogVersion(in);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
+ }
+ numEdits += logLoader.loadEditRecords(logVersion, in, checksum, false);
// first time reached the end of spool
jsState = JSpoolState.WAIT;
- numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
- in, true);
+ numEdits += logLoader.loadEditRecords(logVersion,
+ in, checksum, true);
getFSNamesystem().dir.updateCountForINodeWithQuota();
edits.close();
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
index 292598b..38b0866 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
@@ -24,6 +24,7 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -84,10 +85,18 @@
/** {@inheritDoc} */
@Override
void write(byte op, Writable... writables) throws IOException {
+ // remember where this transaction starts in the buffer
+ int start = bufCurrent.getLength();
write(op);
for (Writable w : writables) {
w.write(bufCurrent);
}
+ // write transaction checksum, computed over the bytes added to
+ // bufCurrent since start, i.e. the buffer range [start, end)
+ int end = bufCurrent.getLength();
+ Checksum checksum = FSEditLog.getChecksum();
+ // the checksum object is thread local and reused, so clear old state
+ checksum.reset();
+ checksum.update(bufCurrent.getData(), start, end-start);
+ // the checksum is stored as an int right after the transaction
+ int sum = (int)checksum.getValue();
+ bufCurrent.writeInt(sum);
}
/**
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 889e1fd..a0c6e3e 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1285,9 +1285,10 @@
* Check whether the path specifies a directory
*/
boolean isDir(String src) throws UnresolvedLinkException {
+ src = normalizePath(src);
readLock();
try {
- INode node = rootDir.getNode(normalizePath(src), false);
+ INode node = rootDir.getNode(src, false);
return node != null && node.isDirectory();
} finally {
readUnlock();
@@ -1385,6 +1386,12 @@
/** Return the name of the path represented by inodes at [0, pos] */
private static String getFullPathName(INode[] inodes, int pos) {
StringBuilder fullPathName = new StringBuilder();
+ if (inodes[0].isRoot()) {
+ if (pos == 0) return Path.SEPARATOR;
+ } else {
+ fullPathName.append(inodes[0].getLocalName());
+ }
+
for (int i=1; i<=pos; i++) {
fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
}
@@ -2018,7 +2025,7 @@
return null;
}
}
- final String userName = UserGroupInformation.getCurrentUser().getUserName();
+ final String userName = dirPerms.getUserName();
INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
new PermissionStatus(userName, null, FsPermission.getDefault()));
if (newNode == null) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 9ec33eb..b8e7930 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
@@ -90,6 +92,18 @@
private NNStorage storage;
+ /** One checksum object per thread, created lazily on first use. */
+ private static final ThreadLocal<Checksum> localChecksum =
+     new ThreadLocal<Checksum>() {
+       @Override
+       protected Checksum initialValue() {
+         return new PureJavaCrc32();
+       }
+     };
+
+ /**
+  * Get a thread local checksum.
+  * The same object is returned on every call from a given thread, so it
+  * may still hold state from a previous use; callers reset it before
+  * computing a new value.
+  *
+  * @return the calling thread's checksum object
+  */
+ public static Checksum getChecksum() {
+   return localChecksum.get();
+ }
+
private static class TransactionId {
public long txid;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index c442920..2d70237 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -54,42 +58,62 @@
* along.
*/
int loadFSEdits(EditLogInputStream edits) throws IOException {
- DataInputStream in = edits.getDataInputStream();
long startTime = now();
- int numEdits = loadFSEdits(in, true);
+ int numEdits = loadFSEdits(edits, true);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
return numEdits;
}
- int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
+ /**
+ * Read the header of fsedit log, i.e. the log version stored as an int
+ * at the current position of the stream. The stream is marked and reset
+ * while probing for the header. The header could be missing: if the
+ * stream is already at EOF, the version is left as 0.
+ * @param in fsedit stream
+ * @return the edit log version number
+ * @throws IOException if error occurs, or if the log version is less
+ * than FSConstants.LAYOUT_VERSION (a future version)
+ */
+ int readLogVersion(DataInputStream in) throws IOException {
+ int logVersion = 0;
+ // Read log file version. Could be missing.
+ in.mark(4);
+ // If edits log is greater than 2G, available method will return negative
+ // numbers, so we avoid having to call available
+ boolean available = true;
+ try {
+ // probe a single byte only to find out whether the stream is empty
+ logVersion = in.readByte();
+ } catch (EOFException e) {
+ available = false;
+ }
+ if (available) {
+ // rewind to the mark and read the whole 4-byte version
+ in.reset();
+ logVersion = in.readInt();
+ if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion + ". Current version = "
+ + FSConstants.LAYOUT_VERSION + ".");
+ }
+ // NOTE(review): for an empty stream logVersion is still 0 here; confirm
+ // that 0 satisfies this assertion when assertions are enabled
+ assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+ "Unsupported version " + logVersion;
+ return logVersion;
+ }
+
+ int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
+ BufferedInputStream bin = new BufferedInputStream(edits);
+ DataInputStream in = new DataInputStream(bin);
+
int numEdits = 0;
int logVersion = 0;
try {
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
- try {
- logVersion = in.readByte();
- } catch (EOFException e) {
- available = false;
+ logVersion = readLogVersion(in);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
}
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
- }
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
- numEdits = loadEditRecords(logVersion, in, false);
+
+ numEdits = loadEditRecords(logVersion, in, checksum, false);
} finally {
if(closeOnExit)
in.close();
@@ -101,7 +125,7 @@
@SuppressWarnings("deprecation")
int loadEditRecords(int logVersion, DataInputStream in,
- boolean closeOnExit) throws IOException {
+ Checksum checksum, boolean closeOnExit) throws IOException {
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
String clientName = null;
@@ -123,6 +147,9 @@
long blockSize = 0;
FSEditLogOpCodes opCode;
try {
+ if (checksum != null) {
+ checksum.reset();
+ }
in.mark(1);
byte opCodeByte = in.readByte();
opCode = FSEditLogOpCodes.fromByte(opCodeByte);
@@ -480,6 +507,7 @@
throw new IOException("Never seen opCode " + opCode);
}
}
+ validateChecksum(in, checksum, numEdits);
}
} finally {
if(closeOnExit)
@@ -505,6 +533,22 @@
return numEdits;
}
+ /**
+ * Validate a transaction's checksum
+ */
+ private static void validateChecksum(
+ DataInputStream in, Checksum checksum, int tid)
+ throws IOException {
+ if (checksum != null) {
+ int calculatedChecksum = (int)checksum.getValue();
+ int readChecksum = in.readInt(); // read in checksum
+ if (readChecksum != calculatedChecksum) {
+ throw new ChecksumException(
+ "Transaction " + tid + " is corrupt. Calculated checksum is " +
+ calculatedChecksum + " but read checksum " + readChecksum, tid);
+ }
+ }
+ }
/**
* A class to read in blocks stored in the old format. The only two
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index ddb64f9..5e38f2b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -30,6 +30,7 @@
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -179,7 +180,11 @@
// load all inodes
LOG.info("Number of files = " + numFiles);
- loadFullNameINodes(numFiles, in);
+ if (imgVersion <= -30) {
+ loadLocalNameINodes(numFiles, in);
+ } else {
+ loadFullNameINodes(numFiles, in);
+ }
// load datanode info
this.loadDatanodes(in);
@@ -215,6 +220,64 @@
fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());
}
+ /**
+ * load fsimage files assuming only local names are stored
+ *
+ * @param numFiles number of files expected to be read
+ * @param in image input stream
+ * @throws IOException
+ */
+ private void loadLocalNameINodes(long numFiles, DataInputStream in)
+ throws IOException {
+ assert imgVersion <= -30; // -30: store only local name in image
+ assert numFiles > 0;
+
+ // load root
+ if( in.readShort() != 0) {
+ throw new IOException("First node is not root");
+ }
+ INode root = loadINode(in);
+ // update the root's attributes
+ updateRootAttr(root);
+ numFiles--;
+
+ // load rest of the nodes directory by directory
+ while (numFiles > 0) {
+ numFiles -= loadDirectory(in);
+ }
+ if (numFiles != 0) {
+ throw new IOException("Read unexpect number of files: " + -numFiles);
+ }
+ }
+
+ /**
+ * Load all children of a directory
+ *
+ * @param in
+ * @return number of child inodes read
+ * @throws IOException
+ */
+ private int loadDirectory(DataInputStream in) throws IOException {
+ String parentPath = FSImageSerialization.readString(in);
+ FSDirectory fsDir = namesystem.dir;
+ INode parent = fsDir.rootDir.getNode(parentPath, true);
+ if (parent == null || !parent.isDirectory()) {
+ throw new IOException("Path " + parentPath + "is not a directory.");
+ }
+
+ int numChildren = in.readInt();
+ for(int i=0; i<numChildren; i++) {
+ // load single inode
+ byte[] localName = new byte[in.readShort()];
+ in.readFully(localName); // read local name
+ INode newNode = loadINode(in); // read rest of inode
+
+ // add to parent
+ namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+ }
+ return numChildren;
+ }
+
/**
* load fsimage files assuming full path names are stored
*
@@ -485,9 +548,10 @@
byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
// save the root
- FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+ FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
// save the rest of the nodes
- saveImage(strbuf, 0, fsDir.rootDir, out);
+ saveImage(strbuf, fsDir.rootDir, out);
+ // save files under construction
sourceNamesystem.saveFilesUnderConstruction(out);
sourceNamesystem.saveSecretManagerState(out);
strbuf = null;
@@ -511,28 +575,33 @@
* This is a recursive procedure, which first saves all children of
* a current directory and then moves inside the sub-directories.
*/
- private static void saveImage(ByteBuffer parentPrefix,
- int prefixLength,
+ private static void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
- int newPrefixLength = prefixLength;
- if (current.getChildrenRaw() == null)
+ List<INode> children = current.getChildrenRaw();
+ if (children == null || children.isEmpty())
return;
- for(INode child : current.getChildren()) {
- // print all children first
- parentPrefix.position(prefixLength);
- parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
- FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+ // print prefix (parent directory name)
+ int prefixLen = currentDirName.position();
+ if (prefixLen == 0) { // root
+ out.writeShort(PATH_SEPARATOR.length);
+ out.write(PATH_SEPARATOR);
+ } else { // non-root directories
+ out.writeShort(prefixLen);
+ out.write(currentDirName.array(), 0, prefixLen);
}
- for(INode child : current.getChildren()) {
+ out.writeInt(children.size());
+ for(INode child : children) {
+ // print all children first
+ FSImageSerialization.saveINode2Image(child, out);
+ }
+ for(INode child : children) {
if(!child.isDirectory())
continue;
- parentPrefix.position(prefixLength);
- parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
- newPrefixLength = parentPrefix.position();
- saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+ currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+ saveImage(currentDirName, (INodeDirectory)child, out);
+ currentDirName.position(prefixLen);
}
- parentPrefix.position(prefixLength);
}
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 14b313e..d993ae6 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -145,12 +145,11 @@
/*
* Save one inode's attributes to the image.
*/
- static void saveINode2Image(ByteBuffer name,
- INode node,
+ static void saveINode2Image(INode node,
DataOutputStream out) throws IOException {
- int nameLen = name.position();
- out.writeShort(nameLen);
- out.write(name.array(), name.arrayOffset(), nameLen);
+ byte[] name = node.getLocalNameBytes();
+ out.writeShort(name.length);
+ out.write(name);
FsPermission filePerm = TL_DATA.get().FILE_PERM;
if (node.isDirectory()) {
out.writeShort(0); // replication
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 983e937..bac6fcc 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -259,6 +259,8 @@
private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
+ private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
+ DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -529,6 +531,8 @@
+ " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
+
+ this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
}
/**
@@ -1329,22 +1333,16 @@
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
- writeLock();
- try {
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean append = flag.contains(CreateFlag.APPEND);
- boolean create = flag.contains(CreateFlag.CREATE);
-
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
- + ", overwrite=" + overwrite
- + ", append=" + append);
+ + ", createFlag=" + flag.toString());
}
-
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot create file" + src, safeMode);
if (!DFSUtil.isValidName(src)) {
@@ -1354,14 +1352,16 @@
// Verify that the destination does not exist as a directory already.
boolean pathExists = dir.exists(src);
if (pathExists && dir.isDir(src)) {
- throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+ throw new FileAlreadyExistsException("Cannot create file " + src
+ + "; already exists as a directory.");
}
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean append = flag.contains(CreateFlag.APPEND);
if (isPermissionEnabled) {
if (append || (overwrite && pathExists)) {
checkPathAccess(src, FsAction.WRITE);
- }
- else {
+ } else {
checkAncestorAccess(src, FsAction.WRITE);
}
}
@@ -1434,34 +1434,27 @@
} catch(IOException e) {
throw new IOException("failed to create "+e.getMessage());
}
- if (append) {
- if (myFile == null) {
- if(!create)
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " on client " + clientMachine);
- else {
- //append & create a nonexist file equals to overwrite
- return startFileInternal(src, permissions, holder, clientMachine,
- EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
- }
- } else if (myFile.isDirectory()) {
- throw new IOException("failed to append to directory " + src
- +" on client " + clientMachine);
+ boolean create = flag.contains(CreateFlag.CREATE);
+ if (myFile == null) {
+ if (!create) {
+ throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+ + src + " on client " + clientMachine);
}
- } else if (!dir.isValidToCreate(src)) {
+ } else {
+ // File exists - must be one of append or overwrite
if (overwrite) {
delete(src, true);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
+ } else if (!append) {
+ throw new FileAlreadyExistsException("failed to create file " + src
+ + " on client " + clientMachine
+ + " because the file exists");
}
}
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- if (append) {
+ if (append && myFile != null) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
@@ -1663,6 +1656,53 @@
return b;
}
+ /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
+ LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+ final DatanodeInfo[] existings, final HashMap<Node, Node> excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws IOException {
+ //check if the feature is enabled
+ dtpReplaceDatanodeOnFailure.checkEnabled();
+
+ final DatanodeDescriptor clientnode;
+ final long preferredblocksize;
+ readLock();
+ try {
+ //check safe mode
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add datanode; src=" + src
+ + ", blk=" + blk, safeMode);
+ }
+
+ //check lease
+ final INodeFileUnderConstruction file = checkLease(src, clientName);
+ clientnode = file.getClientNode();
+ preferredblocksize = file.getPreferredBlockSize();
+ } finally {
+ readUnlock();
+ }
+
+ //find datanode descriptors
+ final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+ for(DatanodeInfo d : existings) {
+ final DatanodeDescriptor descriptor = getDatanode(d);
+ if (descriptor != null) {
+ chosen.add(descriptor);
+ }
+ }
+
+ // choose new datanodes.
+ final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+ src, numAdditionalNodes, clientnode, chosen, true,
+ excludes, preferredblocksize);
+ final LocatedBlock lb = new LocatedBlock(blk, targets);
+ if (isBlockTokenEnabled) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+ }
+ return lb;
+ }
+
/**
* The client would like to let go of the given block
*/
@@ -2691,7 +2731,6 @@
* Get registrationID for datanodes based on the namespaceID.
*
* @see #registerDatanode(DatanodeRegistration)
- * @see FSImage#newNamespaceID()
* @return registration ID
*/
public String getRegistrationID() {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index 3133734..2faa430 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -233,12 +233,8 @@
String getLocalParentDir() {
- INode p_node=getParent();
-
- if(p_node == null)
- return "/";
- else
- return p_node.getFullPathName();
+ INode inode = isRoot() ? this : getParent();
+ return (inode != null) ? inode.getFullPathName() : "";
}
/**
@@ -271,12 +267,7 @@
/** {@inheritDoc} */
public String toString() {
- String i_path=getFullPathName();
-
- if(i_path.length() == 0)
- i_path="/";
-
- return "\"" + i_path + "\":"
+ return "\"" + getFullPathName() + "\":"
+ getUserName() + ":" + getGroupName() + ":"
+ (isDirectory()? "d": "-") + getFsPermission();
}
@@ -470,7 +461,9 @@
long nsQuota,
long dsQuota,
long preferredBlockSize) {
- if (blocks == null) {
+ if (symlink.length() != 0) { // check if symbolic link
+ return new INodeSymlink(symlink, modificationTime, atime, permissions);
+ } else if (blocks == null) { //not sym link and blocks null? directory!
if (nsQuota >= 0 || dsQuota >= 0) {
return new INodeDirectoryWithQuota(
permissions, modificationTime, nsQuota, dsQuota);
@@ -478,10 +471,6 @@
// regular directory
return new INodeDirectory(permissions, modificationTime);
}
- // check if symbolic link
- if (symlink.length() != 0) {
- return new INodeSymlink(symlink, modificationTime, atime, permissions);
- }
// file
return new INodeFile(permissions, blocks, replication,
modificationTime, atime, preferredBlockSize);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 12f416c..63ed01a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -846,6 +845,33 @@
return locatedBlock;
}
+ @Override
+ public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+ final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getAdditionalDatanode: src=" + src
+ + ", blk=" + blk
+ + ", existings=" + Arrays.asList(existings)
+ + ", excludes=" + Arrays.asList(excludes)
+ + ", numAdditionalNodes=" + numAdditionalNodes
+ + ", clientName=" + clientName);
+ }
+
+ myMetrics.numGetAdditionalDatanodeOps.inc();
+
+ HashMap<Node, Node> excludeSet = null;
+ if (excludes != null) {
+ excludeSet = new HashMap<Node, Node>(excludes.length);
+ for (Node node : excludes) {
+ excludeSet.put(node, node);
+ }
+ }
+ return namesystem.getAdditionalDatanode(src, blk,
+ existings, excludeSet, numAdditionalNodes, clientName);
+ }
+
/**
* The client needs to give up on the block.
*/
@@ -1202,7 +1228,7 @@
}
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
namesystem.createSymlink(target, link,
- new PermissionStatus(ugi.getUserName(), null, dirPerms), createParent);
+ new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
}
/** @inheritDoc */
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 2b589d2..259615c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -72,6 +72,8 @@
new MetricsTimeVaryingInt("FileInfoOps", registry);
public MetricsTimeVaryingInt numAddBlockOps =
new MetricsTimeVaryingInt("AddBlockOps", registry);
+ public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+ = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
public MetricsTimeVaryingInt numcreateSymlinkOps =
new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
public MetricsTimeVaryingInt numgetLinkTargetOps =
diff --git a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index b7f7fb4..4c3d0e0 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -269,13 +269,21 @@
super(conf);
}
+ protected DistributedFileSystem getDFS() throws IOException {
+ FileSystem fs = getFS();
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IllegalArgumentException("FileSystem " + fs.getUri() +
+ " is not a distributed file system");
+ }
+ return (DistributedFileSystem)fs;
+ }
+
/**
* Gives a report on how the FileSystem is doing.
* @exception IOException if the filesystem does not exist.
*/
public void report() throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
FsStatus ds = dfs.getStatus();
long capacity = ds.getCapacity();
long used = ds.getUsed();
@@ -342,7 +350,6 @@
System.out.println();
}
}
- }
}
/**
@@ -353,10 +360,6 @@
* @exception IOException if the filesystem does not exist.
*/
public void setSafeMode(String[] argv, int idx) throws IOException {
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return;
- }
if (idx != argv.length - 1) {
printUsage("-safemode");
return;
@@ -377,7 +380,7 @@
printUsage("-safemode");
return;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
boolean inSafeMode = dfs.setSafeMode(action);
//
@@ -407,12 +410,7 @@
public int saveNamespace() throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.saveNamespace();
exitCode = 0;
@@ -428,17 +426,12 @@
public int restoreFaileStorage(String arg) throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
System.err.println("restoreFailedStorage valid args are true|false|check");
return exitCode;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
Boolean res = dfs.restoreFailedStorage(arg);
System.out.println("restoreFailedStorage is set to " + res);
exitCode = 0;
@@ -455,12 +448,7 @@
public int refreshNodes() throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.refreshNodes();
exitCode = 0;
@@ -636,18 +624,10 @@
* @exception IOException
*/
public int finalizeUpgrade() throws IOException {
- int exitCode = -1;
-
- if (!(fs instanceof DistributedFileSystem)) {
- System.out.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.finalizeUpgrade();
- exitCode = 0;
-
- return exitCode;
+
+ return 0;
}
/**
@@ -658,10 +638,7 @@
* @exception IOException
*/
public int upgradeProgress(String[] argv, int idx) throws IOException {
- if (!(fs instanceof DistributedFileSystem)) {
- System.out.println("FileSystem is " + fs.getUri());
- return -1;
- }
+
if (idx != argv.length - 1) {
printUsage("-upgradeProgress");
return -1;
@@ -679,7 +656,7 @@
return -1;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
String statusText = (status == null ?
"There are no upgrades in progress." :
@@ -698,7 +675,7 @@
*/
public int metaSave(String[] argv, int idx) throws IOException {
String pathname = argv[idx];
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.metaSave(pathname);
System.out.println("Created file " + pathname + " on server " +
dfs.getUri());
@@ -713,8 +690,7 @@
* @throws IOException If an error while getting datanode report
*/
public int printTopology() throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ DistributedFileSystem dfs = getDFS();
DFSClient client = dfs.getClient();
DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
@@ -749,7 +725,6 @@
System.out.println();
}
- }
return 0;
}
@@ -1052,13 +1027,13 @@
} else if ("-metasave".equals(cmd)) {
exitCode = metaSave(argv, i);
} else if (ClearQuotaCommand.matches(cmd)) {
- exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+ exitCode = new ClearQuotaCommand(argv, i, getDFS()).runAll();
} else if (SetQuotaCommand.matches(cmd)) {
- exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+ exitCode = new SetQuotaCommand(argv, i, getDFS()).runAll();
} else if (ClearSpaceQuotaCommand.matches(cmd)) {
- exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+ exitCode = new ClearSpaceQuotaCommand(argv, i, getDFS()).runAll();
} else if (SetSpaceQuotaCommand.matches(cmd)) {
- exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+ exitCode = new SetSpaceQuotaCommand(argv, i, getDFS()).runAll();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcl();
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
index 3003649..80e2927 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
@@ -80,5 +80,6 @@
KEY_ID,
KEY_EXPIRY_DATE,
KEY_LENGTH,
- KEY_BLOB
+ KEY_BLOB,
+ CHECKSUM
}
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
index bef2532..b98e68d 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
@@ -17,26 +17,15 @@
*/
package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.io.EOFException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ShortToken;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.IntToken;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.LongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VLongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringUTF8Token;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringTextToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BlobToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BytesWritableToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.EmptyToken;
/**
* EditsLoaderCurrent processes Hadoop EditLogs files and walks over
@@ -49,7 +38,7 @@
class EditsLoaderCurrent implements EditsLoader {
private static int [] supportedVersions = {
- -18, -19, -20, -21, -22, -23, -24, -25, -26, -27 };
+ -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
private EditsVisitor v;
private int editsVersion = 0;
@@ -464,6 +453,10 @@
visitOpCode(editsOpCode);
v.leaveEnclosingElement(); // DATA
+
+ if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
+ v.visitInt(EditsElement.CHECKSUM);
+ }
v.leaveEnclosingElement(); // RECORD
} while(editsOpCode != FSEditLogOpCodes.OP_INVALID);
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java b/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
index c660514..64f8329 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
import org.apache.hadoop.io.Text;
@@ -121,7 +120,7 @@
protected final DateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static int [] versions =
- {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27};
+ {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31};
private int imageVersion = 0;
/* (non-Javadoc)
@@ -334,35 +333,106 @@
long numInodes, boolean skipBlocks) throws IOException {
v.visitEnclosingElement(ImageElement.INODES,
ImageElement.NUM_INODES, numInodes);
-
- for(long i = 0; i < numInodes; i++) {
- v.visitEnclosingElement(ImageElement.INODE);
- v.visit(ImageElement.INODE_PATH, FSImageSerialization.readString(in));
- v.visit(ImageElement.REPLICATION, in.readShort());
- v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
- if(imageVersion <= -17) // added in version -17
- v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
- v.visit(ImageElement.BLOCK_SIZE, in.readLong());
- int numBlocks = in.readInt();
-
- processBlocks(in, v, numBlocks, skipBlocks);
-
- // File or directory
- if (numBlocks > 0 || numBlocks == -1) {
- v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
- if(imageVersion <= -18) // added in version -18
- v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
- }
- if (imageVersion <= -23 && numBlocks == -2) {
- v.visit(ImageElement.SYMLINK, Text.readString(in));
- }
-
- processPermission(in, v);
- v.leaveEnclosingElement(); // INode
+
+ if (imageVersion <= -30) { // local file name
+ processLocalNameINodes(in, v, numInodes, skipBlocks);
+ } else { // full path name
+ processFullNameINodes(in, v, numInodes, skipBlocks);
}
+
v.leaveEnclosingElement(); // INodes
}
+
+ /**
+ * Process image with only local names stored
+ *
+ * @param in image stream
+ * @param v visitor
+ * @param numInodes number of inodes to read
+ * @param skipBlocks skip blocks or not
+ * @throws IOException if any error occurs
+ */
+ private void processLocalNameINodes(DataInputStream in, ImageVisitor v,
+ long numInodes, boolean skipBlocks) throws IOException {
+ // process root
+ processINode(in, v, skipBlocks, "");
+ numInodes--;
+ while (numInodes > 0) {
+ numInodes -= processDirectory(in, v, skipBlocks);
+ }
+ }
+
+ private int processDirectory(DataInputStream in, ImageVisitor v,
+ boolean skipBlocks) throws IOException {
+ String parentName = FSImageSerialization.readString(in);
+ int numChildren = in.readInt();
+ for (int i=0; i<numChildren; i++) {
+ processINode(in, v, skipBlocks, parentName);
+ }
+ return numChildren;
+ }
+
+ /**
+ * Process image with full path name
+ *
+ * @param in image stream
+ * @param v visitor
+ * @param numInodes number of inodes to read
+ * @param skipBlocks skip blocks or not
+ * @throws IOException if any error occurs
+ */
+ private void processFullNameINodes(DataInputStream in, ImageVisitor v,
+ long numInodes, boolean skipBlocks) throws IOException {
+ for(long i = 0; i < numInodes; i++) {
+ processINode(in, v, skipBlocks, null);
+ }
+ }
+
+ /**
+ * Process an INode
+ *
+ * @param in image stream
+ * @param v visitor
+ * @param skipBlocks skip blocks or not
+ * @param parentName the name of its parent node, or null when the
+ *        stored name is already a full path
+ * @throws IOException
+ */
+ private void processINode(DataInputStream in, ImageVisitor v,
+ boolean skipBlocks, String parentName) throws IOException {
+ v.visitEnclosingElement(ImageElement.INODE);
+ String pathName = FSImageSerialization.readString(in);
+ if (parentName != null) { // local name
+ pathName = "/" + pathName;
+ if (!"/".equals(parentName)) { // children of non-root directory
+ pathName = parentName + pathName;
+ }
+ }
+
+ v.visit(ImageElement.INODE_PATH, pathName);
+ v.visit(ImageElement.REPLICATION, in.readShort());
+ v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
+ if(imageVersion <= -17) // added in version -17
+ v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
+ v.visit(ImageElement.BLOCK_SIZE, in.readLong());
+ int numBlocks = in.readInt();
+
+ processBlocks(in, v, numBlocks, skipBlocks);
+
+ // File or directory
+ if (numBlocks > 0 || numBlocks == -1) {
+ v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
+ if(imageVersion <= -18) // added in version -18
+ v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
+ }
+ if (imageVersion <= -23 && numBlocks == -2) {
+ v.visit(ImageElement.SYMLINK, Text.readString(in));
+ }
+
+ processPermission(in, v);
+ v.leaveEnclosingElement(); // INode
+ }
/**
* Helper method to format dates during processing.
diff --git a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
index 898cabf..de8be73 100644
--- a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
@@ -343,7 +343,7 @@
if (!test.isSuccess() && p.contains(index, id)) {
FiTestUtil.LOG.info(toString(id));
if (maxDuration <= 0) {
- for(; true; FiTestUtil.sleep(1000)); //sleep forever
+ for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
} else {
FiTestUtil.sleep(minDuration, maxDuration);
}
@@ -391,7 +391,7 @@
+ minDuration + "," + maxDuration + ")";
FiTestUtil.LOG.info(s);
if (maxDuration <= 1) {
- for(; true; FiTestUtil.sleep(1000)); //sleep forever
+ for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
} else {
FiTestUtil.sleep(minDuration, maxDuration);
}
diff --git a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
index 3f608a4..396f5fe 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
@@ -73,14 +73,17 @@
/**
* Sleep.
- * If there is an InterruptedException, re-throw it as a RuntimeException.
+ * @return true if sleep exits normally; false if interrupted (logged; interrupt status not restored).
*/
- public static void sleep(long ms) {
+ public static boolean sleep(long ms) {
+ LOG.info("Sleep " + ms + " ms");
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ LOG.info("Sleep is interrupted", e);
+ return false;
}
+ return true;
}
/**
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index 2d68aac..73062de 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -45,6 +45,11 @@
privileged public aspect BlockReceiverAspects {
public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
+ BlockReceiver BlockReceiver.PacketResponder.getReceiver(){
+ LOG.info("FI: getReceiver() " + getClass().getName());
+ return BlockReceiver.this;
+ }
+
pointcut callReceivePacket(BlockReceiver blockreceiver) :
call(* receivePacket(..)) && target(blockreceiver);
@@ -82,7 +87,7 @@
after(BlockReceiver.PacketResponder responder)
throws IOException: afterDownstreamStatusRead(responder) {
- final DataNode d = responder.receiver.getDataNode();
+ final DataNode d = responder.getReceiver().getDataNode();
DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
if (dtTest != null)
dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeId());
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index 434e0cc..ed9de6d 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -22,18 +22,13 @@
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
-import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
-import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
-import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
-import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +36,9 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
@@ -63,6 +60,10 @@
REPLICATION, BLOCKSIZE);
}
+ {
+ ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+ }
+
/**
* 1. create files with dfs
* 2. write 1 byte
@@ -70,9 +71,9 @@
* 4. open the same file
* 5. read the 1 byte and compare results
*/
- private static void write1byte(String methodName) throws IOException {
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION).format(true).build();
+ static void write1byte(String methodName) throws IOException {
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+ ).numDataNodes(REPLICATION + 1).build();
final FileSystem dfs = cluster.getFileSystem();
try {
final Path p = new Path("/" + methodName + "/foo");
@@ -305,184 +306,4 @@
final String methodName = FiTestUtil.getMethodName();
runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
}
-
- private static void runPipelineCloseTest(String methodName,
- Action<DatanodeID, IOException> a) throws IOException {
- FiTestUtil.LOG.info("Running " + methodName + " ...");
- final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
- .initTest();
- t.fiPipelineClose.set(a);
- write1byte(methodName);
- }
-
- private static void run41_43(String name, int i) throws IOException {
- runPipelineCloseTest(name, new SleepAction(name, i, 3000));
- }
-
- private static void runPipelineCloseAck(String name, int i, DataNodeAction a
- ) throws IOException {
- FiTestUtil.LOG.info("Running " + name + " ...");
- final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
- final MarkerConstraint marker = new MarkerConstraint(name);
- t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
- t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
- write1byte(name);
- }
-
- private static void run39_40(String name, int i) throws IOException {
- runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
- }
-
- /**
- * Pipeline close:
- * DN1 never responses after received close ack DN2.
- * Client gets an IOException and determine DN1 bad.
- */
- @Test
- public void pipeline_Fi_39() throws IOException {
- run39_40(FiTestUtil.getMethodName(), 1);
- }
-
- /**
- * Pipeline close:
- * DN0 never responses after received close ack DN1.
- * Client gets an IOException and determine DN0 bad.
- */
- @Test
- public void pipeline_Fi_40() throws IOException {
- run39_40(FiTestUtil.getMethodName(), 0);
- }
-
- /**
- * Pipeline close with DN0 very slow but it won't lead to timeout.
- * Client finishes close successfully.
- */
- @Test
- public void pipeline_Fi_41() throws IOException {
- run41_43(FiTestUtil.getMethodName(), 0);
- }
-
- /**
- * Pipeline close with DN1 very slow but it won't lead to timeout.
- * Client finishes close successfully.
- */
- @Test
- public void pipeline_Fi_42() throws IOException {
- run41_43(FiTestUtil.getMethodName(), 1);
- }
-
- /**
- * Pipeline close with DN2 very slow but it won't lead to timeout.
- * Client finishes close successfully.
- */
- @Test
- public void pipeline_Fi_43() throws IOException {
- run41_43(FiTestUtil.getMethodName(), 2);
- }
-
- /**
- * Pipeline close:
- * DN0 throws an OutOfMemoryException
- * right after it received a close request from client.
- * Client gets an IOException and determine DN0 bad.
- */
- @Test
- public void pipeline_Fi_44() throws IOException {
- final String methodName = FiTestUtil.getMethodName();
- runPipelineCloseTest(methodName, new OomAction(methodName, 0));
- }
-
- /**
- * Pipeline close:
- * DN1 throws an OutOfMemoryException
- * right after it received a close request from client.
- * Client gets an IOException and determine DN1 bad.
- */
- @Test
- public void pipeline_Fi_45() throws IOException {
- final String methodName = FiTestUtil.getMethodName();
- runPipelineCloseTest(methodName, new OomAction(methodName, 1));
- }
-
- /**
- * Pipeline close:
- * DN2 throws an OutOfMemoryException
- * right after it received a close request from client.
- * Client gets an IOException and determine DN2 bad.
- */
- @Test
- public void pipeline_Fi_46() throws IOException {
- final String methodName = FiTestUtil.getMethodName();
- runPipelineCloseTest(methodName, new OomAction(methodName, 2));
- }
-
- private static void run47_48(String name, int i) throws IOException {
- runPipelineCloseAck(name, i, new OomAction(name, i));
- }
-
- /**
- * Pipeline close:
- * DN1 throws an OutOfMemoryException right after
- * it received a close ack from DN2.
- * Client gets an IOException and determine DN1 bad.
- */
- @Test
- public void pipeline_Fi_47() throws IOException {
- run47_48(FiTestUtil.getMethodName(), 1);
- }
-
- /**
- * Pipeline close:
- * DN0 throws an OutOfMemoryException right after
- * it received a close ack from DN1.
- * Client gets an IOException and determine DN0 bad.
- */
- @Test
- public void pipeline_Fi_48() throws IOException {
- run47_48(FiTestUtil.getMethodName(), 0);
- }
-
- private static void runBlockFileCloseTest(String methodName,
- Action<DatanodeID, IOException> a) throws IOException {
- FiTestUtil.LOG.info("Running " + methodName + " ...");
- final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
- .initTest();
- t.fiBlockFileClose.set(a);
- write1byte(methodName);
- }
-
- private static void run49_51(String name, int i) throws IOException {
- runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
- }
-
- /**
- * Pipeline close:
- * DN0 throws a disk error exception when it is closing the block file.
- * Client gets an IOException and determine DN0 bad.
- */
- @Test
- public void pipeline_Fi_49() throws IOException {
- run49_51(FiTestUtil.getMethodName(), 0);
- }
-
-
- /**
- * Pipeline close:
- * DN1 throws a disk error exception when it is closing the block file.
- * Client gets an IOException and determine DN1 bad.
- */
- @Test
- public void pipeline_Fi_50() throws IOException {
- run49_51(FiTestUtil.getMethodName(), 1);
- }
-
- /**
- * Pipeline close:
- * DN2 throws a disk error exception when it is closing the block file.
- * Client gets an IOException and determine DN2 bad.
- */
- @Test
- public void pipeline_Fi_51() throws IOException {
- run49_51(FiTestUtil.getMethodName(), 2);
- }
-}
\ No newline at end of file
+}
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
index 46caea4..d2868b7 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -23,13 +23,13 @@
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,8 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.log4j.Level;
-
import org.junit.Assert;
import org.junit.Test;
@@ -71,6 +70,7 @@
{
((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
}
/**
* 1. create files with dfs
@@ -88,8 +88,8 @@
FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+ ", lastPacketSize=" + lastPacketSize);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION).build();
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+ ).numDataNodes(REPLICATION + 1).build();
final FileSystem dfs = cluster.getFileSystem();
try {
final Path p = new Path("/" + methodName + "/foo");
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
index 0f39595..1468222 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
@@ -19,76 +19,29 @@
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
+import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.junit.Assert;
import org.junit.Test;
/** Test DataTransferProtocol with fault injection. */
public class TestFiPipelineClose {
- static final short REPLICATION = 3;
- static final long BLOCKSIZE = 1L * (1L << 20);
-
- static final Configuration conf = new HdfsConfiguration();
- static {
- conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
- conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
- conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
- }
-
- static private FSDataOutputStream createFile(FileSystem fs, Path p
- ) throws IOException {
- return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
- REPLICATION, BLOCKSIZE);
- }
-
- /**
- * 1. create files with dfs
- * 2. write 1 byte
- * 3. close file
- * 4. open the same file
- * 5. read the 1 byte and compare results
- */
- private static void write1byte(String methodName) throws IOException {
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION).format(true).build();
- final FileSystem dfs = cluster.getFileSystem();
- try {
- final Path p = new Path("/" + methodName + "/foo");
- final FSDataOutputStream out = createFile(dfs, p);
- out.write(1);
- out.close();
-
- final FSDataInputStream in = dfs.open(p);
- final int b = in.read();
- in.close();
- Assert.assertEquals(1, b);
- }
- finally {
- dfs.close();
- cluster.shutdown();
- }
- }
-
- private static void runPipelineCloseTest(String methodName,
+ private static void runPipelineCloseTest(String methodName,
Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
t.fiPipelineClose.set(a);
- write1byte(methodName);
+ TestFiDataTransferProtocol.write1byte(methodName);
}
/**
@@ -123,4 +76,175 @@
final String methodName = FiTestUtil.getMethodName();
runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
}
+
+ private static void run41_43(String name, int i) throws IOException {
+ runPipelineCloseTest(name, new SleepAction(name, i, 3000));
+ }
+
+ private static void runPipelineCloseAck(String name, int i, DataNodeAction a
+ ) throws IOException {
+ FiTestUtil.LOG.info("Running " + name + " ...");
+ final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
+ final MarkerConstraint marker = new MarkerConstraint(name);
+ t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
+ t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
+ TestFiDataTransferProtocol.write1byte(name);
+ }
+
+ private static void run39_40(String name, int i) throws IOException {
+ runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN1 never responds after receiving the close ack from DN2.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_39() throws IOException {
+ run39_40(FiTestUtil.getMethodName(), 1);
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 never responds after receiving the close ack from DN1.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_40() throws IOException {
+ run39_40(FiTestUtil.getMethodName(), 0);
+ }
+
+ /**
+ * Pipeline close with DN0 very slow but it won't lead to timeout.
+ * Client finishes close successfully.
+ */
+ @Test
+ public void pipeline_Fi_41() throws IOException {
+ run41_43(FiTestUtil.getMethodName(), 0);
+ }
+
+ /**
+ * Pipeline close with DN1 very slow but it won't lead to timeout.
+ * Client finishes close successfully.
+ */
+ @Test
+ public void pipeline_Fi_42() throws IOException {
+ run41_43(FiTestUtil.getMethodName(), 1);
+ }
+
+ /**
+ * Pipeline close with DN2 very slow but it won't lead to timeout.
+ * Client finishes close successfully.
+ */
+ @Test
+ public void pipeline_Fi_43() throws IOException {
+ run41_43(FiTestUtil.getMethodName(), 2);
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 throws an OutOfMemoryException
+ * right after it received a close request from client.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_44() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new OomAction(methodName, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN1 throws an OutOfMemoryException
+ * right after it received a close request from client.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_45() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new OomAction(methodName, 1));
+ }
+
+ /**
+ * Pipeline close:
+ * DN2 throws an OutOfMemoryException
+ * right after it received a close request from client.
+ * Client gets an IOException and determines that DN2 is bad.
+ */
+ @Test
+ public void pipeline_Fi_46() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new OomAction(methodName, 2));
+ }
+
+ private static void run47_48(String name, int i) throws IOException {
+ runPipelineCloseAck(name, i, new OomAction(name, i));
+ }
+
+ /**
+ * Pipeline close:
+ * DN1 throws an OutOfMemoryException right after
+ * it received a close ack from DN2.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_47() throws IOException {
+ run47_48(FiTestUtil.getMethodName(), 1);
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 throws an OutOfMemoryException right after
+ * it received a close ack from DN1.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_48() throws IOException {
+ run47_48(FiTestUtil.getMethodName(), 0);
+ }
+
+ private static void runBlockFileCloseTest(String methodName,
+ Action<DatanodeID, IOException> a) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ t.fiBlockFileClose.set(a);
+ TestFiDataTransferProtocol.write1byte(methodName);
+ }
+
+ private static void run49_51(String name, int i) throws IOException {
+ runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 throws a disk error exception when it is closing the block file.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_49() throws IOException {
+ run49_51(FiTestUtil.getMethodName(), 0);
+ }
+
+
+ /**
+ * Pipeline close:
+ * DN1 throws a disk error exception when it is closing the block file.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_50() throws IOException {
+ run49_51(FiTestUtil.getMethodName(), 1);
+ }
+
+ /**
+ * Pipeline close:
+ * DN2 throws a disk error exception when it is closing the block file.
+ * Client gets an IOException and determine DN2 bad.
+ */
+ @Test
+ public void pipeline_Fi_51() throws IOException {
+ run49_51(FiTestUtil.getMethodName(), 2);
+ }
}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
new file mode 100644
index 0000000..ece2261
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.*;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+
+public class CLITestCmdDFS extends CLITestCmd {
+ public CLITestCmdDFS(String str, CLICommandTypes type) {
+ super(str, type);
+ }
+
+ @Override
+ public CommandExecutor getExecutor(String tag) throws IllegalArgumentException {
+ if (getType() instanceof CLICommandDFSAdmin)
+ return new FSCmdExecutor(tag, new DFSAdmin());
+ return super.getExecutor(tag);
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
new file mode 100644
index 0000000..fc3567e
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommandDFSAdmin;
+import org.apache.hadoop.cli.util.CLITestCmd;
+import org.xml.sax.SAXException;
+
+public class CLITestHelperDFS extends CLITestHelper {
+
+ @Override
+ protected TestConfigFileParser getConfigParser() {
+ return new TestConfigFileParserDFS();
+ }
+
+ class TestConfigFileParserDFS extends CLITestHelper.TestConfigFileParser {
+ @Override
+ public void endElement(String uri, String localName, String qName)
+ throws SAXException {
+ if (qName.equals("dfs-admin-command")) {
+ if (testCommands != null) {
+ testCommands.add(new CLITestCmdDFS(charString,
+ new CLICommandDFSAdmin()));
+ } else if (cleanupCommands != null) {
+ cleanupCommands.add(new CLITestCmdDFS(charString,
+ new CLICommandDFSAdmin()));
+ }
+ } else {
+ super.endElement(uri, localName, qName);
+ }
+ }
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
deleted file mode 100644
index 4d615f9..0000000
--- a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cli;
-
-import org.apache.hadoop.cli.util.CLICommands;
-import org.apache.hadoop.cli.util.CLITestData;
-import org.apache.hadoop.cli.util.CmdFactory;
-import org.apache.hadoop.cli.util.CommandExecutor;
-import org.apache.hadoop.hdfs.tools.DFSAdmin;
-
-public abstract class CmdFactoryDFS extends CmdFactory {
- public static CommandExecutor getCommandExecutor(CLITestData.TestCmd cmd,
- String tag)
- throws IllegalArgumentException {
- CommandExecutor executor;
- switch (cmd.getType()) {
- case DFSADMIN:
- executor = new CLICommands.FSCmdExecutor(tag, new DFSAdmin());
- break;
- default:
- executor = CmdFactory.getCommandExecutor(cmd, tag);
- }
- return executor;
- }
-}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index 4f73b23..45e3bdc 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.cli;
-import org.apache.hadoop.cli.util.CLITestData.TestCmd;
+import org.apache.hadoop.cli.util.CLICommand;
import org.apache.hadoop.cli.util.CommandExecutor.Result;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,7 +31,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestHDFSCLI extends CLITestHelper {
+public class TestHDFSCLI extends CLITestHelperDFS {
protected MiniDFSCluster dfsCluster = null;
protected DistributedFileSystem dfs = null;
@@ -85,13 +85,13 @@
protected String expandCommand(final String cmd) {
String expCmd = cmd;
expCmd = expCmd.replaceAll("NAMENODE", namenode);
- expCmd = super.expandCommand(cmd);
+ expCmd = super.expandCommand(expCmd);
return expCmd;
}
@Override
- protected Result execute(TestCmd cmd) throws Exception {
- return CmdFactoryDFS.getCommandExecutor(cmd, namenode).executeCommand(cmd.getCmd());
+ protected Result execute(CLICommand cmd) throws Exception {
+ return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
}
@Test
diff --git a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
index 3ec1eed..7da99b5 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
+++ b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
@@ -7068,7 +7068,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for /file1</expected-output>
+ <expected-output>count: Can not find listing for /file1</expected-output>
</comparator>
</comparators>
</test>
@@ -7083,7 +7083,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for file1</expected-output>
+ <expected-output>count: Can not find listing for file1</expected-output>
</comparator>
</comparators>
</test>
@@ -7426,7 +7426,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for /file1</expected-output>
+ <expected-output>count: Can not find listing for /file1</expected-output>
</comparator>
</comparators>
</test>
@@ -7441,7 +7441,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for file1</expected-output>
+ <expected-output>count: Can not find listing for file1</expected-output>
</comparator>
</comparators>
</test>
@@ -7778,7 +7778,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for hdfs:///file1</expected-output>
+ <expected-output>count: Can not find listing for hdfs:/file1</expected-output>
</comparator>
</comparators>
</test>
@@ -7957,7 +7957,7 @@
<comparators>
<comparator>
<type>TokenComparator</type>
- <expected-output>Can not find listing for hdfs:///file1</expected-output>
+ <expected-output>count: Can not find listing for hdfs:/file1</expected-output>
</comparator>
</comparators>
</test>
@@ -8150,7 +8150,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
+ <expected-output>count: Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
</comparator>
</comparators>
</test>
@@ -8329,7 +8329,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
+ <expected-output>count: Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
</comparator>
</comparators>
</test>
@@ -15096,741 +15096,6 @@
</comparators>
</test>
- <test> <!-- TESTED -->
- <description>help: help for ls</description>
- <test-commands>
- <command>-fs NAMENODE -help ls</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-ls <path>:( |\t)*List the contents that match the specified file pattern. If( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*path is not specified, the contents of /user/<currentUser>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*path is not specified, the contents of /user/<currentUser>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*will be listed. Directory entries are of the form( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*dirName \(full path\) <dir>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*and file entries are of the form( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*fileName\(full path\) <r n> size( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*where n is the number of replicas specified for the file( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*and size is the size of the file, in bytes.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for lsr</description>
- <test-commands>
- <command>-fs NAMENODE -help lsr</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-lsr <path>:( |\t)*Recursively list the contents that match the specified( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*file pattern.( |\t)*Behaves very similarly to hadoop fs -ls,( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*except that the data is shown for all the entries in the( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*subtree.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for get</description>
- <test-commands>
- <command>-fs NAMENODE -help get</command>
- </test-commands>
- <cleanup-commands>
- <!-- No cleanup -->
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-get( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> <localdst>:( |\t)*Copy files that match the file pattern <src>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*to the local name.( )*<src> is kept.( )*When copying mutiple,( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*files, the destination must be a directory.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for du</description>
- <test-commands>
- <command>-fs NAMENODE -help du</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-du \[-s\] \[-h\] <path>:\s+Show the amount of space, in bytes, used by the files that\s*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*match the specified file pattern. The following flags are optional:</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*-s\s*Rather than showing the size of each individual file that</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*matches the pattern, shows the total \(summary\) size.</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*-h\s*Formats the sizes of files in a human-readable fashion</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>\s*rather than a number of bytes.</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*Note that, even without the -s option, this only shows size summaries</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*one level deep into a directory.</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*The output is in the form </expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^\s*size\s+name\(full path\)\s*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for dus</description>
- <test-commands>
- <command>-fs NAMENODE -help dus</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-dus <path>:( |\t)*Show the amount of space, in bytes, used by the files that( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*match the specified file pattern. This is equivalent to -du -s above.</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for count</description>
- <test-commands>
- <command>-fs NAMENODE -help count</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-count\[-q\] <path>: Count the number of directories, files and bytes under the paths( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*that match the specified file pattern. The output columns are:( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME or( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*QUOTA REMAINING_QUATA SPACE_QUOTA REMAINING_SPACE_QUOTA( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for mv</description>
- <test-commands>
- <command>-fs NAMENODE -help mv</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-mv <src> <dst>:( |\t)*Move files that match the specified file pattern <src>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*to a destination <dst>. When moving multiple files, the( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*destination must be a directory.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for cp</description>
- <test-commands>
- <command>-fs NAMENODE -help cp</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-cp <src> <dst>:( |\t)*Copy files that match the file pattern <src> to a( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*destination. When copying multiple files, the destination( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*must be a directory.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for rm</description>
- <test-commands>
- <command>-fs NAMENODE -help rm</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-rm \[-skipTrash\] <src>:( |\t)*Delete all files that match the specified file pattern.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*Equivalent to the Unix command "rm <src>"( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*-skipTrash option bypasses trash, if enabled, and immediately( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*deletes <src>( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for rmr</description>
- <test-commands>
- <command>-fs NAMENODE -help rmr</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-rmr \[-skipTrash\] <src>:( |\t)*Remove all directories which match the specified file( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*pattern. Equivalent to the Unix command "rm -rf <src>"( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*-skipTrash option bypasses trash, if enabled, and immediately( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*deletes <src>( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for put</description>
- <test-commands>
- <command>-fs NAMENODE -help put</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-put <localsrc> ... <dst>:( |\t)*Copy files from the local file system( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*into fs.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for copyFromLocal</description>
- <test-commands>
- <command>-fs NAMENODE -help copyFromLocal</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-copyFromLocal <localsrc> ... <dst>:( )*Identical to the -put command.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for moveFromLocal</description>
- <test-commands>
- <command>-fs NAMENODE -help moveFromLocal</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-moveFromLocal <localsrc> ... <dst>: Same as -put, except that the source is( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*deleted after it's copied.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for get</description>
- <test-commands>
- <command>-fs NAMENODE -help get</command>
- </test-commands>
- <cleanup-commands>
- <!-- No cleanup -->
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-get( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> <localdst>:( |\t)*Copy files that match the file pattern <src>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*to the local name.( )*<src> is kept.( )*When copying mutiple,( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*files, the destination must be a directory.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for getmerge</description>
- <test-commands>
- <command>-fs NAMENODE -help getmerge</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-getmerge <src> <localdst>: Get all the files in the directories that( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*match the source file pattern and merge and sort them to only( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*one file on local fs. <src> is kept.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for cat</description>
- <test-commands>
- <command>-fs NAMENODE -help cat</command>
- </test-commands>
- <cleanup-commands>
- <!-- No cleanup -->
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-cat <src>:( |\t)*Fetch all files that match the file pattern <src>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*and display their content on stdout.</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for copyToLocal</description>
- <test-commands>
- <command>-fs NAMENODE -help copyToLocal</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-copyToLocal \[-ignoreCrc\] \[-crc\] <src> <localdst>:( )*Identical to the -get command.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for moveToLocal</description>
- <test-commands>
- <command>-fs NAMENODE -help moveToLocal</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-moveToLocal <src> <localdst>:( )*Not implemented yet( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for mkdir</description>
- <test-commands>
- <command>-fs NAMENODE -help mkdir</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-mkdir <path>:( |\t)*Create a directory in specified location.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for setrep</description>
- <test-commands>
- <command>-fs NAMENODE -help setrep</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-setrep \[-R\] \[-w\] <rep> <path/file>:( )*Set the replication level of a file.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*The -R flag requests a recursive change of replication level( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*for an entire tree.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for touchz</description>
- <test-commands>
- <command>-fs NAMENODE -help touchz</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-touchz <path>: Creates a file of zero length( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*at <path> with current time as the timestamp of that <path>.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)* An error is returned if the file exists with non-zero length( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for test</description>
- <test-commands>
- <command>-fs NAMENODE -help test</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-test -\[ezd\] <path>: If file \{ exists, has zero length, is a directory( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*then return 0, else return 1.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for stat</description>
- <test-commands>
- <command>-fs NAMENODE -help stat</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-stat \[format\] <path>: Print statistics about the file/directory at <path>( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), filename \(%n\),( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*block size \(%o\), replication \(%r\), modification date \(%y, %Y\)( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for tail</description>
- <test-commands>
- <command>-fs NAMENODE -help tail</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-tail \[-f\] <file>: Show the last 1KB of the file.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*The -f option shows apended data as the file grows.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for chmod</description>
- <test-commands>
- <command>-fs NAMENODE -help chmod</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-chmod \[-R\] <MODE\[,MODE\]... \| OCTALMODE> PATH...( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*Changes permissions of a file.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*This works similar to shell's chmod with a few exceptions.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*-R( |\t)*modifies the files recursively. This is the only option( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*currently supported.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*MODE( |\t)*Mode is same as mode used for chmod shell command.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*Only letters recognized are 'rwxXt'. E.g. \+t,a\+r,g-w,\+rwx,o=r( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*OCTALMODE Mode specifed in 3 or 4 digits. If 4 digits, the first may( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*be 1 or 0 to turn the sticky bit on or off, respectively.( )*Unlike( |\t)*shell command, it is not possible to specify only part of the mode( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*E.g. 754 is same as u=rwx,g=rx,o=r( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*If none of 'augo' is specified, 'a' is assumed and unlike( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*shell command, no umask is applied.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for chown</description>
- <test-commands>
- <command>-fs NAMENODE -help chown</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-chown \[-R\] \[OWNER\]\[:\[GROUP\]\] PATH...( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*Changes owner and group of a file.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*This is similar to shell's chown with a few exceptions.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*-R( |\t)*modifies the files recursively. This is the only option( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*currently supported.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*If only owner or group is specified then only owner or( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*group is modified.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*The owner and group names may only cosists of digits, alphabet,( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*and any of '-_.@/' i.e. \[-_.@/a-zA-Z0-9\]. The names are case( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*sensitive.( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*WARNING: Avoid using '.' to separate user name and group though( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*Linux allows it. If user names have dots in them and you are( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*using local file system, you might see surprising results since( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*shell command 'chown' is used for local files.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for chgrp</description>
- <test-commands>
- <command>-fs NAMENODE -help chgrp</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-chgrp \[-R\] GROUP PATH...( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*This is equivalent to -chown ... :GROUP ...( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
- <test> <!-- TESTED -->
- <description>help: help for help</description>
- <test-commands>
- <command>-fs NAMENODE -help help</command>
- </test-commands>
- <cleanup-commands>
- </cleanup-commands>
- <comparators>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^-help \[cmd\]:( |\t)*Displays help for given command or all commands if none( )*</expected-output>
- </comparator>
- <comparator>
- <type>RegexpComparator</type>
- <expected-output>^( |\t)*is specified.( )*</expected-output>
- </comparator>
- </comparators>
- </test>
-
<test> <!--Tested -->
<description>help: help for dfsadmin report</description>
<test-commands>
@@ -16233,7 +15498,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
- <expected-output>Can not find listing for /test1</expected-output>
+ <expected-output>setSpaceQuota: Directory does not exist: /test1</expected-output>
</comparator>
</comparators>
</test>
@@ -16299,7 +15564,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
- <expected-output>Can not find listing for /test1</expected-output>
+ <expected-output>clrQuota: Directory does not exist: /test1</expected-output>
</comparator>
</comparators>
</test>
diff --git a/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java b/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
new file mode 100644
index 0000000..03c441c
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+public class CLICommandDFSAdmin implements CLICommandTypes { // declares no members of its own
+}
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java b/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
index 4e7b12e..c68cef6 100644
--- a/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
@@ -241,5 +241,17 @@
} catch (IOException x) {
// Expected
}
- }
-}
\ No newline at end of file
+ }
+
+ @Test
+ /** Test symlink owner: the link itself must have the same owner as the file. */
+ public void testLinkOwner() throws IOException {
+ Path file = new Path(testBaseDir1(), "file");
+ Path link = new Path(testBaseDir1(), "symlinkToFile");
+ createAndWriteFile(file);
+ fc.createSymlink(file, link, false);
+ FileStatus statFile = fc.getFileStatus(file);
+ FileStatus statLink = fc.getFileLinkStatus(link); // status of the link, not of its target
+ assertEquals(statFile.getOwner(), statLink.getOwner());
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java b/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
new file mode 100644
index 0000000..0c17dc6
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests whether FileContext can resolve an hdfs path that has a symlink to
+ * local file system. Also tests getDelegationTokens API in file context with
+ * underlying file system as Hdfs.
+ */
+public class TestResolveHdfsSymlink {
+ private static MiniDFSCluster cluster = null;
+
+ /** Builds the shared mini cluster, then starts its delegation token threads. */
+ @BeforeClass
+ public static void setUp() throws IOException {
+ cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
+ cluster.waitActive(); // wait for the cluster before using its namesystem
+ cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
+ }
+
+ @AfterClass
+ public static void tearDown() { // shuts down the cluster shared by the tests
+ if (cluster != null) { // still null if setUp failed before assigning it
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that a path crossing an hdfs symlink into the local file system
+ * resolves to the hdfs and local AbstractFileSystem instances only.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testFcResolveAfs() throws IOException, InterruptedException {
+ final FileContext localFc = FileContext.getLocalFSFileContext();
+ final FileContext hdfsFc = FileContext.getFileContext(
+ cluster.getFileSystem().getUri());
+ final AbstractFileSystem localAfs = localFc.getDefaultFileSystem();
+ final AbstractFileSystem hdfsAfs = hdfsFc.getDefaultFileSystem();
+ final String localRoot = localAfs.getUri().toString();
+ final String hdfsRoot = hdfsAfs.getUri().toString();
+
+ // Local file that is later addressed through the hdfs link.
+ final Path localAlpha = new Path(localRoot, "/tmp/alpha");
+ DFSTestUtil.createFile(FileSystem.getLocal(new Configuration()),
+ localAlpha, 16, (short) 1, 2);
+
+ // Symlink in hdfs at /tmp/link whose target is the local /tmp directory.
+ final Path localTmp = new Path(localRoot, "/tmp");
+ final Path linkInHdfs = new Path(hdfsRoot, "/tmp/link");
+ hdfsFc.createSymlink(localTmp, linkInHdfs, true);
+
+ final Path alphaViaLink = new Path(hdfsRoot + "/tmp/link/alpha");
+ final Set<AbstractFileSystem> resolved =
+ hdfsFc.resolveAbstractFileSystems(alphaViaLink);
+ Assert.assertEquals(2, resolved.size());
+ for (AbstractFileSystem candidate : resolved) {
+ if (candidate.equals(hdfsAfs) || candidate.equals(localAfs)) {
+ continue;
+ }
+ Assert.fail("Failed to resolve AFS correctly");
+ }
+ }
+
+ /**
+ * Fetches delegation tokens through the default AbstractFileSystem of an
+ * hdfs FileContext, then renews and cancels the first token via Hdfs.
+ *
+ * @throws UnsupportedFileSystemException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testFcDelegationToken() throws UnsupportedFileSystemException,
+ IOException, InterruptedException {
+ final AbstractFileSystem afs = FileContext.getFileContext(
+ cluster.getFileSystem().getUri()).getDefaultFileSystem();
+ final String userName = UserGroupInformation.getCurrentUser().getUserName();
+ final List<Token<?>> tokens = afs.getDelegationTokens(userName);
+ final Hdfs hdfs = (Hdfs) afs;
+ final Token<?> first = tokens.get(0);
+
+ hdfs.renewDelegationToken((Token<DelegationTokenIdentifier>) first);
+ hdfs.cancelDelegationToken(
+ (Token<? extends AbstractDelegationTokenIdentifier>) first);
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 84cdbf0..422c46b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -403,7 +403,8 @@
public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
Assert.assertEquals(2, datanodes.length);
- final Socket s = DFSOutputStream.createSocketForPipeline(datanodes, dfsClient);
+ final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
+ datanodes.length, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout),
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
index 53fc15b..115416a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -1286,4 +1287,18 @@
System.out.println("results:\n" + results);
return results;
}
+
+  /**
+   * The default configuration points at file://, which is not a DFS, so
+   * DFSAdmin is expected to reject the command and return exit code -1
+   * (per the original note, by throwing and catching the exception
+   * internally rather than propagating it).
+   *
+   * @throws Exception if running the admin command fails unexpectedly
+   */
+  public void testInvalidShell() throws Exception {
+    Configuration conf = new Configuration(); // default FS (non-DFS)
+    DFSAdmin admin = new DFSAdmin();
+    admin.setConf(conf);
+    int res = admin.run(new String[] {"-refreshNodes"});
+    // JUnit's argument order is (message, expected, actual)
+    assertEquals("expected to fail -1", -1, res);
+  }
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index ef2e924..88cf7bd 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -44,6 +44,10 @@
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
+ {
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private boolean dualPortTesting = false;
private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@
@Test
public void testDFSClient() throws Exception {
Configuration conf = getTestConfiguration();
+ final long grace = 1000L;
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
- final Path filepath = new Path("/test/LeaseChecker/foo");
+ final String filepathstring = "/test/LeaseChecker/foo";
+ final Path[] filepaths = new Path[4];
+ for(int i = 0; i < filepaths.length; i++) {
+ filepaths[i] = new Path(filepathstring + i);
+ }
final long millis = System.currentTimeMillis();
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
- //create a file
- FSDataOutputStream out = dfs.create(filepath);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
-
- //write something and close
- out.writeLong(millis);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
- out.close();
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ {
+ //create a file
+ final FSDataOutputStream out = dfs.create(filepaths[0]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something
+ out.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close
+ out.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file1
+ final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //create file2
+ final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file1
+ out1.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file1
+ out1.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file2
+ out2.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file2
+ out2.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file3
+ final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //passed previous grace period, should still be running
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something to file3
+ out3.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file3
+ out3.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
dfs.close();
}
@@ -146,15 +218,15 @@
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
//open and check the file
- FSDataInputStream in = dfs.open(filepath);
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ FSDataInputStream in = dfs.open(filepaths[0]);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
assertEquals(millis, in.readLong());
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
in.close();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
dfs.close();
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
index 35a561a..9733695 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
@@ -59,7 +59,7 @@
private byte[] fileContents = null;
- int numDatanodes = 5;
+ int numDatanodes = 6;
int numberOfFiles = 50;
int numThreads = 10;
int numAppendsPerThread = 20;
@@ -350,7 +350,7 @@
// Insert them into a linked list.
//
for (int i = 0; i < numberOfFiles; i++) {
- short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
+ final int replication = AppendTestUtil.nextInt(numDatanodes - 2) + 1;
Path testFile = new Path("/" + i + ".dat");
FSDataOutputStream stm =
AppendTestUtil.createFile(fs, testFile, replication);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
index 6806359..0748e9d 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
@@ -149,7 +149,7 @@
*/
@Test(timeout=60000)
public void testRecoverFinalizedBlock() throws Throwable {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
try {
cluster.waitActive();
@@ -220,7 +220,7 @@
*/
@Test(timeout=60000)
public void testCompleteOtherLeaseHoldersFile() throws Throwable {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
try {
cluster.waitActive();
@@ -296,8 +296,7 @@
* Mockito answer helper that triggers one latch as soon as the
* method is called, then waits on another before continuing.
*/
- @SuppressWarnings("unchecked")
- private static class DelayAnswer implements Answer {
+ private static class DelayAnswer implements Answer<Object> {
private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
index 3322756..fa69017 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -630,7 +630,8 @@
expectedException != null
&& expectedException instanceof FileNotFoundException);
- EnumSet<CreateFlag> overwriteFlag = EnumSet.of(CreateFlag.OVERWRITE);
+ EnumSet<CreateFlag> overwriteFlag =
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
// Overwrite a file in root dir, should succeed
out = createNonRecursive(fs, path, 1, overwriteFlag);
out.close();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 8e0c2a3..ae70bd7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -26,6 +26,7 @@
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -202,19 +203,12 @@
try {
dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
fail("Creation of an existing file should never succeed.");
+ } catch (FileAlreadyExistsException ex) {
+ done = true;
+ } catch (AlreadyBeingCreatedException ex) {
+ AppendTestUtil.LOG.info("GOOD! got " + ex.getMessage());
} catch (IOException ioe) {
- final String message = ioe.getMessage();
- if (message.contains("file exists")) {
- AppendTestUtil.LOG.info("done", ioe);
- done = true;
- }
- else if (message.contains(
- AlreadyBeingCreatedException.class.getSimpleName())) {
- AppendTestUtil.LOG.info("GOOD! got " + message);
- }
- else {
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
- }
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
}
if (!done) {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
index 4347180..6145313 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@@ -71,7 +70,7 @@
final int half = BLOCK_SIZE/2;
//a. On Machine M1, Create file. Write half block of data.
- // Invoke (DFSOutputStream).fsync() on the dfs file handle.
+ // Invoke DFSOutputStream.hflush() on the dfs file handle.
// Do not close file yet.
{
final FSDataOutputStream out = fs.create(p, true,
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index efcc9b8..3cec060 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -497,6 +497,16 @@
return null;
}
+  /**
+   * Returns the string form of the replica stored for the given block id in
+   * the given block pool, or the literal "null" when either the pool or the
+   * block has no entry in blockMap.
+   */
+  @Override
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    final Map<Block, BInfo> poolBlocks = blockMap.get(bpid);
+    if (poolBlocks == null) {
+      return "null";
+    }
+    final Replica replica = poolBlocks.get(new Block(blockId));
+    return replica == null ? "null" : replica.toString();
+  }
+
@Override // FSDatasetInterface
public Block getStoredBlock(String bpid, long blkid) throws IOException {
final Map<Block, BInfo> map = blockMap.get(bpid);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
index d458268..9f9daec 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
@@ -511,11 +511,11 @@
long start = System.currentTimeMillis();
int count = 0;
while (r == null) {
- waitTil(50);
+ waitTil(5);
r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
fetchReplicaInfo(bpid, bl.getBlockId());
long waiting_period = System.currentTimeMillis() - start;
- if (count++ % 10 == 0)
+ if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) {
LOG.debug("Has been waiting for " + waiting_period + " ms.");
}
@@ -530,7 +530,7 @@
}
start = System.currentTimeMillis();
while (state != HdfsConstants.ReplicaState.TEMPORARY) {
- waitTil(100);
+ waitTil(5);
state = r.getState();
if(LOG.isDebugEnabled()) {
LOG.debug("Keep waiting for " + bl.getBlockName() +
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index fb7909a..b519752 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -555,7 +555,7 @@
// dummyActionNoSynch(fileIdx);
nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
clientName, new EnumSetWritable<CreateFlag>(EnumSet
- .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
+ .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
long end = System.currentTimeMillis();
for(boolean written = !closeUponCreate; !written;
written = nameNode.complete(fileNames[daemonId][inputIdx],
@@ -971,7 +971,7 @@
for(int idx=0; idx < nrFiles; idx++) {
String fileName = nameGenerator.getNextFileName("ThroughputBench");
nameNode.create(fileName, FsPermission.getDefault(), clientName,
- new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
+ new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
BLOCK_SIZE);
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
nameNode.complete(fileName, clientName, lastBlock);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
index 85d37ca..ec27e21 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
@@ -77,7 +77,7 @@
String newRacks[] = {"/rack2"} ;
cluster.startDataNodes(conf, 1, true, null, newRacks);
- while ( (numRacks < 2) || (curReplicas < REPLICATION_FACTOR) ||
+ while ( (numRacks < 2) || (curReplicas != REPLICATION_FACTOR) ||
(neededReplicationSize > 0) ) {
LOG.info("Waiting for replication");
Thread.sleep(600);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 391112b..744573a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -738,11 +739,13 @@
public void testSaveNamespace() throws IOException {
MiniDFSCluster cluster = null;
DistributedFileSystem fs = null;
+ FileContext fc;
try {
Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
cluster.waitActive();
fs = (DistributedFileSystem)(cluster.getFileSystem());
+ fc = FileContext.getFileContext(cluster.getURI(0));
// Saving image without safe mode should fail
DFSAdmin admin = new DFSAdmin(conf);
@@ -758,6 +761,12 @@
Path file = new Path("namespace.dat");
writeFile(fs, file, replication);
checkFile(fs, file, replication);
+
+ // create new link
+ Path symlink = new Path("file.link");
+ fc.createSymlink(file, symlink, false);
+ assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
+
// verify that the edits file is NOT empty
Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
for(URI uri : editsDirs) {
@@ -786,6 +795,8 @@
cluster.waitActive();
fs = (DistributedFileSystem)(cluster.getFileSystem());
checkFile(fs, file, replication);
+ fc = FileContext.getFileContext(cluster.getURI(0));
+ assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
} finally {
if(fs != null) fs.close();
if(cluster!= null) cluster.shutdown();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 5020ff7..56aeec3 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -20,12 +20,15 @@
import junit.framework.TestCase;
import java.io.*;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -303,4 +306,37 @@
if(cluster != null) cluster.shutdown();
}
}
+
+  /**
+   * Verifies that a corrupted edits file is detected on restart: writes an
+   * edit, shuts the cluster down, alters the trailing four (checksum) bytes
+   * of the edits file, and expects the restart to fail with a
+   * ChecksumException.
+   *
+   * @throws Exception if cluster setup or the file manipulation fails
+   */
+  public void testEditChecksum() throws Exception {
+    // start a cluster
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+    final File editFile;
+    try {
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      FSImage fsimage = namesystem.getFSImage();
+      final FSEditLog editLog = fsimage.getEditLog();
+      fileSys.mkdirs(new Path("/tmp"));
+      editFile = editLog.getFsEditName();
+      editLog.close();
+    } finally {
+      // always release the first cluster, even if setup failed half-way
+      cluster.shutdown();
+    }
+
+    long fileLen = editFile.length();
+    System.out.println("File name: " + editFile + " len: " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+    try {
+      rwf.seek(fileLen-4); // seek to checksum bytes
+      int b = rwf.readInt();
+      rwf.seek(fileLen-4);
+      rwf.writeInt(b+1); // any different value invalidates the checksum
+    } finally {
+      rwf.close();
+    }
+
+    cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
+      fail("should not be able to start");
+    } catch (ChecksumException e) {
+      // expected
+    } finally {
+      // only non-null when the restart unexpectedly succeeded
+      if(cluster != null) cluster.shutdown();
+    }
+  }
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 901cbb0..b4d0e51 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -33,9 +33,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.cli.CmdFactoryDFS;
-import org.apache.hadoop.cli.util.CLITestData;
-import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.cli.CLITestCmdDFS;
+import org.apache.hadoop.cli.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -350,10 +349,9 @@
String cmd = "-fs NAMENODE -restoreFailedStorage false";
String namenode = config.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
- CommandExecutor executor =
- CmdFactoryDFS.getCommandExecutor(
- new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
- namenode);
+ CommandExecutor executor =
+ new CLITestCmdDFS(cmd, new CLICommandDFSAdmin()).getExecutor(namenode);
+
executor.executeCommand(cmd);
restore = fsi.getStorage().getRestoreFailedStorage();
assertFalse("After set true call restore is " + restore, restore);
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 82485ae..c8e9a18 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.*;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -120,4 +121,33 @@
0L, 0L, preferredBlockSize);
}
+  /**
+   * Checks getFullPathName() and getLocalParentDir() while a file is attached
+   * step by step: first detached, then under a directory that has no parent,
+   * and finally with that directory placed under the root.
+   */
+  @Test
+  public void testGetFullPathName() {
+    final PermissionStatus permissions =
+        new PermissionStatus(userName, null, FsPermission.getDefault());
+
+    replication = 3;
+    preferredBlockSize = 128*1024*1024;
+    final INodeFile file = new INodeFile(permissions, null, replication,
+        0L, 0L, preferredBlockSize);
+    file.setLocalName("f");
+
+    final INodeDirectory rootDir =
+        new INodeDirectory(INodeDirectory.ROOT_NAME, permissions);
+    final INodeDirectory subDir = new INodeDirectory("d", permissions);
+    final String sep = Path.SEPARATOR;
+
+    // file not attached to any directory yet
+    assertEquals("f", file.getFullPathName());
+    assertEquals("", file.getLocalParentDir());
+
+    // file attached to "d", which itself has no parent
+    subDir.addChild(file, false, false);
+    assertEquals("d" + sep + "f", file.getFullPathName());
+    assertEquals("d", file.getLocalParentDir());
+
+    // "d" attached to the root directory
+    rootDir.addChild(subDir, false, false);
+    assertEquals(sep + "d" + sep + "f", file.getFullPathName());
+    assertEquals(sep + "d", subDir.getFullPathName());
+
+    // the root directory itself
+    assertEquals(sep, rootDir.getFullPathName());
+    assertEquals(sep, rootDir.getLocalParentDir());
+  }
}