HDFS-2020. Fix TestDFSUpgradeFromImage by removing the use of DataNode as a singleton. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1130368 13f79535-47bb-0310-9956-ffa450edef68
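
Why the singleton had to go: DataNode.getDataNode() handed out one JVM-wide instance, so whenever a test such as TestDFSUpgradeFromImage started several datanodes inside a single JVM (as MiniDFSCluster does), every collaborator that went through the static getter was silently bound to whichever DataNode happened to be constructed last. Below is a minimal sketch of that failure mode and of the fix this patch applies throughout; Node, Collaborator, and CollaboratorFixed are hypothetical stand-ins, not the actual Hadoop classes.

class Node {
  private static volatile Node instance;   // JVM-wide, like the old datanodeObject
  private final int port;

  Node(int port) {
    this.port = port;
    instance = this;                        // every construction clobbers the last
  }

  static Node getInstance() { return instance; }
  int getPort() { return port; }
}

class Collaborator {
  // Before: reaches through the static getter, so with two nodes in one JVM
  // it always observes the most recently constructed one.
  int portViaSingleton() { return Node.getInstance().getPort(); }
}

class CollaboratorFixed {
  private final Node node;

  // After: the owning node is passed in explicitly, the same pattern this
  // patch applies to FSDataset, DirectoryScanner, DataStorage, and friends.
  CollaboratorFixed(Node node) { this.node = node; }

  int port() { return node.getPort(); }
}

public class SingletonPitfallDemo {
  public static void main(String[] args) {
    Node first = new Node(50010);
    Node second = new Node(50011);

    System.out.println(new Collaborator().portViaSingleton()); // 50011, for both nodes
    System.out.println(new CollaboratorFixed(first).port());   // 50010
    System.out.println(new CollaboratorFixed(second).port());  // 50011
  }
}

The hunks that follow apply exactly this change: BlockPoolSliceStorage, DataStorage, DirectoryScanner, FSDataset, UpgradeObjectDatanode, and the two servlets now receive or look up their owning DataNode instead of calling the static getter.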
diff --git a/CHANGES.txt b/CHANGES.txt
index 891afb9..e22e194 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -659,6 +659,9 @@
     HDFS-2021. Update numBytesAcked before sending the ack in PacketResponder.
     (John George via szetszwo)
 
+    HDFS-2020. Fix TestDFSUpgradeFromImage by removing the use of DataNode
+    as a singleton. (suresh via todd)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 51ed956..304a64b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -63,10 +63,6 @@
 
   private String blockpoolID = ""; // id of the blockpool
 
-  BlockPoolSliceStorage() {
-    super(NodeType.DATA_NODE);
-  }
-
   public BlockPoolSliceStorage(StorageInfo storageInfo, String bpid) {
     super(NodeType.DATA_NODE, storageInfo);
     blockpoolID = bpid;
@@ -84,13 +80,14 @@
   /**
    * Analyze storage directories. Recover from previous transitions if required.
    * 
+   * @param datanode Datanode to which this storage belongs
    * @param nsInfo namespace information
    * @param dataDirs storage directories of block pool
    * @param startOpt startup option
    * @throws IOException on error
    */
-  void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs,
-      StartupOption startOpt) throws IOException {
+  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() 
         : "Block-pool and name-node layout versions must be the same.";
 
@@ -140,7 +137,7 @@
     // During startup some of them can upgrade or roll back
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert getLayoutVersion() == nsInfo.getLayoutVersion() 
           : "Data-node and name-node layout versions must be the same.";
       assert getCTime() == nsInfo.getCTime() 
@@ -228,12 +225,13 @@
    * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
    * startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
    * 
-   * @param sd storage directory,
+   * @param datanode DataNode to which this storage belongs
+   * @param sd storage directory <SD>/current/<bpid>
    * @param nsInfo namespace info
    * @param startOpt startup option
    * @throws IOException
    */
-  private void doTransition(StorageDirectory sd, // i.e. <SD>/current/<bpid>
+  private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK)
       doRollback(sd, nsInfo); // rollback if applicable
@@ -259,7 +257,9 @@
       return; // regular startup
     
     // verify necessity of a distributed upgrade
-    verifyDistributedUpgradeProgress(nsInfo);
+    UpgradeManagerDatanode um = 
+      datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+    verifyDistributedUpgradeProgress(um, nsInfo);
     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(sd, nsInfo); // upgrade
@@ -474,10 +474,8 @@
     LOG.info( hardLink.linkStats.report() );
   }
 
-  private void verifyDistributedUpgradeProgress(NamespaceInfo nsInfo)
-      throws IOException {
-    UpgradeManagerDatanode um = 
-      DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+  private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
+      NamespaceInfo nsInfo) throws IOException {
     assert um != null : "DataNode.upgradeManager is null.";
     um.setUpgradeState(false, getLayoutVersion());
     um.initializeUpgrade(nsInfo);
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
index 9504867..535619e 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
@@ -29,7 +29,6 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
@@ -101,7 +100,7 @@
   private void waitForInit(String bpid) {
     UpgradeManagerDatanode um = null;
     if(bpid != null && !bpid.equals(""))
-      um = DataNode.getUpgradeManagerDatanode(bpid);
+      um = datanode.getUpgradeManagerDatanode(bpid);
     
     while ((um != null && ! um.isUpgradeCompleted())
         || (getBlockPoolSetSize() < datanode.getAllBpOs().length)
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bbcdf89..c73bc05 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -37,7 +37,6 @@
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.AbstractList;
@@ -353,7 +352,6 @@
   DataNodeMetrics metrics;
   private InetSocketAddress selfAddr;
   
-  private static volatile DataNode datanodeObject = null;
   private volatile String hostName; // Host name of this datanode
   
   private static String dnThreadName;
@@ -398,8 +396,6 @@
            final SecureResources resources) throws IOException {
     super(conf);
 
-    DataNode.setDataNode(this);
-    
     try {
       hostName = getHostName(conf);
       startDataNode(conf, dataDirs, resources);
@@ -590,7 +586,7 @@
       reason = "verification is supported only with FSDataset";
     } 
     if (reason == null) {
-      directoryScanner = new DirectoryScanner((FSDataset) data, conf);
+      directoryScanner = new DirectoryScanner(this, (FSDataset) data, conf);
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan is disabled because " +
@@ -793,7 +789,8 @@
         bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
       } else {
         // read storage info, lock data dirs and transition fs state if necessary          
-        storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
+        storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
+            dataDirs, startOpt);
         LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
             + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
             + bpNSInfo);
@@ -1324,7 +1321,7 @@
     synchronized UpgradeManagerDatanode getUpgradeManager() {
       if(upgradeManager == null)
         upgradeManager = 
-          new UpgradeManagerDatanode(DataNode.getDataNode(), blockPoolId);
+          new UpgradeManagerDatanode(DataNode.this, blockPoolId);
       
       return upgradeManager;
     }
@@ -1403,7 +1400,7 @@
       conf.getBoolean("dfs.datanode.simulateddatastorage", false);
 
     if (simulatedFSDataset) {
-      storage.createStorageID();
+      storage.createStorageID(getPort());
       // it would have been better to pass storage as a parameter to
       // constructor below - need to augment ReflectionUtils used below.
       conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
@@ -1416,7 +1413,7 @@
         throw new IOException(StringUtils.stringifyException(e));
       }
     } else {
-      data = new FSDataset(storage, conf);
+      data = new FSDataset(this, storage, conf);
     }
   }
 
@@ -1491,15 +1488,6 @@
            SocketChannel.open().socket() : new Socket();                                   
   }
 
-  private static void setDataNode(DataNode node) {
-    datanodeObject = node;
-  }
-
-  /** Return the DataNode object */
-  public static DataNode getDataNode() {
-    return datanodeObject;
-  } 
-
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
     throws IOException {
@@ -1551,11 +1539,7 @@
     dnId.storageID = createNewStorageId(dnId.getPort());
   }
   
-  static String createNewStorageId() {
-    return createNewStorageId(datanodeObject.getPort());
-  }
-  
-  private static String createNewStorageId(int port) {
+  static String createNewStorageId(int port) {
     /* Return 
      * "DS-randInt-ipaddr-currentTimeMillis"
     * It is considered extremely rare for all these numbers to match
@@ -1728,9 +1712,8 @@
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
     
-  static UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
-    DataNode dn = getDataNode();
-    BPOfferService bpos = dn.blockPoolManager.get(bpid);
+  UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
+    BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null) {
       return null;
     }
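
createNewStorageId(int port) now takes the datanode's port as an argument instead of reading it off the singleton. Below is a minimal sketch of a generator following the "DS-randInt-ipaddr-currentTimeMillis" shape described in the comment above; this is hypothetical code, not the Hadoop implementation, and folding the port into the ID is an assumption suggested by the new parameter (the port is what distinguishes several datanodes on one host, the exact scenario the tests create).

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;

public final class StorageIdSketch {
  private static final SecureRandom RANDOM = new SecureRandom();

  static String newStorageId(int port) {
    String ip;
    try {
      ip = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      ip = "unknownIP";                 // degrade rather than fail ID creation
    }
    return "DS-" + RANDOM.nextInt(Integer.MAX_VALUE)
        + "-" + ip + "-" + port
        + "-" + System.currentTimeMillis();
  }

  public static void main(String[] args) {
    System.out.println(newStorageId(50010));
    // e.g. DS-1567234890-192.168.1.5-50010-1306882800000
  }
}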
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 6af702f..42f6988 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -109,11 +109,11 @@
     this.storageID = newStorageID;
   }
   
-  synchronized void createStorageID() {
+  synchronized void createStorageID(int datanodePort) {
     if (storageID != null && !storageID.isEmpty()) {
       return;
     }
-    storageID = DataNode.createNewStorageId();
+    storageID = DataNode.createNewStorageId(datanodePort);
   }
   
   /**
@@ -130,10 +130,9 @@
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(NamespaceInfo nsInfo,
-                             Collection<File> dataDirs,
-                             StartupOption startOpt
-                             ) throws IOException {
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+      throws IOException {
     if (initilized) {
       // DN storage has been initialized, no need to do anything
       return;
@@ -192,13 +191,13 @@
     // During startup some of them can upgrade or rollback 
     // while others could be up-to-date for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
         "Data-node and name-node layout versions must be the same.";
     }
     
     // make sure we have storage id set - if not - generate new one
-    createStorageID();
+    createStorageID(datanode.getPort());
     
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
@@ -210,16 +209,17 @@
   /**
    * recoverTransitionRead for a specific block pool
    * 
+   * @param datanode DataNode to which this storage belongs
    * @param bpID Block pool Id
    * @param nsInfo Namespace info of namenode corresponding to the block pool
    * @param dataDirs Storage directories
    * @param startOpt startup option
    * @throws IOException on error
    */
-  void recoverTransitionRead(String bpID, NamespaceInfo nsInfo,
+  void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     // First ensure datanode level format/snapshot/rollback is completed
-    recoverTransitionRead(nsInfo, dataDirs, startOpt);
+    recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
     
     // Create list of storage directories for the block pool
     Collection<File> bpDataDirs = new ArrayList<File>();
@@ -234,7 +234,7 @@
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
         nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
     
-    bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
     addBlockPoolStorage(bpID, bpStorage);
   }
 
@@ -358,12 +358,14 @@
    * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
    * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
    * 
+   * @param datanode Datanode to which this storage belongs
    * @param sd  storage directory
    * @param nsInfo  namespace info
    * @param startOpt  startup option
    * @throws IOException
    */
-  private void doTransition( StorageDirectory sd, 
+  private void doTransition( DataNode datanode,
+                             StorageDirectory sd, 
                              NamespaceInfo nsInfo, 
                              StartupOption startOpt
                              ) throws IOException {
@@ -399,7 +401,10 @@
         && this.cTime == nsInfo.getCTime())
       return; // regular startup
     // verify necessity of a distributed upgrade
-    verifyDistributedUpgradeProgress(nsInfo);
+    UpgradeManagerDatanode um = 
+      datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+    verifyDistributedUpgradeProgress(um, nsInfo);
+    
     // do upgrade
     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
@@ -729,11 +734,9 @@
     }
   }
 
-  private void verifyDistributedUpgradeProgress(
+  private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
                   NamespaceInfo nsInfo
                 ) throws IOException {
-    UpgradeManagerDatanode um = 
-      DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
     assert um != null : "DataNode.upgradeManager is null.";
     um.setUpgradeState(false, getLayoutVersion());
     um.initializeUpgrade(nsInfo);
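
The block-pool overload above layers two recoveries: datanode-wide format/upgrade/rollback runs first, then the same transition is applied to the pool's slice under each <dataDir>/current, with the owning DataNode now threaded through both levels. A hedged sketch of that call flow follows; the types here are hypothetical stand-ins, not the Hadoop classes.

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

class TwoLevelRecoverySketch {
  interface Node { int getPort(); }

  void recoverNodeLevel(Node datanode, Collection<File> dataDirs)
      throws IOException {
    // analyze each <dataDir>/current, decide format / upgrade / rollback,
    // and ensure a storage ID exists for datanode.getPort()
  }

  void recoverBlockPool(Node datanode, String bpid,
      Collection<File> dataDirs) throws IOException {
    recoverNodeLevel(datanode, dataDirs);  // datanode level must complete first

    // then collect the per-pool locations; the <bpid> directory lives
    // beneath each <dataDir>/current
    Collection<File> bpDataDirs = new ArrayList<File>();
    for (File dnRoot : dataDirs) {
      bpDataDirs.add(new File(dnRoot, "current"));
    }
    // a per-pool storage object would now recover these directories,
    // receiving the same datanode reference instead of doing a static lookup
  }
}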
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 9b38a5f..68eac2c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -57,6 +57,7 @@
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
   private static final int DEFAULT_SCAN_INTERVAL = 21600;
 
+  private final DataNode datanode;
   private final FSDataset dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
@@ -221,7 +222,8 @@
     }
   }
 
-  DirectoryScanner(FSDataset dataset, Configuration conf) {
+  DirectoryScanner(DataNode dn, FSDataset dataset, Configuration conf) {
+    this.datanode = dn;
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DEFAULT_SCAN_INTERVAL);
@@ -273,7 +275,7 @@
       String[] bpids = dataset.getBPIdlist();
       for(String bpid : bpids) {
         UpgradeManagerDatanode um = 
-          DataNode.getUpgradeManagerDatanode(bpid);
+          datanode.getUpgradeManagerDatanode(bpid);
         if (um != null && !um.isUpgradeCompleted()) {
           //If distributed upgrades underway, exit and wait for next cycle.
           LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 41b4797..9faa486 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -1118,6 +1118,7 @@
     return f;
   }
     
+  private final DataNode datanode;
   final FSVolumeSet volumes;
   private final int maxBlocksPerDir;
   final ReplicasMap volumeMap;
@@ -1133,7 +1134,9 @@
   /**
    * An FSDataset has a directory where it loads its data files.
    */
-  public FSDataset(DataStorage storage, Configuration conf) throws IOException {
+  public FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
+      throws IOException {
+    this.datanode = datanode;
     this.maxBlocksPerDir = 
       conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
                   DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
@@ -2000,7 +2003,6 @@
         return f;
    
       // if file is not null, but doesn't exist - possibly disk failed
-      DataNode datanode = DataNode.getDataNode();
       datanode.checkDiskError();
     }
     
@@ -2246,7 +2248,6 @@
    */
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
-    DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
index b6df950..091cdaa 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
@@ -125,7 +125,7 @@
     // Complete the upgrade by calling the manager method
     try {
       UpgradeManagerDatanode upgradeManager = 
-        DataNode.getUpgradeManagerDatanode(bpid);
+        dataNode.getUpgradeManagerDatanode(bpid);
       if(upgradeManager != null)
         upgradeManager.completeUpgrade();
     } catch(IOException e) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
index ae7caae..10fb1e4 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
@@ -89,8 +89,10 @@
       final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
       xml.declaration();
 
+      final ServletContext context = getServletContext();
+      final DataNode datanode = (DataNode) context.getAttribute("datanode");
       final Configuration conf = 
-        new HdfsConfiguration(DataNode.getDataNode().getConf());
+        new HdfsConfiguration(datanode.getConf());
       final int socketTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsConstants.READ_TIMEOUT);
@@ -99,7 +101,7 @@
       
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
-            DataNode.getDataNode(), conf, getUGI(request, conf));
+            datanode, conf, getUGI(request, conf));
         final ClientProtocol nnproxy = dfs.getNamenode();
         final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
             filename, nnproxy, socketFactory, socketTimeout);
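
Both servlets now fetch the DataNode from the ServletContext rather than from the removed static getter. The pattern is sketched below with the standard javax.servlet API; the "datanode" key matches the lookups in the hunks here, while the publishing line is an assumption about where the datanode's embedded HTTP server would set the attribute at startup.

import java.io.IOException;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ContextAttributeServlet extends HttpServlet {

  // Publisher side (assumed, during server startup):
  //   servletContext.setAttribute("datanode", theDataNodeInstance);

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws IOException {
    // Consumer side, per request: no static state involved, so several
    // servers in one JVM each see their own object.
    final ServletContext context = getServletContext();
    final Object datanode = context.getAttribute("datanode");
    resp.getWriter().println("Serving on behalf of: " + datanode);
  }
}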
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java b/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
index 0049281..f447ea7 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
@@ -20,10 +20,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.net.InetSocketAddress;
 import java.util.Enumeration;
 import java.util.List;
 
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -46,14 +46,14 @@
 
   public static final String CONTENT_LENGTH = "Content-Length";
 
-  static DataNode datanode = DataNode.getDataNode();
-  
   /** getting a client for connecting to dfs */
   protected DFSClient getDFSClient(HttpServletRequest request)
       throws IOException, InterruptedException {
     final Configuration conf =
       (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
     UserGroupInformation ugi = getUGI(request, conf);
+    final ServletContext context = getServletContext();
+    final DataNode datanode = (DataNode) context.getAttribute("datanode");
     return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
   }
   
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index a783f92..85eb8fe 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -231,7 +231,8 @@
       fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
-      scanner = new DirectoryScanner(fds, CONF);
+      DataNode dn = cluster.getDataNodes().get(0);
+      scanner = new DirectoryScanner(dn, fds, CONF);
       scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks