HDFS-1592. Datanode startup doesn't honor volumes.tolerated. Contributed by Bharath Mundlapudi.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1127995 13f79535-47bb-0310-9956-ffa450edef68
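
The patch makes the datanode honor dfs.datanode.failed.volumes.tolerated at
startup: storage directories that fail to load are logged and skipped, and the
datanode aborts only when fewer usable volumes remain than the configured
tolerance allows. A minimal sketch of setting the knob, assuming the standard
HdfsConfiguration class and the key constant from DFSConfigKeys (the new test
below sets it the same way via conf.setInt):

    Configuration conf = new HdfsConfiguration();
    // tolerate one failed volume among those listed in dfs.datanode.data.dir
    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
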
diff --git a/CHANGES.txt b/CHANGES.txt
index 725b739..d33c9f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -626,6 +626,9 @@
 
     HDFS-1999. Tests use deprecated configs. (Aaron T. Myers via eli)
 
+    HDFS-1592. Datanode startup doesn't honor volumes.tolerated. 
+    (Bharath Mundlapudi via jitendra)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a4f691e..52e7dad 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -998,9 +998,14 @@
       if (blockScanner != null) {
         blockScanner.removeBlockPool(this.getBlockPoolId());
       }
-     
-      data.shutdownBlockPool(this.getBlockPoolId());
-      storage.removeBlockPoolStorage(this.getBlockPoolId());
+    
+      if (data != null) {
+        data.shutdownBlockPool(this.getBlockPoolId());
+      }
+
+      if (storage != null) {
+        storage.removeBlockPoolStorage(this.getBlockPoolId());
+      }
     }
 
     /**
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 1d94382..139e921 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * Data storage information file.
@@ -170,14 +171,17 @@
         }
       } catch (IOException ioe) {
         sd.unlock();
-        throw ioe;
+        LOG.warn("Ignoring storage directory "+ dataDir
+        		+ " due to an exception: " + StringUtils.stringifyException(ioe));
+        //continue with other good dirs
+        continue;
       }
       // add to the storage list
       addStorageDir(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0)  // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0)  // none of the data dirs exist or are usable
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
 
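
With the DataStorage change above, an I/O failure in one storage directory no
longer aborts the recovery loop; the directory is logged and skipped, and
startup fails only if no configured directory survives. A rough standalone
sketch of the resulting control flow, where loadDir() is a hypothetical
stand-in for the real per-directory analyze/transition steps:

    int usableDirs = 0;
    for (File dataDir : dataDirs) {
      try {
        loadDir(dataDir);   // hypothetical stand-in for the per-directory recovery work
      } catch (IOException ioe) {
        LOG.warn("Ignoring storage directory " + dataDir, ioe);
        continue;           // keep going with the remaining good dirs
      }
      usableDirs++;         // only directories that loaded cleanly are registered
    }
    if (usableDirs == 0) {
      throw new IOException(
          "All specified directories are not accessible or do not exist.");
    }
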
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 3f37020..e9c1484 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -1145,12 +1145,21 @@
     final int volFailuresTolerated =
       conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                   DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
-    this.validVolsRequired = storage.getNumStorageDirs() - volFailuresTolerated; 
-    if (validVolsRequired < 1 ||
-        validVolsRequired > storage.getNumStorageDirs()) {
-      DataNode.LOG.error("Invalid value " + volFailuresTolerated + " for " +
-          DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY);
+
+    String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
+
+    this.validVolsRequired = volsConfigured - volFailuresTolerated;
+
+    if (validVolsRequired < 1
+        || validVolsRequired > storage.getNumStorageDirs()) {
+      throw new DiskErrorException("Too many failed volumes - "
+          + "current valid volumes: " + storage.getNumStorageDirs() 
+          + ", volumes configured: " + volsConfigured 
+          + ", volume failures tolerated: " + volFailuresTolerated );
     }
+
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(),
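
The FSDataset change above turns what used to be only a logged error into a
hard failure: the number of volumes listed in dfs.datanode.data.dir minus
dfs.datanode.failed.volumes.tolerated must be at least 1 and must not exceed
the volumes that actually came up. A standalone arithmetic sketch (plain Java
with hypothetical values; IllegalStateException stands in for the patch's
DiskErrorException):

    public class VolumeToleranceSketch {
      public static void main(String[] args) {
        int volsConfigured = 2;        // entries in dfs.datanode.data.dir
        int volFailuresTolerated = 1;  // dfs.datanode.failed.volumes.tolerated
        int usableVolumes = 1;         // dirs that survived DataStorage recovery

        int validVolsRequired = volsConfigured - volFailuresTolerated;  // = 1
        if (validVolsRequired < 1 || validVolsRequired > usableVolumes) {
          throw new IllegalStateException("Too many failed volumes - "
              + "current valid volumes: " + usableVolumes
              + ", volumes configured: " + volsConfigured
              + ", volume failures tolerated: " + volFailuresTolerated);
        }
        System.out.println("Datanode may start with " + usableVolumes
            + " usable volume(s); " + validVolsRequired + " required");
      }
    }
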
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
index 4bb93bd..610241d 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -129,32 +132,91 @@
     DFSTestUtil.waitReplication(fs, file2, (short)2);
   }
 
+  /**
+   * Restart the datanodes with a new volume-failures-tolerated value.
+   * @param volTolerated number of volume failures to tolerate
+   * @param manageCluster whether to let the cluster manage the data dirs
+   * @throws IOException
+   */
+  private void restartCluster(int volTolerated, boolean manageCluster)
+      throws IOException {
+    // Make sure no datanode is running
+    cluster.shutdownDataNodes();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, volTolerated);
+    cluster.startDataNodes(conf, 1, manageCluster, null, null);
+    cluster.waitActive();
+  }
+
   /**
-   * Test invalid DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY values.
+   * Test different combinations of volume configurations and
+   * volume-failures-tolerated values.
    */
   @Test
-  public void testInvalidFailedVolumesConfig() throws Exception {
-    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
-
-    /*
-     * Bring up another datanode that has an invalid value set.
-     * We should still be able to create a file with two replicas
-     * since the minimum valid volume parameter is only checked
-     * when we experience a disk error.
-     */
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, -1);
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();
-    Path file1 = new Path("/test1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
-    DFSTestUtil.waitReplication(fs, file1, (short)2);
+  public void testVolumeAndTolerableConfiguration() throws Exception {
+    // Check that the block pool service exits for an invalid conf value.
+    testVolumeConfig(-1, 0, false, true);
 
     // Ditto if the value is too big.
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 100);
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();
-    Path file2 = new Path("/test1");
-    DFSTestUtil.createFile(fs, file2, 1024, (short)2, 1L);
-    DFSTestUtil.waitReplication(fs, file2, (short)2);
+    testVolumeConfig(100, 0, false, true);
+    
+    // Test for one failed volume
+    testVolumeConfig(0, 1, false, false);
+    
+    // Test for one failed volume with one volume failure tolerated
+    testVolumeConfig(1, 1, true, false);
+    
+    // Test all good volumes
+    testVolumeConfig(0, 0, true, false);
+    
+    // Test all failed volumes
+    testVolumeConfig(0, 2, false, false);
   }
+
+  /**
+   * Tests a given number of tolerated volume failures against a given
+   * number of actually failed volumes.
+   * @param volumesTolerated number of volume failures to tolerate
+   * @param volumesFailed number of volumes to fail before restarting
+   * @param expectedBPServiceState whether the block pool service is expected to be alive after the restart
+   * @param clusterManaged whether to let the cluster manage the data dirs
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void testVolumeConfig(int volumesTolerated, int volumesFailed,
+      boolean expectedBPServiceState, boolean clusterManaged)
+      throws IOException, InterruptedException {
+    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
+    final int dnIndex = 0;
+    File[] dirs = {
+        new File(MiniDFSCluster.getStorageDir(dnIndex, 0), "current"),
+        new File(MiniDFSCluster.getStorageDir(dnIndex, 1), "current") };
+
+    try {
+      for (int i = 0; i < volumesFailed; i++) {
+        prepareDirToFail(dirs[i]);
+      }
+      restartCluster(volumesTolerated, clusterManaged);
+      assertEquals(expectedBPServiceState, cluster.getDataNodes().get(0)
+          .isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));
+    } finally {
+      // restore the directories' old permissions
+      for (File dir : dirs) {
+        FileUtil.chmod(dir.toString(), "755");
+      }
+    }
+  }
+
+  /**
+   * Prepare a directory for failure by setting its permission to 000.
+   * @param dir the directory to make unusable
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void prepareDirToFail(File dir) throws IOException,
+      InterruptedException {
+    dir.mkdirs();
+    assertTrue("Couldn't chmod local vol", FileUtil
+        .chmod(dir.toString(), "000") == 0);
+  }
+
 }